hive-commits mailing list archives

From: xu...@apache.org
Subject: svn commit: r1669775 [13/35] - in /hive/branches/spark: ./ ant/src/org/apache/hadoop/hive/ant/ beeline/src/java/org/apache/hive/beeline/ common/src/java/org/apache/hadoop/hive/common/ common/src/java/org/apache/hadoop/hive/common/type/ common/src/java/...
Date: Sat, 28 Mar 2015 14:03:49 GMT
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java Sat Mar 28 14:03:43 2015
@@ -31,12 +31,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import javolution.testing.AssertionException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
+import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
@@ -74,30 +75,47 @@ public class DynamicPartitionPruner {
 
   private static final Log LOG = LogFactory.getLog(DynamicPartitionPruner.class);
 
+  private final InputInitializerContext context;
+  private final MapWork work;
+  private final JobConf jobConf;
+
+
   private final Map<String, List<SourceInfo>> sourceInfoMap =
       new HashMap<String, List<SourceInfo>>();
 
   private final BytesWritable writable = new BytesWritable();
 
+  /* Keeps track of all events that need to be processed - irrespective of the source */
   private final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
 
+  /* Keeps track of vertices from which events are expected */
   private final Set<String> sourcesWaitingForEvents = new HashSet<String>();
 
+  // Stores a negative column count per source. Eventually set to #tasks X #columns after the source vertex completes.
+  private final Map<String, MutableInt> numExpectedEventsPerSource = new HashMap<>();
+  private final Map<String, MutableInt> numEventsSeenPerSource = new HashMap<>();
+
   private int sourceInfoCount = 0;
 
   private final Object endOfEvents = new Object();
 
   private int totalEventCount = 0;
 
-  public DynamicPartitionPruner() {
+  public DynamicPartitionPruner(InputInitializerContext context, MapWork work, JobConf jobConf) throws
+      SerDeException {
+    this.context = context;
+    this.work = work;
+    this.jobConf = jobConf;
+    synchronized (this) {
+      initialize();
+    }
   }
 
-  public void prune(MapWork work, JobConf jobConf, InputInitializerContext context)
+  public void prune()
       throws SerDeException, IOException,
       InterruptedException, HiveException {
 
     synchronized(sourcesWaitingForEvents) {
-      initialize(work, jobConf);
 
       if (sourcesWaitingForEvents.isEmpty()) {
         return;
@@ -112,11 +130,11 @@ public class DynamicPartitionPruner {
       }
     }
 
-    LOG.info("Waiting for events (" + sourceInfoCount + " items) ...");
+    LOG.info("Waiting for events (" + sourceInfoCount + " sources) ...");
     // synchronous event processing loop. Won't return until all events have
     // been processed.
     this.processEvents();
-    this.prunePartitions(work, context);
+    this.prunePartitions();
     LOG.info("Ok to proceed.");
   }
 
@@ -129,25 +147,38 @@ public class DynamicPartitionPruner {
     sourceInfoCount = 0;
   }
 
-  public void initialize(MapWork work, JobConf jobConf) throws SerDeException {
+  private void initialize() throws SerDeException {
     this.clear();
     Map<String, SourceInfo> columnMap = new HashMap<String, SourceInfo>();
+    // sources represent vertex names
     Set<String> sources = work.getEventSourceTableDescMap().keySet();
 
     sourcesWaitingForEvents.addAll(sources);
 
     for (String s : sources) {
+      // Set to 0 to start with. This will be decremented for each column for which events
+      // are generated by this source - which is eventually used to determine the number of
+      // expected events for the source: #columns X #tasks
+      numExpectedEventsPerSource.put(s, new MutableInt(0));
+      numEventsSeenPerSource.put(s, new MutableInt(0));
+      // Virtual relation generated by the reduce sink
       List<TableDesc> tables = work.getEventSourceTableDescMap().get(s);
+      // Real column name - on which the operation is being performed
       List<String> columnNames = work.getEventSourceColumnNameMap().get(s);
+      // Expression for the operation. e.g. N^2 > 10
       List<ExprNodeDesc> partKeyExprs = work.getEventSourcePartKeyExprMap().get(s);
+      // eventSourceTableDesc, eventSourceColumnName, eventSourcePartKeyExpr move in lock-step.
+      // One entry is added to each at the same time.
 
       Iterator<String> cit = columnNames.iterator();
       Iterator<ExprNodeDesc> pit = partKeyExprs.iterator();
+      // A single source can process multiple columns, and will send an event for each of them.
       for (TableDesc t : tables) {
+        numExpectedEventsPerSource.get(s).decrement();
         ++sourceInfoCount;
         String columnName = cit.next();
         ExprNodeDesc partKeyExpr = pit.next();
-        SourceInfo si = new SourceInfo(t, partKeyExpr, columnName, jobConf);
+        SourceInfo si = createSourceInfo(t, partKeyExpr, columnName, jobConf);
         if (!sourceInfoMap.containsKey(s)) {
           sourceInfoMap.put(s, new ArrayList<SourceInfo>());
         }
@@ -157,6 +188,8 @@ public class DynamicPartitionPruner {
         // We could have multiple sources restrict the same column, need to take
         // the union of the values in that case.
         if (columnMap.containsKey(columnName)) {
+          // All sources are initialized up front. Events from different sources will end up getting added to the same list.
+          // Pruning is disabled if any source sends an event which causes pruning to be skipped.
           si.values = columnMap.get(columnName).values;
           si.skipPruning = columnMap.get(columnName).skipPruning;
         }
@@ -165,25 +198,27 @@ public class DynamicPartitionPruner {
     }
   }
 
-  private void prunePartitions(MapWork work, InputInitializerContext context) throws HiveException {
+  private void prunePartitions() throws HiveException {
     int expectedEvents = 0;
-    for (String source : this.sourceInfoMap.keySet()) {
-      for (SourceInfo si : this.sourceInfoMap.get(source)) {
+    for (Map.Entry<String, List<SourceInfo>> entry : this.sourceInfoMap.entrySet()) {
+      String source = entry.getKey();
+      for (SourceInfo si : entry.getValue()) {
         int taskNum = context.getVertexNumTasks(source);
-        LOG.info("Expecting " + taskNum + " events for vertex " + source);
+        LOG.info("Expecting " + taskNum + " events for vertex " + source + ", for column " + si.columnName);
         expectedEvents += taskNum;
-        prunePartitionSingleSource(source, si, work);
+        prunePartitionSingleSource(source, si);
       }
     }
 
     // sanity check. all tasks must submit events for us to succeed.
     if (expectedEvents != totalEventCount) {
       LOG.error("Expecting: " + expectedEvents + ", received: " + totalEventCount);
-      throw new HiveException("Incorrect event count in dynamic parition pruning");
+      throw new HiveException("Incorrect event count in dynamic partition pruning");
     }
   }
 
-  private void prunePartitionSingleSource(String source, SourceInfo si, MapWork work)
+  @VisibleForTesting
+  protected void prunePartitionSingleSource(String source, SourceInfo si)
       throws HiveException {
 
     if (si.skipPruning.get()) {
@@ -223,11 +258,11 @@ public class DynamicPartitionPruner {
     ExprNodeEvaluator eval = ExprNodeEvaluatorFactory.get(si.partKey);
     eval.initialize(soi);
 
-    applyFilterToPartitions(work, converter, eval, columnName, values);
+    applyFilterToPartitions(converter, eval, columnName, values);
   }
 
   @SuppressWarnings("rawtypes")
-  private void applyFilterToPartitions(MapWork work, Converter converter, ExprNodeEvaluator eval,
+  private void applyFilterToPartitions(Converter converter, ExprNodeEvaluator eval,
       String columnName, Set<Object> values) throws HiveException {
 
     Object[] row = new Object[1];
@@ -238,12 +273,12 @@ public class DynamicPartitionPruner {
       PartitionDesc desc = work.getPathToPartitionInfo().get(p);
       Map<String, String> spec = desc.getPartSpec();
       if (spec == null) {
-        throw new AssertionException("No partition spec found in dynamic pruning");
+        throw new IllegalStateException("No partition spec found in dynamic pruning");
       }
 
       String partValueString = spec.get(columnName);
       if (partValueString == null) {
-        throw new AssertionException("Could not find partition value for column: " + columnName);
+        throw new IllegalStateException("Could not find partition value for column: " + columnName);
       }
 
       Object partValue = converter.convert(partValueString);
@@ -267,17 +302,38 @@ public class DynamicPartitionPruner {
     }
   }
 
+  @VisibleForTesting
+  protected SourceInfo createSourceInfo(TableDesc t, ExprNodeDesc partKeyExpr, String columnName,
+                                        JobConf jobConf) throws
+      SerDeException {
+    return new SourceInfo(t, partKeyExpr, columnName, jobConf);
+
+  }
+
   @SuppressWarnings("deprecation")
-  private static class SourceInfo {
+  @VisibleForTesting
+  static class SourceInfo {
     public final ExprNodeDesc partKey;
     public final Deserializer deserializer;
     public final StructObjectInspector soi;
     public final StructField field;
     public final ObjectInspector fieldInspector;
+    /* List of partitions that are required - populated from processing each event */
     public Set<Object> values = new HashSet<Object>();
+    /* Whether to skip pruning - set from an event payload that signals skip, e.g. when the event payload is too large */
     public AtomicBoolean skipPruning = new AtomicBoolean();
     public final String columnName;
 
+    @VisibleForTesting // Only used for testing.
+    SourceInfo(TableDesc table, ExprNodeDesc partKey, String columnName, JobConf jobConf, Object forTesting) {
+      this.partKey = partKey;
+      this.columnName = columnName;
+      this.deserializer = null;
+      this.soi = null;
+      this.field = null;
+      this.fieldInspector = null;
+    }
+
     public SourceInfo(TableDesc table, ExprNodeDesc partKey, String columnName, JobConf jobConf)
         throws SerDeException {
 
@@ -328,52 +384,60 @@ public class DynamicPartitionPruner {
   }
 
   @SuppressWarnings("deprecation")
-  private String processPayload(ByteBuffer payload, String sourceName) throws SerDeException,
+  @VisibleForTesting
+  protected String processPayload(ByteBuffer payload, String sourceName) throws SerDeException,
       IOException {
 
     DataInputStream in = new DataInputStream(new ByteBufferBackedInputStream(payload));
-    String columnName = in.readUTF();
-    boolean skip = in.readBoolean();
+    try {
+      String columnName = in.readUTF();
 
-    LOG.info("Source of event: " + sourceName);
+      LOG.info("Source of event: " + sourceName);
 
-    List<SourceInfo> infos = this.sourceInfoMap.get(sourceName);
-    if (infos == null) {
-      in.close();
-      throw new AssertionException("no source info for event source: " + sourceName);
-    }
-
-    SourceInfo info = null;
-    for (SourceInfo si : infos) {
-      if (columnName.equals(si.columnName)) {
-        info = si;
-        break;
+      List<SourceInfo> infos = this.sourceInfoMap.get(sourceName);
+      if (infos == null) {
+        throw new IllegalStateException("no source info for event source: " + sourceName);
       }
-    }
-
-    if (info == null) {
-      in.close();
-      throw new AssertionException("no source info for column: " + columnName);
-    }
 
-    if (skip) {
-      info.skipPruning.set(true);
-    }
-
-    while (payload.hasRemaining()) {
-      writable.readFields(in);
-
-      Object row = info.deserializer.deserialize(writable);
+      SourceInfo info = null;
+      for (SourceInfo si : infos) {
+        if (columnName.equals(si.columnName)) {
+          info = si;
+          break;
+        }
+      }
 
-      Object value = info.soi.getStructFieldData(row, info.field);
-      value = ObjectInspectorUtils.copyToStandardObject(value, info.fieldInspector);
+      if (info == null) {
+        throw new IllegalStateException("no source info for column: " + columnName);
+      }
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Adding: " + value + " to list of required partitions");
+      if (info.skipPruning.get()) {
+        // Marked as skipped previously. Don't bother processing the rest of the payload.
+      } else {
+        boolean skip = in.readBoolean();
+        if (skip) {
+          info.skipPruning.set(true);
+        } else {
+          while (payload.hasRemaining()) {
+            writable.readFields(in);
+
+            Object row = info.deserializer.deserialize(writable);
+
+            Object value = info.soi.getStructFieldData(row, info.field);
+            value = ObjectInspectorUtils.copyToStandardObject(value, info.fieldInspector);
+
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Adding: " + value + " to list of required partitions");
+            }
+            info.values.add(value);
+          }
+        }
+      }
+    } finally {
+      if (in != null) {
+        in.close();
       }
-      info.values.add(value);
     }
-    in.close();
     return sourceName;
   }
 
@@ -409,23 +473,47 @@ public class DynamicPartitionPruner {
     synchronized(sourcesWaitingForEvents) {
       if (sourcesWaitingForEvents.contains(event.getSourceVertexName())) {
         ++totalEventCount;
+        numEventsSeenPerSource.get(event.getSourceVertexName()).increment();
         queue.offer(event);
+        checkForSourceCompletion(event.getSourceVertexName());
       }
     }
   }
 
   public void processVertex(String name) {
     LOG.info("Vertex succeeded: " + name);
-
     synchronized(sourcesWaitingForEvents) {
-      sourcesWaitingForEvents.remove(name);
+      // Get a deterministic count of the number of tasks for the vertex.
+      MutableInt prevVal = numExpectedEventsPerSource.get(name);
+      int prevValInt = prevVal.intValue();
+      Preconditions.checkState(prevValInt < 0,
+          "Invalid value for numExpectedEvents for source: " + name + ", oldVal=" + prevValInt);
+      prevVal.setValue((-1) * prevValInt * context.getVertexNumTasks(name));
+      checkForSourceCompletion(name);
+    }
+  }
 
-      if (sourcesWaitingForEvents.isEmpty()) {
-        // we've got what we need; mark the queue
-        queue.offer(endOfEvents);
-      } else {
-        LOG.info("Waiting for " + sourcesWaitingForEvents.size() + " events.");
+  private void checkForSourceCompletion(String name) {
+    int expectedEvents = numExpectedEventsPerSource.get(name).getValue();
+    if (expectedEvents < 0) {
+      // Expected events not updated yet - vertex SUCCESS notification not received.
+      return;
+    } else {
+      int processedEvents = numEventsSeenPerSource.get(name).getValue();
+      if (processedEvents == expectedEvents) {
+        sourcesWaitingForEvents.remove(name);
+        if (sourcesWaitingForEvents.isEmpty()) {
+          // we've got what we need; mark the queue
+          queue.offer(endOfEvents);
+        } else {
+          LOG.info("Waiting for " + sourcesWaitingForEvents.size() + " sources.");
+        }
+      } else if (processedEvents > expectedEvents) {
+        throw new IllegalStateException(
+            "Received too many events for " + name + ", Expected=" + expectedEvents +
+                ", Received=" + processedEvents);
       }
+      return;
     }
   }
 }
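
A compact way to read the new bookkeeping: during initialize() each (source, column) pair
decrements numExpectedEventsPerSource, so the map holds a negative column count; when the
source vertex reports SUCCESS, processVertex() flips that value to the positive expected
total (#columns X #tasks), and checkForSourceCompletion() compares it against the events
seen so far. A minimal standalone sketch of the same arithmetic (hypothetical class and
method names, not the Hive code itself):

  import java.util.HashMap;
  import java.util.Map;

  // Illustration only: negative values count columns until the vertex finishes,
  // after which the value becomes the positive number of expected events.
  public class EventCountSketch {
    private final Map<String, Integer> expected = new HashMap<>(); // negative until vertex SUCCESS
    private final Map<String, Integer> seen = new HashMap<>();

    void registerColumn(String source) {                // once per (source, column) pair
      expected.merge(source, -1, Integer::sum);         // e.g. 3 columns -> -3
      seen.putIfAbsent(source, 0);
    }

    void eventReceived(String source) {
      seen.merge(source, 1, Integer::sum);
    }

    void vertexSucceeded(String source, int numTasks) { // flip to #columns X #tasks
      expected.compute(source, (s, v) -> -v * numTasks);
    }

    boolean sourceComplete(String source) {             // false while the vertex is still running
      int exp = expected.get(source);
      return exp >= 0 && seen.get(source) == exp;
    }
  }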

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java Sat Mar 28 14:03:43 2015
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -26,14 +27,13 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
-import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
+import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.Writable;
+import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
@@ -54,30 +55,37 @@ public class HashTableLoader implements
 
   private static final Log LOG = LogFactory.getLog(HashTableLoader.class.getName());
 
-  private ExecMapperContext context;
   private Configuration hconf;
   private MapJoinDesc desc;
+  private TezContext tezContext;
 
   @Override
-  public void init(ExecMapperContext context, Configuration hconf, MapJoinOperator joinOp) {
-    this.context = context;
+  public void init(ExecMapperContext context, MapredContext mrContext, Configuration hconf,
+      MapJoinOperator joinOp) {
+    this.tezContext = (TezContext) mrContext;
     this.hconf = hconf;
     this.desc = joinOp.getConf();
   }
 
   @Override
-  public void load(
-      MapJoinTableContainer[] mapJoinTables,
-      MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) throws HiveException {
+  public void load(MapJoinTableContainer[] mapJoinTables,
+      MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage)
+      throws HiveException {
 
-    TezContext tezContext = (TezContext) MapredContext.get();
     Map<Integer, String> parentToInput = desc.getParentToInput();
     Map<Integer, Long> parentKeyCounts = desc.getParentKeyCounts();
 
     boolean useOptimizedTables = HiveConf.getBoolVar(
         hconf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE);
+    boolean useHybridGraceHashJoin = HiveConf.getBoolVar(
+        hconf, HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN);
     boolean isFirstKey = true;
-    TezCacheAccess tezCacheAccess = TezCacheAccess.createInstance(hconf);
+
+    // Disable hybrid grace hash join for n-way join
+    if (mapJoinTables.length > 2) {
+      useHybridGraceHashJoin = false;
+    }
+
     for (int pos = 0; pos < mapJoinTables.length; pos++) {
       if (pos == desc.getPosBigTable()) {
         continue;
@@ -87,6 +95,14 @@ public class HashTableLoader implements
       LogicalInput input = tezContext.getInput(inputName);
 
       try {
+        input.start();
+        tezContext.getTezProcessorContext().waitForAnyInputReady(
+            Collections.<Input> singletonList(input));
+      } catch (Exception e) {
+        throw new HiveException(e);
+      }
+
+      try {
         KeyValueReader kvReader = (KeyValueReader) input.getReader();
         MapJoinObjectSerDeContext keyCtx = mapJoinTableSerdes[pos].getKeyContext(),
           valCtx = mapJoinTableSerdes[pos].getValueContext();
@@ -104,15 +120,17 @@ public class HashTableLoader implements
         isFirstKey = false;
         Long keyCountObj = parentKeyCounts.get(pos);
         long keyCount = (keyCountObj == null) ? -1 : keyCountObj.longValue();
+
         MapJoinTableContainer tableContainer = useOptimizedTables
-            ? new MapJoinBytesTableContainer(hconf, valCtx, keyCount, memUsage)
+            ? (useHybridGraceHashJoin ? new HybridHashTableContainer(hconf, keyCount, memUsage,
+                                                                     desc.getParentDataSizes().get(pos))
+                                      : new MapJoinBytesTableContainer(hconf, valCtx, keyCount, memUsage))
             : new HashMapWrapper(hconf, keyCount);
 
         while (kvReader.next()) {
           tableContainer.putRow(keyCtx, (Writable)kvReader.getCurrentKey(),
               valCtx, (Writable)kvReader.getCurrentValue());
         }
-
         tableContainer.seal();
         mapJoinTables[pos] = tableContainer;
       } catch (IOException e) {
@@ -122,14 +140,6 @@ public class HashTableLoader implements
       } catch (Exception e) {
         throw new HiveException(e);
       }
-      // Register that the Input has been cached.
-      LOG.info("Is this a bucket map join: " + desc.isBucketMapJoin());
-      // cache is disabled for bucket map join because of the same reason
-      // given in loadHashTable in MapJoinOperator.
-      if (!desc.isBucketMapJoin()) {
-        tezCacheAccess.registerCachedInput(inputName);
-        LOG.info("Setting Input: " + inputName + " as cached");
-      }
     }
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java Sat Mar 28 14:03:43 2015
@@ -19,22 +19,17 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -57,7 +52,6 @@ import org.apache.tez.runtime.api.events
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
 
-import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 
@@ -71,43 +65,44 @@ public class HiveSplitGenerator extends
 
   private static final Log LOG = LogFactory.getLog(HiveSplitGenerator.class);
 
-  private static final SplitGrouper grouper = new SplitGrouper();
-  private final DynamicPartitionPruner pruner = new DynamicPartitionPruner();
-  private InputInitializerContext context;
-  private static Map<Map<String, PartitionDesc>, Map<String, PartitionDesc>> cache =
-      new HashMap<Map<String, PartitionDesc>, Map<String, PartitionDesc>>();
+  private final DynamicPartitionPruner pruner;
+  private final Configuration conf;
+  private final JobConf jobConf;
+  private final MRInputUserPayloadProto userPayloadProto;
+  private final SplitGrouper splitGrouper = new SplitGrouper();
 
-  public HiveSplitGenerator(InputInitializerContext initializerContext) {
+
+  public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOException,
+      SerDeException {
     super(initializerContext);
-  }
+    Preconditions.checkNotNull(initializerContext);
+    userPayloadProto =
+        MRInputHelpers.parseMRInputPayload(initializerContext.getInputUserPayload());
 
-  public HiveSplitGenerator() {
-    this(null);
-  }
+    this.conf =
+        TezUtils.createConfFromByteString(userPayloadProto.getConfigurationBytes());
 
-  @Override
-  public List<Event> initialize() throws Exception {
-    InputInitializerContext rootInputContext = getContext();
+    this.jobConf = new JobConf(conf);
+    // Read all credentials into the credentials instance stored in JobConf.
+    ShimLoader.getHadoopShims().getMergedCredentials(jobConf);
 
-    context = rootInputContext;
+    MapWork work = Utilities.getMapWork(jobConf);
 
-    MRInputUserPayloadProto userPayloadProto =
-        MRInputHelpers.parseMRInputPayload(rootInputContext.getInputUserPayload());
+    // Events can start coming in the moment the InputInitializer is created. The pruner
+    // must be set up and initialized here so that it sets up its structures to start accepting events.
+    // Setting it up in initialize leads to a window where events may come in before the pruner is
+    // initialized, which may cause it to drop events.
+    pruner = new DynamicPartitionPruner(initializerContext, work, jobConf);
 
-    Configuration conf =
-        TezUtils.createConfFromByteString(userPayloadProto.getConfigurationBytes());
+  }
 
+  @Override
+  public List<Event> initialize() throws Exception {
     boolean sendSerializedEvents =
         conf.getBoolean("mapreduce.tez.input.initializer.serialize.event.payload", true);
 
-    // Read all credentials into the credentials instance stored in JobConf.
-    JobConf jobConf = new JobConf(conf);
-    ShimLoader.getHadoopShims().getMergedCredentials(jobConf);
-
-    MapWork work = Utilities.getMapWork(jobConf);
-
     // perform dynamic partition pruning
-    pruner.prune(work, jobConf, context);
+    pruner.prune();
 
     InputSplitInfoMem inputSplitInfo = null;
     String realInputFormatName = conf.get("mapred.input.format.class");
@@ -118,8 +113,8 @@ public class HiveSplitGenerator extends
           (InputFormat<?, ?>) ReflectionUtils.newInstance(JavaUtils.loadClass(realInputFormatName),
               jobConf);
 
-      int totalResource = rootInputContext.getTotalAvailableResource().getMemory();
-      int taskResource = rootInputContext.getVertexTaskResource().getMemory();
+      int totalResource = getContext().getTotalAvailableResource().getMemory();
+      int taskResource = getContext().getVertexTaskResource().getMemory();
       int availableSlots = totalResource / taskResource;
 
       // Create the un-grouped splits
@@ -132,12 +127,12 @@ public class HiveSplitGenerator extends
           + " available slots, " + waves + " waves. Input format is: " + realInputFormatName);
 
       Multimap<Integer, InputSplit> groupedSplits =
-          generateGroupedSplits(jobConf, conf, splits, waves, availableSlots);
+          splitGrouper.generateGroupedSplits(jobConf, conf, splits, waves, availableSlots);
       // And finally return them in a flat array
       InputSplit[] flatSplits = groupedSplits.values().toArray(new InputSplit[0]);
       LOG.info("Number of grouped splits: " + flatSplits.length);
 
-      List<TaskLocationHint> locationHints = grouper.createTaskLocationHints(flatSplits);
+      List<TaskLocationHint> locationHints = splitGrouper.createTaskLocationHints(flatSplits);
 
       Utilities.clearWork(jobConf);
 
@@ -158,87 +153,7 @@ public class HiveSplitGenerator extends
   }
 
 
-  public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
-      Configuration conf, InputSplit[] splits, float waves, int availableSlots)
-      throws Exception {
-    return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null, true);
-  }
-
-  public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
-      Configuration conf, InputSplit[] splits, float waves, int availableSlots, String inputName,
-      boolean groupAcrossFiles) throws Exception {
-
-    MapWork work = populateMapWork(jobConf, inputName);
-    Multimap<Integer, InputSplit> bucketSplitMultiMap =
-        ArrayListMultimap.<Integer, InputSplit> create();
-
-    int i = 0;
-    InputSplit prevSplit = null;
-    for (InputSplit s : splits) {
-      // this is the bit where we make sure we don't group across partition
-      // schema boundaries
-      if (schemaEvolved(s, prevSplit, groupAcrossFiles, work)) {
-        ++i;
-        prevSplit = s;
-      }
-      bucketSplitMultiMap.put(i, s);
-    }
-    LOG.info("# Src groups for split generation: " + (i + 1));
-
-    // group them into the chunks we want
-    Multimap<Integer, InputSplit> groupedSplits =
-        grouper.group(jobConf, bucketSplitMultiMap, availableSlots, waves);
-
-    return groupedSplits;
-  }
-
-  private MapWork populateMapWork(JobConf jobConf, String inputName) {
-    MapWork work = null;
-    if (inputName != null) {
-      work = (MapWork) Utilities.getMergeWork(jobConf, inputName);
-      // work can still be null if there is no merge work for this input
-    }
-    if (work == null) {
-      work = Utilities.getMapWork(jobConf);
-    }
 
-    return work;
-  }
-
-  public boolean schemaEvolved(InputSplit s, InputSplit prevSplit, boolean groupAcrossFiles,
-      MapWork work) throws IOException {
-    boolean retval = false;
-    Path path = ((FileSplit) s).getPath();
-    PartitionDesc pd =
-        HiveFileFormatUtils.getPartitionDescFromPathRecursively(work.getPathToPartitionInfo(),
-            path, cache);
-    String currentDeserializerClass = pd.getDeserializerClassName();
-    Class<?> currentInputFormatClass = pd.getInputFileFormatClass();
-
-    Class<?> previousInputFormatClass = null;
-    String previousDeserializerClass = null;
-    if (prevSplit != null) {
-      Path prevPath = ((FileSplit) prevSplit).getPath();
-      if (!groupAcrossFiles) {
-        return !path.equals(prevPath);
-      }
-      PartitionDesc prevPD =
-          HiveFileFormatUtils.getPartitionDescFromPathRecursively(work.getPathToPartitionInfo(),
-              prevPath, cache);
-      previousDeserializerClass = prevPD.getDeserializerClassName();
-      previousInputFormatClass = prevPD.getInputFileFormatClass();
-    }
-
-    if ((currentInputFormatClass != previousInputFormatClass)
-        || (!currentDeserializerClass.equals(previousDeserializerClass))) {
-      retval = true;
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Adding split " + path + " to src new group? " + retval);
-    }
-    return retval;
-  }
 
   private List<Event> createEventList(boolean sendSerializedEvents, InputSplitInfoMem inputSplitInfo) {
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Sat Mar 28 14:03:43 2015
@@ -22,9 +22,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
 
@@ -55,6 +57,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.input.MultiMRInput;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.ProcessorContext;
@@ -98,6 +101,7 @@ public class MapRecordProcessor extends
 
     // create map and fetch operators
     mapWork = (MapWork) cache.retrieve(key, new Callable<Object>() {
+        @Override
         public Object call() {
           return Utilities.getMapWork(jconf);
         }
@@ -119,6 +123,7 @@ public class MapRecordProcessor extends
 	mergeWorkList.add(
           (MapWork) cache.retrieve(key,
               new Callable<Object>() {
+                @Override
                 public Object call() {
                   return Utilities.getMergeWork(jconf, prefix);
                 }
@@ -133,6 +138,10 @@ public class MapRecordProcessor extends
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
     super.init(jconf, processorContext, mrReporter, inputs, outputs);
 
+    MapredContext.init(true, new JobConf(jconf));
+    ((TezContext) MapredContext.get()).setInputs(inputs);
+    ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext);
+
     // Update JobConf using MRInput, info like filename comes via this
     legacyMRInput = getMRInput(inputs);
     if (legacyMRInput != null) {
@@ -160,6 +169,8 @@ public class MapRecordProcessor extends
         mapOp = new MapOperator();
       }
 
+      mapOp.setExecContext(execContext);
+
       connectOps.clear();
       if (mergeWorkList != null) {
         MapOperator mergeMapOp = null;
@@ -176,12 +187,13 @@ public class MapRecordProcessor extends
             mergeMapOp.setConf(mergeMapWork);
             l4j.info("Input name is " + mergeMapWork.getName());
             jconf.set(Utilities.INPUT_NAME, mergeMapWork.getName());
+            mergeMapOp.initialize(jconf, null);
             mergeMapOp.setChildren(jconf);
 
             DummyStoreOperator dummyOp = getJoinParentOp(mergeMapOp);
             connectOps.put(mergeMapWork.getTag(), dummyOp);
 
-            mergeMapOp.setExecContext(new ExecMapperContext(jconf));
+            mergeMapOp.passExecContext(new ExecMapperContext(jconf));
             mergeMapOp.initializeLocalWork(jconf);
           }
         }
@@ -191,21 +203,19 @@ public class MapRecordProcessor extends
       mapOp.setConf(mapWork);
       l4j.info("Main input name is " + mapWork.getName());
       jconf.set(Utilities.INPUT_NAME, mapWork.getName());
+      mapOp.initialize(jconf, null);
       mapOp.setChildren(jconf);
+      mapOp.passExecContext(execContext);
       l4j.info(mapOp.dump(0));
 
-      MapredContext.init(true, new JobConf(jconf));
-      ((TezContext) MapredContext.get()).setInputs(inputs);
-      ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext);
-      mapOp.setExecContext(execContext);
       mapOp.initializeLocalWork(jconf);
 
       initializeMapRecordSources();
-      mapOp.initialize(jconf, null);
+      mapOp.initializeMapOperator(jconf);
       if ((mergeMapOpList != null) && mergeMapOpList.isEmpty() == false) {
         for (MapOperator mergeMapOp : mergeMapOpList) {
           jconf.set(Utilities.INPUT_NAME, mergeMapOp.getConf().getName());
-          mergeMapOp.initialize(jconf, null);
+          mergeMapOp.initializeMapOperator(jconf);
         }
       }
 
@@ -353,6 +363,17 @@ public class MapRecordProcessor extends
   private MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) throws Exception {
     // there should be only one MRInput
     MRInputLegacy theMRInput = null;
+
+    // start all mr/multi-mr inputs
+    Set<Input> li = new HashSet<Input>();
+    for (LogicalInput inp: inputs.values()) {
+      if (inp instanceof MRInputLegacy || inp instanceof MultiMRInput) {
+        inp.start();
+        li.add(inp);
+      }
+    }
+    processorContext.waitForAllInputsReady(li);
+
     l4j.info("The input names are: " + Arrays.toString(inputs.keySet().toArray()));
     for (Entry<String, LogicalInput> inp : inputs.entrySet()) {
       if (inp.getValue() instanceof MRInputLegacy) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java Sat Mar 28 14:03:43 2015
@@ -98,10 +98,11 @@ public class MergeFileRecordProcessor ex
       cacheKey = queryId + MAP_PLAN_KEY;
 
       MapWork mapWork = (MapWork) cache.retrieve(cacheKey, new Callable<Object>() {
-	  public Object call() {
-	    return Utilities.getMapWork(jconf);
-	  }
-	});
+        @Override
+        public Object call() {
+          return Utilities.getMapWork(jconf);
+        }
+      });
       Utilities.setMapWork(jconf, mapWork);
 
       if (mapWork instanceof MergeFileWork) {
@@ -116,7 +117,7 @@ public class MergeFileRecordProcessor ex
 
       MapredContext.init(true, new JobConf(jconf));
       ((TezContext) MapredContext.get()).setInputs(inputs);
-      mergeOp.setExecContext(execContext);
+      mergeOp.passExecContext(execContext);
       mergeOp.initializeLocalWork(jconf);
       mergeOp.initialize(jconf, null);
 
@@ -198,7 +199,7 @@ public class MergeFileRecordProcessor ex
       } else {
         row[0] = key;
         row[1] = value;
-        mergeOp.processOp(row, 0);
+        mergeOp.process(row, 0);
       }
     } catch (Throwable e) {
       abort = true;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java Sat Mar 28 14:03:43 2015
@@ -19,11 +19,14 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tez.runtime.api.ObjectRegistry;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.tez.runtime.api.ObjectRegistry;
 
 import com.google.common.base.Preconditions;
 
@@ -41,6 +44,8 @@ public class ObjectCache implements org.
   // before anything else.
   private volatile static ObjectRegistry staticRegistry;
 
+  private static ExecutorService staticPool;
+
   private final ObjectRegistry registry;
 
   public ObjectCache() {
@@ -51,6 +56,7 @@ public class ObjectCache implements org.
 
   public static void setupObjectRegistry(ObjectRegistry objectRegistry) {
     staticRegistry = objectRegistry;
+    staticPool = Executors.newCachedThreadPool();
   }
 
   @Override
@@ -59,21 +65,32 @@ public class ObjectCache implements org.
     LOG.info("Releasing key: " + key);
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  public Object retrieve(String key, Callable<?> fn) throws HiveException {
-    Object o;
+  public <T> T retrieve(String key, Callable<T> fn) throws HiveException {
+    T value;
     try {
-      o = registry.get(key);
-      if (o == null) {
-	o = fn.call();
-	LOG.info("Caching key: " + key);
-	registry.cacheForVertex(key, o);
+      value = (T) registry.get(key);
+      if (value == null) {
+        value = fn.call();
+        LOG.info("Caching key: " + key);
+        registry.cacheForVertex(key, value);
       } else {
-	LOG.info("Found " + key + " in cache with value: " + o);
+        LOG.info("Found " + key + " in cache with value: " + value);
       }
     } catch (Exception e) {
       throw new HiveException(e);
     }
-    return o;
+    return value;
+  }
+
+  @Override
+  public <T> Future<T> retrieveAsync(final String key, final Callable<T> fn) throws HiveException {
+    return staticPool.submit(new Callable<T>() {
+      @Override
+      public T call() throws Exception {
+        return retrieve(key, fn);
+      }
+    });
   }
 }
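
The retrieveAsync() addition above submits the same retrieve() call to a shared cached
thread pool and hands back a Future, so a processor can kick off retrieval of a cached
value early and collect it later. A hypothetical caller (sketch only - the key and the
Callable body are illustrative, and it assumes ObjectCache.setupObjectRegistry(...) has
already been called by the Tez processor):

  import java.util.concurrent.Callable;
  import java.util.concurrent.Future;
  import org.apache.hadoop.hive.ql.exec.tez.ObjectCache;

  public class ObjectCacheAsyncExample {
    public static void main(String[] args) throws Exception {
      ObjectCache cache = new ObjectCache();
      // Start building (or fetching) the cached value without blocking this thread.
      Future<String> pending = cache.retrieveAsync("example-key", new Callable<String>() {
        @Override
        public String call() {
          return "expensive-to-build value";   // built once, then cached for the vertex
        }
      });
      // ... other initialization can proceed here ...
      String value = pending.get();            // blocks only if the value is still being built
    }
  }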

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Sat Mar 28 14:03:43 2015
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -85,6 +86,7 @@ public class ReduceRecordProcessor  exte
     String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
     cacheKey = queryId + REDUCE_PLAN_KEY;
     redWork = (ReduceWork) cache.retrieve(cacheKey, new Callable<Object>() {
+        @Override
         public Object call() {
           return Utilities.getReduceWork(jconf);
         }
@@ -103,9 +105,14 @@ public class ReduceRecordProcessor  exte
     for (int tag = 0; tag < redWork.getTagToValueDesc().size(); tag++) {
       TableDesc keyTableDesc = redWork.getKeyDesc();
       TableDesc valueTableDesc = redWork.getTagToValueDesc().get(tag);
-      KeyValuesReader reader =
-          (KeyValuesReader) inputs.get(redWork.getTagToInput().get(tag)).getReader();
 
+      // make the reader ready for prime time
+      Input input = inputs.get(redWork.getTagToInput().get(tag));
+      input.start();
+      processorContext.waitForAnyInputReady(Collections.singleton(input));
+      KeyValuesReader reader = (KeyValuesReader) input.getReader();
+
+      // now we can set up the record source
       sources[tag] = new ReduceRecordSource();
       sources[tag].init(jconf, reducer, redWork.getVectorMode(), keyTableDesc, valueTableDesc,
           reader, tag == position, (byte) tag,

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java Sat Mar 28 14:03:43 2015
@@ -325,7 +325,7 @@ public class ReduceRecordSource implemen
       row.add(deserializeValue(valueWritable, tag));
 
       try {
-        reducer.processOp(row, tag);
+        reducer.process(row, tag);
       } catch (Exception e) {
         String rowString = null;
         try {
@@ -364,7 +364,7 @@ public class ReduceRecordSource implemen
         rowIdx++;
         if (rowIdx >= BATCH_SIZE) {
           VectorizedBatchUtil.setBatchSize(batch, rowIdx);
-          reducer.processOp(batch, tag);
+          reducer.process(batch, tag);
 
           // Reset just the value columns and value buffer.
           for (int i = keysColumnOffset; i < batch.numCols; i++) {
@@ -377,7 +377,7 @@ public class ReduceRecordSource implemen
       if (rowIdx > 0) {
         // Flush final partial batch.
         VectorizedBatchUtil.setBatchSize(batch, rowIdx);
-        reducer.processOp(batch, tag);
+        reducer.process(batch, tag);
       }
       batch.reset();
       keyBuffer.reset();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java Sat Mar 28 14:03:43 2015
@@ -26,13 +26,20 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.split.TezGroupedSplit;
 import org.apache.hadoop.mapred.split.TezMapredSplitsGrouper;
 import org.apache.tez.dag.api.TaskLocationHint;
@@ -49,8 +56,15 @@ public class SplitGrouper {
 
   private static final Log LOG = LogFactory.getLog(SplitGrouper.class);
 
+  // TODO This needs to be looked at. Map of Map to Map... Made concurrent for now since split generation
+  // can happen in parallel.
+  private static final Map<Map<String, PartitionDesc>, Map<String, PartitionDesc>> cache =
+      new ConcurrentHashMap<>();
+
   private final TezMapredSplitsGrouper tezGrouper = new TezMapredSplitsGrouper();
 
+
+
   /**
    * group splits for each bucket separately - while evenly filling all the
    * available slots with tasks
@@ -87,12 +101,83 @@ public class SplitGrouper {
     return bucketGroupedSplitMultimap;
   }
 
+
+  /**
+   * Create task location hints from a set of input splits
+   * @param splits the actual splits
+   * @return taskLocationHints - 1 per input split specified
+   * @throws IOException
+   */
+  public List<TaskLocationHint> createTaskLocationHints(InputSplit[] splits) throws IOException {
+
+    List<TaskLocationHint> locationHints = Lists.newArrayListWithCapacity(splits.length);
+
+    for (InputSplit split : splits) {
+      String rack = (split instanceof TezGroupedSplit) ? ((TezGroupedSplit) split).getRack() : null;
+      if (rack == null) {
+        if (split.getLocations() != null) {
+          locationHints.add(TaskLocationHint.createTaskLocationHint(new HashSet<String>(Arrays.asList(split
+              .getLocations())), null));
+        } else {
+          locationHints.add(TaskLocationHint.createTaskLocationHint(null, null));
+        }
+      } else {
+        locationHints.add(TaskLocationHint.createTaskLocationHint(null, Collections.singleton(rack)));
+      }
+    }
+
+    return locationHints;
+  }
+
+  /** Generate groups of splits, separated by schema evolution boundaries */
+  public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
+                                                                    Configuration conf,
+                                                                    InputSplit[] splits,
+                                                                    float waves, int availableSlots)
+      throws Exception {
+    return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null, true);
+  }
+
+  /** Generate groups of splits, separated by schema evolution boundaries */
+  public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
+                                                                    Configuration conf,
+                                                                    InputSplit[] splits,
+                                                                    float waves, int availableSlots,
+                                                                    String inputName,
+                                                                    boolean groupAcrossFiles) throws
+      Exception {
+
+    MapWork work = populateMapWork(jobConf, inputName);
+    Multimap<Integer, InputSplit> bucketSplitMultiMap =
+        ArrayListMultimap.<Integer, InputSplit> create();
+
+    int i = 0;
+    InputSplit prevSplit = null;
+    for (InputSplit s : splits) {
+      // this is the bit where we make sure we don't group across partition
+      // schema boundaries
+      if (schemaEvolved(s, prevSplit, groupAcrossFiles, work)) {
+        ++i;
+        prevSplit = s;
+      }
+      bucketSplitMultiMap.put(i, s);
+    }
+    LOG.info("# Src groups for split generation: " + (i + 1));
+
+    // group them into the chunks we want
+    Multimap<Integer, InputSplit> groupedSplits =
+        this.group(jobConf, bucketSplitMultiMap, availableSlots, waves);
+
+    return groupedSplits;
+  }
+
+
   /**
    * get the size estimates for each bucket in tasks. This is used to make sure
    * we allocate the head room evenly
    */
   private Map<Integer, Integer> estimateBucketSizes(int availableSlots, float waves,
-      Map<Integer, Collection<InputSplit>> bucketSplitMap) {
+                                                    Map<Integer, Collection<InputSplit>> bucketSplitMap) {
 
     // mapping of bucket id to size of all splits in bucket in bytes
     Map<Integer, Long> bucketSizeMap = new HashMap<Integer, Long>();
@@ -147,24 +232,54 @@ public class SplitGrouper {
     return bucketTaskMap;
   }
 
-  public List<TaskLocationHint> createTaskLocationHints(InputSplit[] splits) throws IOException {
+  private static MapWork populateMapWork(JobConf jobConf, String inputName) {
+    MapWork work = null;
+    if (inputName != null) {
+      work = (MapWork) Utilities.getMergeWork(jobConf, inputName);
+      // work can still be null if there is no merge work for this input
+    }
+    if (work == null) {
+      work = Utilities.getMapWork(jobConf);
+    }
 
-    List<TaskLocationHint> locationHints = Lists.newArrayListWithCapacity(splits.length);
+    return work;
+  }
 
-    for (InputSplit split : splits) {
-      String rack = (split instanceof TezGroupedSplit) ? ((TezGroupedSplit) split).getRack() : null;
-      if (rack == null) {
-        if (split.getLocations() != null) {
-          locationHints.add(TaskLocationHint.createTaskLocationHint(new HashSet<String>(Arrays.asList(split
-              .getLocations())), null));
-        } else {
-          locationHints.add(TaskLocationHint.createTaskLocationHint(null, null));
-        }
-      } else {
-        locationHints.add(TaskLocationHint.createTaskLocationHint(null, Collections.singleton(rack)));
+  private boolean schemaEvolved(InputSplit s, InputSplit prevSplit, boolean groupAcrossFiles,
+                                       MapWork work) throws IOException {
+    boolean retval = false;
+    Path path = ((FileSplit) s).getPath();
+    PartitionDesc pd =
+        HiveFileFormatUtils.getPartitionDescFromPathRecursively(work.getPathToPartitionInfo(),
+            path, cache);
+    String currentDeserializerClass = pd.getDeserializerClassName();
+    Class<?> currentInputFormatClass = pd.getInputFileFormatClass();
+
+    Class<?> previousInputFormatClass = null;
+    String previousDeserializerClass = null;
+    if (prevSplit != null) {
+      Path prevPath = ((FileSplit) prevSplit).getPath();
+      if (!groupAcrossFiles) {
+        return !path.equals(prevPath);
       }
+      PartitionDesc prevPD =
+          HiveFileFormatUtils.getPartitionDescFromPathRecursively(work.getPathToPartitionInfo(),
+              prevPath, cache);
+      previousDeserializerClass = prevPD.getDeserializerClassName();
+      previousInputFormatClass = prevPD.getInputFileFormatClass();
     }
 
-    return locationHints;
+    if ((currentInputFormatClass != previousInputFormatClass)
+        || (!currentDeserializerClass.equals(previousDeserializerClass))) {
+      retval = true;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding split " + path + " to src new group? " + retval);
+    }
+    return retval;
   }
+
+
+
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java Sat Mar 28 14:03:43 2015
@@ -785,7 +785,7 @@ public class TezJobMonitor {
       final int running = progress.getRunningTaskCount();
       final int failed = progress.getFailedTaskAttemptCount();
       if (total <= 0) {
-        reportBuffer.append(String.format("%s: -/-\t", s, complete, total));
+        reportBuffer.append(String.format("%s: -/-\t", s));
       } else {
         if (complete == total && !completed.contains(s)) {
           completed.add(s);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Sat Mar 28 14:03:43 2015
@@ -19,9 +19,9 @@ package org.apache.hadoop.hive.ql.exec.t
 
 import java.io.IOException;
 import java.text.NumberFormat;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -33,7 +33,6 @@ import org.apache.tez.common.TezUtils;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.ProcessorContext;
@@ -143,20 +142,6 @@ public class TezProcessor extends Abstra
       throws Exception {
     Throwable originalThrowable = null;
     try {
-      // Outputs will be started later by the individual Processors.
-      TezCacheAccess cacheAccess = TezCacheAccess.createInstance(jobConf);
-      // Start the actual Inputs. After MRInput initialization.
-      for (Map.Entry<String, LogicalInput> inputEntry : inputs.entrySet()) {
-        if (!cacheAccess.isInputCached(inputEntry.getKey())) {
-          LOG.info("Starting input " + inputEntry.getKey());
-          inputEntry.getValue().start();
-          processorContext.waitForAnyInputReady(Collections.singletonList((Input) (inputEntry
-              .getValue())));
-        } else {
-          LOG.info("Input: " + inputEntry.getKey()
-              + " is already cached. Skipping start and wait for ready");
-        }
-      }
 
       MRTaskReporter mrReporter = new MRTaskReporter(getContext());
       rproc.init(jobConf, getContext(), mrReporter, inputs, outputs);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampUtils.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampUtils.java Sat Mar 28 14:03:43 2015
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.ql.exec.v
 
 import java.sql.Timestamp;
 
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+
 public final class TimestampUtils {
 
   /**
@@ -58,4 +60,8 @@ public final class TimestampUtils {
   public static long doubleToNanoseconds(double d) {
     return (long) (d * 1000000000);
   }
+
+  public static long daysToNanoseconds(long daysSinceEpoch) {
+    return DateWritable.daysToMillis((int) daysSinceEpoch) * 1000000;
+  }
 }

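The new daysToNanoseconds helper chains two conversions: days since the epoch to milliseconds via DateWritable.daysToMillis, then milliseconds to nanoseconds by multiplying by 1,000,000. A rough standalone equivalent, assuming plain UTC day arithmetic (the real DateWritable may apply adjustments that this sketch omits):

    import java.util.concurrent.TimeUnit;

    public final class DaysToNanosSketch {
      public static long daysToNanoseconds(long daysSinceEpoch) {
        long millis = TimeUnit.DAYS.toMillis(daysSinceEpoch); // 1 day = 86,400,000 ms
        return millis * 1000000L;                             // 1 ms = 1,000,000 ns
      }

      public static void main(String[] args) {
        System.out.println(daysToNanoseconds(1L)); // 86400000000000
      }
    }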
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java Sat Mar 28 14:03:43 2015
@@ -18,7 +18,8 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
-import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
@@ -27,15 +28,9 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.ObjectWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 
 /**
@@ -59,18 +54,19 @@ public class VectorAppMasterEventOperato
   }
 
   @Override
-  public void initializeOp(Configuration hconf) throws HiveException {
-    super.initializeOp(hconf);
+  public Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
     valueWriters = VectorExpressionWriterFactory.getExpressionWriters(
         (StructObjectInspector) inputObjInspectors[0]);
     singleRow = new Object[valueWriters.length];
+    return result;
   }
 
   @Override
-  public void processOp(Object data, int tag) throws HiveException {
-    
+  public void process(Object data, int tag) throws HiveException {
+
     VectorizedRowBatch vrg = (VectorizedRowBatch) data;
-    
+
     Writable [] records = null;
     Writable recordValue = null;
     boolean vectorizedSerde = false;
@@ -85,7 +81,7 @@ public class VectorAppMasterEventOperato
     } catch (SerDeException e1) {
       throw new HiveException(e1);
     }
-    
+
     for (int i = 0; i < vrg.size; i++) {
       Writable row = null;
       if (vectorizedSerde) {

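Several of the vectorized operators touched below follow the same two-part signature change: initializeOp now returns the Collection<Future<?>> handed back by the superclass (asynchronous initialization work started higher up the operator tree), and the per-batch hook is renamed from processOp to process. A minimal sketch of the pattern against a hypothetical base class; the names here are illustrative, not the actual Hive Operator API:

    import java.util.Collection;
    import java.util.Collections;
    import java.util.concurrent.Future;

    abstract class BaseOperatorSketch {
      protected Collection<Future<?>> initializeOp(Object conf) {
        // The base class may start asynchronous setup and return its futures.
        return Collections.emptyList();
      }
      public abstract void process(Object batch, int tag);
    }

    class ExampleVectorOperator extends BaseOperatorSketch {
      private Object[] singleRow;

      @Override
      protected Collection<Future<?>> initializeOp(Object conf) {
        Collection<Future<?>> result = super.initializeOp(conf);
        singleRow = new Object[4];  // local setup
        return result;              // propagate the superclass futures unchanged
      }

      @Override
      public void process(Object batch, int tag) {
        // Renamed from processOp: invoked once per incoming vectorized batch.
      }
    }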
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java Sat Mar 28 14:03:43 2015
@@ -32,6 +32,8 @@ import org.apache.hadoop.hive.serde2.io.
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.HiveIntervalDayTimeWritable;
+import org.apache.hadoop.hive.serde2.io.HiveIntervalYearMonthWritable;
 import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
@@ -47,6 +49,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hive.common.util.DateUtils;
 
 /**
  * This class is used as a static factory for VectorColumnAssign.
@@ -338,6 +341,35 @@ public class VectorColumnAssignFactory {
           }
         }.init(outputBatch, (LongColumnVector) destCol);
         break;
+      case INTERVAL_YEAR_MONTH:
+        outVCA = new VectorLongColumnAssign() {
+          @Override
+          public void assignObjectValue(Object val, int destIndex) throws HiveException {
+            if (val == null) {
+              assignNull(destIndex);
+            }
+            else {
+              HiveIntervalYearMonthWritable bw = (HiveIntervalYearMonthWritable) val;
+              assignLong(bw.getHiveIntervalYearMonth().getTotalMonths(), destIndex);
+            }
+          }
+        }.init(outputBatch, (LongColumnVector) destCol);
+        break;
+      case INTERVAL_DAY_TIME:
+        outVCA = new VectorLongColumnAssign() {
+          @Override
+          public void assignObjectValue(Object val, int destIndex) throws HiveException {
+            if (val == null) {
+              assignNull(destIndex);
+            }
+            else {
+              HiveIntervalDayTimeWritable bw = (HiveIntervalDayTimeWritable) val;
+              assignLong(DateUtils.getIntervalDayTimeTotalNanos(bw.getHiveIntervalDayTime()),
+                  destIndex);
+            }
+          }
+        }.init(outputBatch, (LongColumnVector) destCol);
+        break;
       default:
         throw new HiveException("Incompatible Long vector column and primitive category " +
             category);
@@ -535,6 +567,10 @@ public class VectorColumnAssignFactory {
         vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.BINARY);
       } else if (writables[i] instanceof TimestampWritable) {
         vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.TIMESTAMP);
+      } else if (writables[i] instanceof HiveIntervalYearMonthWritable) {
+        vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.INTERVAL_YEAR_MONTH);
+      } else if (writables[i] instanceof HiveIntervalDayTimeWritable) {
+        vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.INTERVAL_DAY_TIME);
       } else if (writables[i] instanceof BooleanWritable) {
         vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.BOOLEAN);
       } else {

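Both interval categories are mapped onto LongColumnVector: interval_year_month values are stored as total months, interval_day_time values as total nanoseconds (via DateUtils.getIntervalDayTimeTotalNanos). A small sketch of the underlying encoding, with hand-rolled conversions standing in for the Hive writables:

    public final class IntervalEncodingSketch {
      // interval_year_month: encode as total months.
      static long yearMonthToLong(int years, int months) {
        return years * 12L + months;
      }

      // interval_day_time: encode as total nanoseconds.
      static long dayTimeToLong(int days, int hours, int minutes, int seconds, int nanos) {
        long totalSeconds = ((days * 24L + hours) * 60L + minutes) * 60L + seconds;
        return totalSeconds * 1000000000L + nanos;
      }

      public static void main(String[] args) {
        System.out.println(yearMonthToLong(2, 3));        // 27
        System.out.println(dayTimeToLong(1, 2, 0, 0, 0)); // 93600000000000
      }
    }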
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java Sat Mar 28 14:03:43 2015
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hive.common.util.AnnotationUtils;
 
 /**
@@ -66,9 +67,13 @@ public class VectorExpressionDescriptor
     STRING_FAMILY           (STRING.value | CHAR.value | VARCHAR.value),
     DATE                    (0x040),
     TIMESTAMP               (0x080),
+    INTERVAL_YEAR_MONTH     (0x100),
+    INTERVAL_DAY_TIME       (0x200),
     DATETIME_FAMILY         (DATE.value | TIMESTAMP.value),
+    INTERVAL_FAMILY         (INTERVAL_YEAR_MONTH.value | INTERVAL_DAY_TIME.value),
     INT_TIMESTAMP_FAMILY    (INT_FAMILY.value | TIMESTAMP.value),
-    INT_DATETIME_FAMILY     (INT_FAMILY.value | DATETIME_FAMILY.value),
+    INT_INTERVAL_FAMILY     (INT_FAMILY.value | INTERVAL_FAMILY.value),
+    INT_DATETIME_INTERVAL_FAMILY  (INT_FAMILY.value | DATETIME_FAMILY.value | INTERVAL_FAMILY.value),
     STRING_DATETIME_FAMILY  (STRING_FAMILY.value | DATETIME_FAMILY.value),
     ALL_FAMILY              (0xFFF);
 
@@ -105,6 +110,10 @@ public class VectorExpressionDescriptor
         return TIMESTAMP;
       } else if (lower.equals("date")) {
         return DATE;
+      } else if (lower.equals(serdeConstants.INTERVAL_YEAR_MONTH_TYPE_NAME)) {
+        return INTERVAL_YEAR_MONTH;
+      } else if (lower.equals(serdeConstants.INTERVAL_DAY_TIME_TYPE_NAME)) {
+        return INTERVAL_DAY_TIME;
       } else if (lower.equals("void")) {
         // The old code let void through...
         return INT_FAMILY;
@@ -137,7 +146,9 @@ public class VectorExpressionDescriptor
     public static String getVectorColumnSimpleName(ArgumentType argType) {
       if (argType == INT_FAMILY ||
           argType == DATE ||
-          argType == TIMESTAMP) {
+          argType == TIMESTAMP ||
+          argType == INTERVAL_YEAR_MONTH ||
+          argType == INTERVAL_DAY_TIME) {
         return "Long";
       } else if (argType == FLOAT_FAMILY) {
         return "Double";

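The ArgumentType additions extend the existing bit-flag scheme: each concrete type owns one bit (INTERVAL_YEAR_MONTH = 0x100, INTERVAL_DAY_TIME = 0x200) and a family is the OR of its members, so a membership test reduces to a bitwise AND. An illustration of that test; the constants mirror the enum above, but the helper method is invented for this sketch:

    public class TypeFamilyDemo {
      static final int TIMESTAMP           = 0x080;
      static final int INTERVAL_YEAR_MONTH = 0x100;
      static final int INTERVAL_DAY_TIME   = 0x200;
      static final int INTERVAL_FAMILY     = INTERVAL_YEAR_MONTH | INTERVAL_DAY_TIME;

      // A type belongs to a family when its bit is set in the family mask.
      static boolean isInFamily(int typeBit, int familyMask) {
        return (typeBit & familyMask) != 0;
      }

      public static void main(String[] args) {
        System.out.println(isInFamily(INTERVAL_DAY_TIME, INTERVAL_FAMILY)); // true
        System.out.println(isInFamily(TIMESTAMP, INTERVAL_FAMILY));         // false
      }
    }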
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java Sat Mar 28 14:03:43 2015
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
+import java.util.Collection;
+import java.util.concurrent.Future;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
@@ -50,7 +53,7 @@ public class VectorFileSinkOperator exte
   }
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
     // We need a input object inspector that is for the row we will extract out of the
     // vectorized row batch, not for example, an original inspector for an ORC table, etc.
     VectorExpressionWriterFactory.processVectorInspector(
@@ -66,15 +69,15 @@ public class VectorFileSinkOperator exte
     singleRow = new Object[valueWriters.length];
 
     // Call FileSinkOperator with new input inspector.
-    super.initializeOp(hconf);
+    return super.initializeOp(hconf);
   }
 
   @Override
-  public void processOp(Object data, int tag) throws HiveException {
+  public void process(Object data, int tag) throws HiveException {
     VectorizedRowBatch vrg = (VectorizedRowBatch)data;
     for (int i = 0; i < vrg.size; i++) {
       Object[] row = getRowObject(vrg, i);
-      super.processOp(row, tag);
+      super.process(row, tag);
     }
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java Sat Mar 28 14:03:43 2015
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
+import java.util.Collection;
+import java.util.concurrent.Future;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
@@ -27,7 +30,6 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 
 /**
  * Filter operator implementation.
@@ -39,7 +41,7 @@ public class VectorFilterOperator extend
   private VectorExpression conditionEvaluator = null;
 
   // Temporary selected vector
-  private int[] temporarySelected = new int [VectorizedRowBatch.DEFAULT_SIZE];
+  private final int[] temporarySelected = new int [VectorizedRowBatch.DEFAULT_SIZE];
 
   // filterMode is 1 if condition is always true, -1 if always false
   // and 0 if condition needs to be computed.
@@ -59,7 +61,8 @@ public class VectorFilterOperator extend
 
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
     try {
       heartbeatInterval = HiveConf.getIntVar(hconf,
           HiveConf.ConfVars.HIVESENDHEARTBEAT);
@@ -74,7 +77,7 @@ public class VectorFilterOperator extend
         filterMode = -1;
       }
     }
-    initializeChildren(hconf);
+    return result;
   }
 
   public void setFilterCondition(VectorExpression expr) {
@@ -82,7 +85,7 @@ public class VectorFilterOperator extend
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
 
     VectorizedRowBatch vrg = (VectorizedRowBatch) row;
 

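VectorFilterOperator short-circuits constant predicates: filterMode is 1 when the condition is statically true (every row passes untouched), -1 when it is statically false (the whole batch is dropped), and 0 when the vectorized expression must be evaluated per batch. A simplified sketch of that dispatch, operating on a bare row count rather than the real VectorizedRowBatch:

    public class FilterModeSketch {
      /** Returns the number of rows that survive the filter for a batch of the given size. */
      static int applyFilterMode(int filterMode, int batchSize) {
        if (filterMode == 1) {
          return batchSize; // condition always true: keep every row
        }
        if (filterMode == -1) {
          return 0;         // condition always false: drop the whole batch
        }
        // filterMode == 0: the real operator evaluates the VectorExpression here,
        // which rewrites the batch's selected[] array and size in place.
        return batchSize;
      }

      public static void main(String[] args) {
        System.out.println(applyFilterMode(1, 1024));  // 1024
        System.out.println(applyFilterMode(-1, 1024)); // 0
      }
    }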
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java Sat Mar 28 14:03:43 2015
@@ -22,18 +22,19 @@ import java.lang.management.ManagementFa
 import java.lang.management.MemoryMXBean;
 import java.lang.ref.SoftReference;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.KeyWrapper;
+import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hive.ql.plan.Ag
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -54,7 +56,8 @@ import org.apache.hadoop.io.DataOutputBu
  * stores the aggregate operators' intermediate states. Emits row mode output.
  *
  */
-public class VectorGroupByOperator extends GroupByOperator implements VectorizationContextRegion {
+public class VectorGroupByOperator extends Operator<GroupByDesc> implements
+    VectorizationContextRegion {
 
   private static final Log LOG = LogFactory.getLog(
       VectorGroupByOperator.class.getName());
@@ -100,7 +103,15 @@ public class VectorGroupByOperator exten
   private transient VectorizedRowBatchCtx vrbCtx;
 
   private transient VectorColumnAssign[] vectorColumnAssign;
-  
+
+  private transient int numEntriesHashTable;
+
+  private transient long maxHashTblMemory;
+
+  private transient long maxMemory;
+
+  private float memoryThreshold;
+
   /**
    * Interface for processing mode: global, hash, unsorted streaming, or group batch
    */
@@ -118,9 +129,11 @@ public class VectorGroupByOperator exten
   private abstract class ProcessingModeBase implements IProcessingMode {
 
     // Overridden and used in sorted reduce group batch processing mode.
+    @Override
     public void startGroup() throws HiveException {
       // Do nothing.
     }
+    @Override
     public void endGroup() throws HiveException {
       // Do nothing.
     }
@@ -177,7 +190,7 @@ public class VectorGroupByOperator exten
   private class ProcessingModeGlobalAggregate extends ProcessingModeBase {
 
     /**
-     * In global processing mode there is only one set of aggregation buffers 
+     * In global processing mode there is only one set of aggregation buffers
      */
     private VectorAggregationBufferRow aggregationBuffers;
 
@@ -233,7 +246,7 @@ public class VectorGroupByOperator exten
     private long sumBatchSize;
 
     /**
-     * Max number of entries in the vector group by aggregation hashtables. 
+     * Max number of entries in the vector group by aggregation hashtables.
      * Exceeding this will trigger a flush irrelevant of memory pressure condition.
      */
     private int maxHtEntries = 1000000;
@@ -247,12 +260,12 @@ public class VectorGroupByOperator exten
      * Percent of entries to flush when memory threshold exceeded.
      */
     private float percentEntriesToFlush = 0.1f;
-  
+
     /**
      * A soft reference used to detect memory pressure
      */
     private SoftReference<Object> gcCanary = new SoftReference<Object>(new Object());
-    
+
     /**
      * Counts the number of time the gcCanary died and was resurrected
      */
@@ -289,7 +302,7 @@ public class VectorGroupByOperator exten
             HiveConf.ConfVars.HIVEMAPAGGRHASHMINREDUCTION);
           this.numRowsCompareHashAggr = HiveConf.getIntVar(hconf,
             HiveConf.ConfVars.HIVEGROUPBYMAPINTERVAL);
-      } 
+      }
       else {
         this.percentEntriesToFlush =
             HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_FLUSH_PERCENT.defaultFloatVal;
@@ -322,14 +335,14 @@ public class VectorGroupByOperator exten
       processAggregators(batch);
 
       //Flush if memory limits were reached
-      // We keep flushing until the memory is under threshold 
+      // We keep flushing until the memory is under threshold
       int preFlushEntriesCount = numEntriesHashTable;
       while (shouldFlush(batch)) {
         flush(false);
 
         if(gcCanary.get() == null) {
           gcCanaryFlushes++;
-          gcCanary = new SoftReference<Object>(new Object()); 
+          gcCanary = new SoftReference<Object>(new Object());
         }
 
         //Validate that some progress is being made
@@ -468,7 +481,7 @@ public class VectorGroupByOperator exten
         mapKeysAggregationBuffers.clear();
         numEntriesHashTable = 0;
       }
-      
+
       if (all && LOG.isDebugEnabled()) {
         LOG.debug(String.format("GC canary caused %d flushes", gcCanaryFlushes));
       }
@@ -495,7 +508,7 @@ public class VectorGroupByOperator exten
       if (gcCanary.get() == null) {
         return true;
       }
-      
+
       return false;
     }
 
@@ -515,14 +528,14 @@ public class VectorGroupByOperator exten
     }
 
     /**
-     * Checks if the HT reduces the number of entries by at least minReductionHashAggr factor 
+     * Checks if the HT reduces the number of entries by at least minReductionHashAggr factor
      * @throws HiveException
      */
     private void checkHashModeEfficiency() throws HiveException {
       if (lastModeCheckRowCount > numRowsCompareHashAggr) {
         lastModeCheckRowCount = 0;
         if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("checkHashModeEfficiency: HT:%d RC:%d MIN:%d", 
+          LOG.debug(String.format("checkHashModeEfficiency: HT:%d RC:%d MIN:%d",
               numEntriesHashTable, sumBatchSize, (long)(sumBatchSize * minReductionHashAggr)));
         }
         if (numEntriesHashTable > sumBatchSize * minReductionHashAggr) {
@@ -541,7 +554,7 @@ public class VectorGroupByOperator exten
    */
   private class ProcessingModeUnsortedStreaming extends ProcessingModeBase {
 
-    /** 
+    /**
      * The aggregation buffers used in streaming mode
      */
     private VectorAggregationBufferRow currentStreamingAggregators;
@@ -554,19 +567,19 @@ public class VectorGroupByOperator exten
     /**
      * The keys that needs to be flushed at the end of the current batch
      */
-    private final VectorHashKeyWrapper[] keysToFlush = 
+    private final VectorHashKeyWrapper[] keysToFlush =
         new VectorHashKeyWrapper[VectorizedRowBatch.DEFAULT_SIZE];
 
     /**
      * The aggregates that needs to be flushed at the end of the current batch
      */
-    private final VectorAggregationBufferRow[] rowsToFlush = 
+    private final VectorAggregationBufferRow[] rowsToFlush =
         new VectorAggregationBufferRow[VectorizedRowBatch.DEFAULT_SIZE];
 
     /**
      * A pool of VectorAggregationBufferRow to avoid repeated allocations
      */
-    private VectorUtilBatchObjectPool<VectorAggregationBufferRow> 
+    private VectorUtilBatchObjectPool<VectorAggregationBufferRow>
       streamAggregationBufferRowPool;
 
     @Override
@@ -658,7 +671,7 @@ public class VectorGroupByOperator exten
    *      vectorized reduce-shuffle feeds the batches to us.
    *
    *   2) Later at endGroup after reduce-shuffle has fed us all the input batches for the group,
-   *      we fill in the aggregation columns in outputBatch at outputBatch.size.  Our method 
+   *      we fill in the aggregation columns in outputBatch at outputBatch.size.  Our method
    *      writeGroupRow does this and finally increments outputBatch.size.
    *
    */
@@ -672,7 +685,7 @@ public class VectorGroupByOperator exten
      */
     VectorGroupKeyHelper groupKeyHelper;
 
-    /** 
+    /**
      * The group vector aggregation buffers.
      */
     private VectorAggregationBufferRow groupAggregators;
@@ -750,7 +763,7 @@ public class VectorGroupByOperator exten
       AggregationDesc aggDesc = aggrDesc.get(i);
       aggregators[i] = vContext.getAggregatorExpression(aggDesc, desc.getVectorDesc().isReduce());
     }
-    
+
     isVectorOutput = desc.getVectorDesc().isVectorOutput();
 
     vOutContext = new VectorizationContext(desc.getOutputColumnNames());
@@ -762,7 +775,8 @@ public class VectorGroupByOperator exten
   }
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
 
     List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>();
 
@@ -773,9 +787,9 @@ public class VectorGroupByOperator exten
 
       // grouping id should be pruned, which is the last of key columns
       // see ColumnPrunerGroupByProc
-      outputKeyLength = 
+      outputKeyLength =
           conf.pruneGroupingSetId() ? keyExpressions.length - 1 : keyExpressions.length;
-      
+
       keyOutputWriters = new VectorExpressionWriter[outputKeyLength];
 
       for(int i = 0; i < outputKeyLength; ++i) {
@@ -812,8 +826,6 @@ public class VectorGroupByOperator exten
       throw new HiveException(e);
     }
 
-    initializeChildren(hconf);
-
     forwardCache = new Object[outputKeyLength + aggregators.length];
 
     if (outputKeyLength == 0) {
@@ -826,13 +838,14 @@ public class VectorGroupByOperator exten
       processingMode = this.new ProcessingModeHashAggregate();
     }
     processingMode.initialize(hconf);
+    return result;
   }
 
   /**
    * changes the processing mode to unsorted streaming
-   * This is done at the request of the hash agg mode, if the number of keys 
+   * This is done at the request of the hash agg mode, if the number of keys
    * exceeds the minReductionHashAggr factor
-   * @throws HiveException 
+   * @throws HiveException
    */
   private void changeToUnsortedStreamingMode() throws HiveException {
     processingMode = this.new ProcessingModeUnsortedStreaming();
@@ -859,7 +872,7 @@ public class VectorGroupByOperator exten
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     VectorizedRowBatch batch = (VectorizedRowBatch) row;
 
     if (batch.size > 0) {
@@ -962,4 +975,9 @@ public class VectorGroupByOperator exten
   public VectorizationContext getOuputVectorizationContext() {
     return vOutContext;
   }
+
+  @Override
+  public OperatorType getType() {
+    return OperatorType.GROUPBY;
+  }
 }

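ProcessingModeHashAggregate leans on a SoftReference "GC canary" as a cheap memory-pressure signal: when the collector clears soft references, the canary reads back null, the hash table is flushed, and a fresh canary is planted for the next round. A minimal standalone sketch of that idiom, independent of the Hive classes:

    import java.lang.ref.SoftReference;

    public class GcCanarySketch {
      private SoftReference<Object> gcCanary = new SoftReference<Object>(new Object());
      private int canaryFlushes = 0;

      /** Returns true when memory pressure was detected and state should be flushed. */
      boolean checkAndResetCanary() {
        if (gcCanary.get() == null) {
          canaryFlushes++;                                    // count the pressure event
          gcCanary = new SoftReference<Object>(new Object()); // plant a new canary
          return true;
        }
        return false;
      }

      public static void main(String[] args) {
        GcCanarySketch sketch = new GcCanarySketch();
        System.out.println(sketch.checkAndResetCanary()); // usually false without GC pressure
      }
    }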
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorLimitOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorLimitOperator.java?rev=1669775&r1=1669774&r2=1669775&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorLimitOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorLimitOperator.java Sat Mar 28 14:03:43 2015
@@ -39,7 +39,7 @@ public class VectorLimitOperator extends
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     VectorizedRowBatch batch = (VectorizedRowBatch) row;
 
     if (currCount < limit) {


