hive-commits mailing list archives

From br...@apache.org
Subject svn commit: r1635536 [9/28] - in /hive/branches/spark: ./ accumulo-handler/ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/columns/ accumulo-handler/src/test/org/apache/hadoo...
Date Thu, 30 Oct 2014 16:22:48 GMT
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java Thu Oct 30 16:22:33 2014
@@ -17,10 +17,13 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -41,10 +44,6 @@ import org.apache.tez.runtime.api.Logica
 import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
-import java.io.IOException;
-import java.util.Map;
-import java.util.Map.Entry;
-
 /**
  * Record processor for fast merging of files.
  */
@@ -93,17 +92,17 @@ public class MergeFileRecordProcessor ex
       MapWork mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
       if (mapWork == null) {
         mapWork = Utilities.getMapWork(jconf);
-        if (mapWork instanceof MergeFileWork) {
-          mfWork = (MergeFileWork) mapWork;
-        } else {
-          throw new RuntimeException("MapWork should be an instance of" +
-              " MergeFileWork.");
-        }
         cache.cache(MAP_PLAN_KEY, mapWork);
       } else {
         Utilities.setMapWork(jconf, mapWork);
       }
 
+      if (mapWork instanceof MergeFileWork) {
+        mfWork = (MergeFileWork) mapWork;
+      } else {
+        throw new RuntimeException("MapWork should be an instance of MergeFileWork.");
+      }
+
       String alias = mfWork.getAliasToWork().keySet().iterator().next();
       mergeOp = mfWork.getAliasToWork().get(alias);
       LOG.info(mergeOp.dump(0));
@@ -157,10 +156,7 @@ public class MergeFileRecordProcessor ex
       }
       mergeOp.close(abort);
 
-      if (isLogInfoEnabled) {
-        logCloseInfo();
-      }
-      ExecMapper.ReportStats rps = new ExecMapper.ReportStats(reporter);
+      ExecMapper.ReportStats rps = new ExecMapper.ReportStats(reporter, jconf);
       mergeOp.preorderMap(rps);
     } catch (Exception e) {
       if (!abort) {
@@ -191,9 +187,6 @@ public class MergeFileRecordProcessor ex
         row[0] = key;
         row[1] = value;
         mergeOp.processOp(row, 0);
-        if (isLogInfoEnabled) {
-          logProgress();
-        }
       }
     } catch (Throwable e) {
       abort = true;
@@ -211,6 +204,7 @@ public class MergeFileRecordProcessor ex
   private MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) throws Exception {
     // there should be only one MRInput
     MRInputLegacy theMRInput = null;
+    LOG.info("VDK: the inputs are: " + inputs);
     for (Entry<String, LogicalInput> inp : inputs.entrySet()) {
       if (inp.getValue() instanceof MRInputLegacy) {
         if (theMRInput != null) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java Thu Oct 30 16:22:33 2014
@@ -52,13 +52,10 @@ public abstract class RecordProcessor  {
 
 
   // used to log memory usage periodically
-  public static MemoryMXBean memoryMXBean;
   protected boolean isLogInfoEnabled = false;
   protected boolean isLogTraceEnabled = false;
   protected MRTaskReporter reporter;
 
-  private long numRows = 0;
-  private long nextUpdateCntr = 1;
   protected PerfLogger perfLogger = PerfLogger.getPerfLogger();
   protected String CLASS_NAME = RecordProcessor.class.getName();
 
@@ -79,11 +76,6 @@ public abstract class RecordProcessor  {
     this.outputs = outputs;
     this.processorContext = processorContext;
 
-    // Allocate the bean at the beginning -
-    memoryMXBean = ManagementFactory.getMemoryMXBean();
-
-    l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
-
     isLogInfoEnabled = l4j.isInfoEnabled();
     isLogTraceEnabled = l4j.isTraceEnabled();
 
@@ -110,37 +102,6 @@ public abstract class RecordProcessor  {
 
   abstract void close();
 
-  /**
-   * Log information to be logged at the end
-   */
-  protected void logCloseInfo() {
-    long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
-    l4j.info("TezProcessor: processed " + numRows + " rows/groups: used memory = " + used_memory);
-  }
-
-  /**
-   * Log number of records processed and memory used after processing many records
-   */
-  protected void logProgress() {
-    numRows++;
-    if (numRows == nextUpdateCntr) {
-      long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
-      l4j.info("TezProcessor: processing " + numRows + " rows/groups: used memory = " + used_memory);
-      nextUpdateCntr = getNextUpdateRecordCounter(numRows);
-    }
-  }
-
-  private long getNextUpdateRecordCounter(long cntr) {
-    // A very simple counter to keep track of number of rows processed by the
-    // reducer. It dumps
-    // every 1 million times, and quickly before that
-    if (cntr >= 1000000) {
-      return cntr + 1000000;
-    }
-
-    return 10 * cntr;
-  }
-
   protected void createOutputMap() {
     Preconditions.checkState(outMap == null, "Outputs should only be setup once");
     outMap = Maps.newHashMap();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Thu Oct 30 16:22:33 2014
@@ -101,7 +101,7 @@ public class ReduceRecordProcessor  exte
       sources[tag] = new ReduceRecordSource();
       sources[tag].init(jconf, reducer, redWork.getVectorMode(), keyTableDesc, valueTableDesc,
           reader, tag == position, (byte) tag,
-          redWork.getScratchColumnVectorTypes());
+          redWork.getAllScratchColumnVectorTypeMaps());
       ois[tag] = sources[tag].getObjectInspector();
     }
 
@@ -165,11 +165,7 @@ public class ReduceRecordProcessor  exte
     }
 
     // run the operator pipeline
-    while (sources[position].pushRecord()) {
-      if (isLogInfoEnabled) {
-        logProgress();
-      }
-    }
+    while (sources[position].pushRecord()) {}
   }
 
   /**
@@ -208,7 +204,7 @@ public class ReduceRecordProcessor  exte
           dummyOp.close(abort);
         }
       }
-      ReportStats rps = new ReportStats(reporter);
+      ReportStats rps = new ReportStats(reporter, jconf);
       reducer.preorderMap(rps);
 
     } catch (Exception e) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java Thu Oct 30 16:22:33 2014
@@ -102,9 +102,19 @@ public class SplitGrouper {
 
     // compute the total size per bucket
     long totalSize = 0;
+    boolean earlyExit = false;
     for (int bucketId : bucketSplitMap.keySet()) {
       long size = 0;
       for (InputSplit s : bucketSplitMap.get(bucketId)) {
+        // the incoming split may not be a file split when we are re-grouping TezGroupedSplits in
+        // the case of SMB join. So in this case, we can do an early exit by not doing the
+        // calculation for bucketSizeMap. Each bucket will assume it can fill availableSlots * waves
+        // (preset to 0.5) for SMB join.
+        if (!(s instanceof FileSplit)) {
+          bucketTaskMap.put(bucketId, (int) (availableSlots * waves));
+          earlyExit = true;
+          continue;
+        }
         FileSplit fsplit = (FileSplit) s;
         size += fsplit.getLength();
         totalSize += fsplit.getLength();
@@ -112,6 +122,10 @@ public class SplitGrouper {
       bucketSizeMap.put(bucketId, size);
     }
 
+    if (earlyExit) {
+      return bucketTaskMap;
+    }
+
     // compute the number of tasks
     for (int bucketId : bucketSizeMap.keySet()) {
       int numEstimatedTasks = 0;
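
The early-exit branch added above skips the per-bucket size computation when a split is not a FileSplit (the SMB-join re-grouping case) and falls back to availableSlots * waves tasks for that bucket. A minimal sketch of that sizing decision in isolation, assuming the mapred FileSplit/InputSplit types used elsewhere in SplitGrouper; estimateTasksForBucket and bytesPerTask are hypothetical names, not part of the patch:

    import java.util.List;

    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputSplit;

    public class BucketTaskEstimator {
      /**
       * Returns the number of tasks for one bucket: a size-based estimate for
       * plain file splits, or the fixed availableSlots * waves fallback as soon
       * as a non-FileSplit (e.g. a re-grouped split for an SMB join) shows up.
       */
      static int estimateTasksForBucket(List<InputSplit> splits, int availableSlots,
          float waves, long bytesPerTask) {
        long size = 0;
        for (InputSplit s : splits) {
          if (!(s instanceof FileSplit)) {
            // Not file-backed: no meaningful length, use the preset parallelism.
            return (int) (availableSlots * waves);
          }
          size += ((FileSplit) s).getLength();
        }
        // Size-based estimate, at least one task per bucket.
        return (int) Math.max(1, size / Math.max(1, bytesPerTask));
      }
    }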

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java Thu Oct 30 16:22:33 2014
@@ -68,12 +68,7 @@ public class TezJobMonitor {
       @Override
       public void run() {
         for (DAGClient c: shutdownList) {
-          try {
-            System.err.println("Trying to shutdown DAG");
-            c.tryKillDAG();
-          } catch (Exception e) {
-            // ignore
-          }
+          TezJobMonitor.killRunningJobs();
         }
         try {
           for (TezSessionState s: TezSessionState.getOpenSessions()) {
@@ -212,6 +207,21 @@ public class TezJobMonitor {
     return rc;
   }
 
+  /**
+   * killRunningJobs tries to terminate execution of all
+   * currently running tez queries. No guarantees, best effort only.
+   */
+  public static void killRunningJobs() {
+    for (DAGClient c: shutdownList) {
+      try {
+        System.err.println("Trying to shutdown DAG");
+        c.tryKillDAG();
+      } catch (Exception e) {
+        // ignore
+      }
+    }
+  }
+
   private String printStatus(Map<String, Progress> progressMap, String lastReport, LogHelper console) {
     StringBuffer reportBuffer = new StringBuffer();
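
The hunks above move the DAG-kill loop out of the anonymous shutdown hook into the static killRunningJobs() helper so it can be reused; its javadoc stresses that this is best effort only. A small hedged sketch of how such a helper might be wired into a JVM shutdown hook (only the TezJobMonitor.killRunningJobs() call is from the patch; the surrounding hook class is illustrative):

    // Illustrative only: a shutdown hook that attempts a best-effort kill of
    // running Tez DAGs, mirroring the pattern used by TezJobMonitor above.
    public class QueryShutdownHook {
      public static void install() {
        Runtime.getRuntime().addShutdownHook(new Thread() {
          @Override
          public void run() {
            try {
              // Static helper introduced by this patch; no guarantees, best effort.
              org.apache.hadoop.hive.ql.exec.tez.TezJobMonitor.killRunningJobs();
            } catch (Throwable t) {
              // Never let a failing kill block JVM shutdown.
            }
          }
        });
      }
    }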
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Thu Oct 30 16:22:33 2014
@@ -19,12 +19,9 @@ package org.apache.hadoop.hive.ql.exec.t
 
 import java.io.IOException;
 import java.text.NumberFormat;
-import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -33,11 +30,10 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.tez.common.TezUtils;
-import org.apache.tez.mapreduce.input.MRInputLegacy;
-import org.apache.tez.mapreduce.input.MultiMRInput;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.ProcessorContext;
@@ -152,10 +148,13 @@ public class TezProcessor extends Abstra
       // Start the actual Inputs. After MRInput initialization.
       for (Map.Entry<String, LogicalInput> inputEntry : inputs.entrySet()) {
         if (!cacheAccess.isInputCached(inputEntry.getKey())) {
-          LOG.info("Input: " + inputEntry.getKey() + " is not cached");
+          LOG.info("Starting input " + inputEntry.getKey());
           inputEntry.getValue().start();
+          processorContext.waitForAnyInputReady(Collections.singletonList((Input) (inputEntry
+              .getValue())));
         } else {
-          LOG.info("Input: " + inputEntry.getKey() + " is already cached. Skipping start");
+          LOG.info("Input: " + inputEntry.getKey()
+              + " is already cached. Skipping start and wait for ready");
         }
       }
 
@@ -194,6 +193,7 @@ public class TezProcessor extends Abstra
    * Must be initialized before it is used.
    *
    */
+  @SuppressWarnings("rawtypes")
   static class TezKVOutputCollector implements OutputCollector {
     private KeyValueWriter writer;
     private final LogicalOutput output;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java Thu Oct 30 16:22:33 2014
@@ -141,7 +141,9 @@ public class TezSessionPoolManager {
   private TezSessionState getNewSessionState(HiveConf conf,
       String queueName, boolean doOpen) throws Exception {
     TezSessionState retTezSessionState = createSession(TezSessionState.makeSessionId());
-    retTezSessionState.setQueueName(queueName);
+    if (queueName != null) {
+      conf.set("tez.queue.name", queueName);
+    }
     String what = "Created";
     if (doOpen) {
       retTezSessionState.open(conf);
@@ -221,29 +223,27 @@ public class TezSessionPoolManager {
       throw new HiveException(e);
     }
 
-    HiveConf existingConf = session.getConf();
-    if (existingConf == null) {
-      return false;
-    }
-
+    boolean doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
     // either variables will never be null because a default value is returned in case of absence
-    if (existingConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS) !=
-        conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
+    if (doAsEnabled != conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
       return false;
     }
 
     if (!session.isDefault()) {
-      if (existingConf.get("tez.queue.name") == conf.get("tez.queue.name")) {
-        // both are null
-        return true;
-      }
-      if ((existingConf.get("tez.queue.name") == null)) {
-        // doesn't matter if the other conf is null or not. if it is null, above case catches it
-        return false;
+      String queueName = session.getQueueName();
+      LOG.info("Current queue name is " + queueName + " incoming queue name is "
+          + conf.get("tez.queue.name"));
+      if (queueName == null) {
+        if (conf.get("tez.queue.name") != null) {
+          // queue names are different
+          return false;
+        } else {
+          return true;
+        }
       }
 
-      if (!existingConf.get("tez.queue.name").equals(conf.get("tez.queue.name"))) {
-        // handles the case of incoming conf having a null for tez.queue.name
+      if (!queueName.equals(conf.get("tez.queue.name"))) {
+        // the String.equals method handles the case of conf not having the queue name as well.
         return false;
       }
     } else {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Thu Oct 30 16:22:33 2014
@@ -67,13 +67,14 @@ public class TezSessionState {
   private LocalResource appJarLr;
   private TezClient session;
   private String sessionId;
-  private DagUtils utils;
+  private final DagUtils utils;
   private String queueName;
   private boolean defaultQueue = false;
   private String user;
 
   private final Set<String> additionalFilesNotFromConf = new HashSet<String>();
   private final Set<LocalResource> localizedResources = new HashSet<LocalResource>();
+  private boolean doAsEnabled;
 
   private static List<TezSessionState> openSessions
     = Collections.synchronizedList(new LinkedList<TezSessionState>());
@@ -130,6 +131,8 @@ public class TezSessionState {
   public void open(HiveConf conf, String[] additionalFiles)
     throws IOException, LoginException, IllegalArgumentException, URISyntaxException, TezException {
     this.conf = conf;
+    this.queueName = conf.get("tez.queue.name");
+    this.doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
 
     UserGroupInformation ugi;
     ugi = ShimLoader.getHadoopShims().getUGIForConf(conf);
@@ -207,11 +210,6 @@ public class TezSessionState {
     } catch(InterruptedException ie) {
       //ignore
     }
-    // In case we need to run some MR jobs, we'll run them under tez MR emulation. The session
-    // id is used for tez to reuse the current session rather than start a new one.
-    conf.set("mapreduce.framework.name", "yarn-tez");
-    conf.set("mapreduce.tez.session.tokill-application-id",
-        session.getAppMasterApplicationId().toString());
 
     openSessions.add(this);
   }
@@ -397,4 +395,8 @@ public class TezSessionState {
   public String getUser() {
     return user;
   }
+
+  public boolean getDoAsEnabled() {
+    return doAsEnabled;
+  }
 }
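
TezSessionState now captures the queue name and the doAs flag when open() is called and exposes them through getters, so the pool manager can test compatibility without holding on to the session's HiveConf. A rough sketch of that check using the new getters; canReuseSession is a hypothetical helper, and the default-session branch of the real logic (cut off in the hunk above) is reduced to a plain return here:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;

    public class SessionReuseCheck {
      /**
       * Hypothetical helper: a pooled session is reusable only if its doAs
       * setting matches the incoming conf and, for non-default sessions, the
       * queue names are equal (null meaning "no queue requested").
       */
      static boolean canReuseSession(TezSessionState session, HiveConf conf) {
        boolean incomingDoAs = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
        if (session.getDoAsEnabled() != incomingDoAs) {
          return false;
        }
        if (session.isDefault()) {
          // The real pool manager applies additional checks to default sessions.
          return true;
        }
        String existing = session.getQueueName();
        String incoming = conf.get("tez.queue.name");
        return existing == null ? incoming == null : existing.equals(incoming);
      }
    }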

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Thu Oct 30 16:22:33 2014
@@ -47,6 +47,7 @@ import org.apache.hadoop.hive.ql.plan.Un
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.common.counters.CounterGroup;
@@ -272,6 +273,7 @@ public class TezTask extends Task<TezWor
 
     // the name of the dag is what is displayed in the AM/Job UI
     DAG dag = DAG.create(work.getName());
+    dag.setCredentials(conf.getCredentials());
 
     for (BaseWork w: ws) {
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java Thu Oct 30 16:22:33 2014
@@ -18,13 +18,23 @@
 package org.apache.hadoop.hive.ql.exec.tez.tools;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.PriorityQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.io.Writable;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
 /**
@@ -34,16 +44,36 @@ import org.apache.tez.runtime.library.ap
  * Uses a priority queue to pick the KeyValuesReader of the input that is next in
  * sort order.
  */
+@SuppressWarnings("deprecation")
 public class KeyValueInputMerger extends KeyValueReader {
 
   public static final Log l4j = LogFactory.getLog(KeyValueInputMerger.class);
   private PriorityQueue<KeyValueReader> pQueue = null;
   private KeyValueReader nextKVReader = null;
+  private ObjectInspector[] inputObjInspectors = null;
+  private Deserializer deserializer = null;
+  private List<StructField> structFields = null;
+  private List<ObjectInspector> fieldOIs = null;
+  private final Map<KeyValueReader, List<Object>> kvReaderStandardObjMap =
+      new HashMap<KeyValueReader, List<Object>>();
 
-  public KeyValueInputMerger(List<KeyValueReader> multiMRInputs) throws Exception {
+  public KeyValueInputMerger(List<KeyValueReader> multiMRInputs, Deserializer deserializer,
+      ObjectInspector[] inputObjInspectors, List<String> sortCols) throws Exception {
     //get KeyValuesReaders from the LogicalInput and add them to priority queue
     int initialCapacity = multiMRInputs.size();
     pQueue = new PriorityQueue<KeyValueReader>(initialCapacity, new KVReaderComparator());
+    this.inputObjInspectors = inputObjInspectors;
+    this.deserializer = deserializer;
+    fieldOIs = new ArrayList<ObjectInspector>();
+    structFields = new ArrayList<StructField>();
+    StructObjectInspector structOI = (StructObjectInspector) inputObjInspectors[0];
+    for (String field : sortCols) {
+      StructField sf = structOI.getStructFieldRef(field);
+      structFields.add(sf);
+      ObjectInspector stdOI =
+          ObjectInspectorUtils.getStandardObjectInspector(sf.getFieldObjectInspector());
+      fieldOIs.add(stdOI);
+    }
     l4j.info("Initialized the priority queue with multi mr inputs: " + multiMRInputs.size());
     for (KeyValueReader input : multiMRInputs) {
       addToQueue(input);
@@ -58,6 +88,7 @@ public class KeyValueInputMerger extends
    */
   private void addToQueue(KeyValueReader kvReader) throws IOException {
     if (kvReader.next()) {
+      kvReaderStandardObjMap.remove(kvReader);
       pQueue.add(kvReader);
     }
   }
@@ -93,12 +124,53 @@ public class KeyValueInputMerger extends
    */
   class KVReaderComparator implements Comparator<KeyValueReader> {
 
+    @SuppressWarnings({ "unchecked" })
     @Override
     public int compare(KeyValueReader kvReadr1, KeyValueReader kvReadr2) {
       try {
-        BinaryComparable key1 = (BinaryComparable) kvReadr1.getCurrentValue();
-        BinaryComparable key2 = (BinaryComparable) kvReadr2.getCurrentValue();
-        return key1.compareTo(key2);
+        ObjectInspector oi = inputObjInspectors[0];
+        List<Object> row1, row2;
+        try {
+          if (kvReaderStandardObjMap.containsKey(kvReadr1)) {
+            row1 = kvReaderStandardObjMap.get(kvReadr1);
+          } else {
+            // we need to copy to standard object otherwise deserializer overwrites the values
+            row1 =
+                (List<Object>) ObjectInspectorUtils.copyToStandardObject(
+                    deserializer.deserialize((Writable) kvReadr1.getCurrentValue()), oi,
+                    ObjectInspectorCopyOption.WRITABLE);
+            kvReaderStandardObjMap.put(kvReadr1, row1);
+          }
+
+          if (kvReaderStandardObjMap.containsKey(kvReadr2)) {
+            row2 = kvReaderStandardObjMap.get(kvReadr2);
+          } else {
+            row2 =
+                (List<Object>) ObjectInspectorUtils.copyToStandardObject(
+                    deserializer.deserialize((Writable) kvReadr2.getCurrentValue()), oi,
+                    ObjectInspectorCopyOption.WRITABLE);
+            kvReaderStandardObjMap.put(kvReadr2, row2);
+          }
+        } catch (SerDeException e) {
+          throw new IOException(e);
+        }
+
+        StructObjectInspector structOI = (StructObjectInspector) oi;
+        int compare = 0;
+        int index = 0;
+        for (StructField sf : structFields) {
+          int pos = structOI.getAllStructFieldRefs().indexOf(sf);
+          Object key1 = row1.get(pos);
+          Object key2 = row2.get(pos);
+          ObjectInspector stdOI = fieldOIs.get(index);
+          compare = ObjectInspectorUtils.compare(key1, stdOI, key2, stdOI);
+          index++;
+          if (compare != 0) {
+            return compare;
+          }
+        }
+
+        return compare;
       } catch (IOException e) {
         l4j.error("Caught exception while reading shuffle input", e);
         //die!
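
The new comparator deserializes each reader's current value once, copies it to standard objects so the deserializer's reused row is not overwritten, caches the copy per reader, and compares only the configured sort columns. The column-by-column comparison step in isolation, assuming field positions and standard object inspectors have already been resolved as in the constructor above (compareOnSortColumns is a hypothetical name):

    import java.util.List;

    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;

    public class SortColumnComparator {
      /**
       * Compares two already-deserialized rows on the given field positions,
       * one standard ObjectInspector per sort column, returning the first
       * non-zero result, as the loop in KVReaderComparator does.
       */
      static int compareOnSortColumns(List<Object> row1, List<Object> row2,
          int[] fieldPositions, ObjectInspector[] fieldOIs) {
        for (int i = 0; i < fieldPositions.length; i++) {
          int pos = fieldPositions[i];
          int cmp = ObjectInspectorUtils.compare(row1.get(pos), fieldOIs[i],
              row2.get(pos), fieldOIs[i]);
          if (cmp != 0) {
            return cmp;
          }
        }
        return 0;
      }
    }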

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java Thu Oct 30 16:22:33 2014
@@ -18,85 +18,118 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
+import java.util.ArrayList;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
-import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
 import org.apache.hadoop.hive.ql.exec.ExtractOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExtractDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
 /**
- * Vectorized extract operator implementation.  Consumes rows and outputs a
- * vectorized batch of subobjects.
+ * Vectorized extract operator implementation.
  **/
-public class VectorExtractOperator extends ExtractOperator {
+public class VectorExtractOperator extends ExtractOperator implements VectorizationContextRegion {
   private static final long serialVersionUID = 1L;
 
-  private int keyColCount;
-  private int valueColCount;
-  
-  private transient VectorizedRowBatch outputBatch;
-  private transient int remainingColCount;
+  private List<TypeInfo> reduceTypeInfos;
+
+  // Create a new outgoing vectorization context because we will project just the values.
+  private VectorizationContext vOutContext;
+
+  private int[] projectedColumns;
 
+  private String removeValueDotPrefix(String columnName) {
+    return columnName.substring("VALUE.".length());
+  }
   public VectorExtractOperator(VectorizationContext vContext, OperatorDesc conf)
       throws HiveException {
     this();
     this.conf = (ExtractDesc) conf;
+
+    List<String> reduceColumnNames = vContext.getProjectionColumnNames();
+    int reduceColCount = reduceColumnNames.size();
+
+    /*
+     * Create a new vectorization context as a projection of just the value columns, but
+     * inherit the same output column manager so the scratch columns are still tracked.
+     */
+    vOutContext = new VectorizationContext(vContext);
+
+    // Set a fileKey with vectorization context.
+    vOutContext.setFileKey(vContext.getFileKey() + "/_EXTRACT_");
+
+    // Remove "VALUE." prefix from value columns and create a new projection
+    vOutContext.resetProjectionColumns();
+    for (int i = 0; i < reduceColCount; i++) {
+      String columnName = reduceColumnNames.get(i);
+      if (columnName.startsWith("VALUE.")) {
+        vOutContext.addProjectionColumn(removeValueDotPrefix(columnName), i);
+      }
+    }
   }
 
   public VectorExtractOperator() {
     super();
   }
 
+  /*
+   * Called by the Vectorizer class to pass the types from reduce shuffle.
+   */
+  public void setReduceTypeInfos(List<TypeInfo> reduceTypeInfos) {
+    this.reduceTypeInfos = reduceTypeInfos;
+  }
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
-    StructObjectInspector structInputObjInspector = (StructObjectInspector) inputObjInspectors[0];
-    List<? extends StructField> fields = structInputObjInspector.getAllStructFieldRefs();
-    ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
-    ArrayList<String> colNames = new ArrayList<String>();
-    for (int i = keyColCount; i < fields.size(); i++) {
-      StructField field = fields.get(i);
-      String fieldName = field.getFieldName();
-
-      // Remove "VALUE." prefix.
-      int dotIndex = fieldName.indexOf(".");
-      colNames.add(fieldName.substring(dotIndex + 1));
-      ois.add(field.getFieldObjectInspector());
+    // Create the projection of the values and the output object inspector
+    // for just the value without their "VALUE." prefix.
+    int projectionSize = vOutContext.getProjectedColumns().size();
+    projectedColumns = new int[projectionSize];
+    List<String> columnNames = new ArrayList<String>();
+    List<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+    for (int i = 0; i < projectionSize; i++) {
+      int projectedIndex = vOutContext.getProjectedColumns().get(i);
+      projectedColumns[i] = projectedIndex;
+      String colName = vOutContext.getProjectionColumnNames().get(i);
+      columnNames.add(colName);
+      TypeInfo typeInfo = reduceTypeInfos.get(projectedIndex);
+      ObjectInspector oi = TypeInfoUtils
+          .getStandardWritableObjectInspectorFromTypeInfo(typeInfo);
+      ois.add(oi);
     }
-    outputObjInspector = ObjectInspectorFactory
-              .getStandardStructObjectInspector(colNames, ois);
-    remainingColCount = fields.size() - keyColCount;
-    outputBatch =  new VectorizedRowBatch(remainingColCount);
+    outputObjInspector = ObjectInspectorFactory.
+            getStandardStructObjectInspector(columnNames, ois);
     initializeChildren(hconf);
   }
 
-  public void setKeyAndValueColCounts(int keyColCount, int valueColCount) {
-      this.keyColCount = keyColCount;
-      this.valueColCount = valueColCount;
-  }
   
   @Override
   // Remove the key columns and forward the values (and scratch columns).
   public void processOp(Object row, int tag) throws HiveException {
-    VectorizedRowBatch inputBatch = (VectorizedRowBatch) row;
+    VectorizedRowBatch vrg = (VectorizedRowBatch) row;
 
-    // Copy references to the input columns array starting after the keys...
-    for (int i = 0; i < remainingColCount; i++) {
-      outputBatch.cols[i] = inputBatch.cols[keyColCount + i];
-    }
-    outputBatch.size = inputBatch.size;
+    int[] originalProjections = vrg.projectedColumns;
+    int originalProjectionSize = vrg.projectionSize;
+
+    // Temporarily substitute our projection.
+    vrg.projectionSize = projectedColumns.length;
+    vrg.projectedColumns = projectedColumns;
 
-    forward(outputBatch, outputObjInspector);
+    forward(vrg, null);
+
+    // Revert the projected columns back, because vrg will be re-used.
+    vrg.projectionSize = originalProjectionSize;
+    vrg.projectedColumns = originalProjections;
+  }
+
+  @Override
+  public VectorizationContext getOuputVectorizationContext() {
+    return vOutContext;
   }
 }
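
processOp above forwards the incoming VectorizedRowBatch itself instead of copying columns: it swaps in the operator's own projection (the value columns without their "VALUE." prefix), forwards, and restores the original projection because the caller reuses the batch. The same swap-and-restore pattern sketched in isolation; BatchSink stands in for Operator.forward and is not part of the patch:

    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

    public class ProjectionSwapExample {
      /** Stand-in for Operator.forward(); not part of the patch. */
      interface BatchSink {
        void forward(VectorizedRowBatch batch);
      }

      /**
       * Forwards a reused batch under a temporary projection and restores the
       * original projection afterwards, as VectorExtractOperator.processOp does.
       */
      static void forwardWithProjection(VectorizedRowBatch batch, int[] projection,
          BatchSink sink) {
        int[] savedColumns = batch.projectedColumns;
        int savedSize = batch.projectionSize;
        batch.projectedColumns = projection;
        batch.projectionSize = projection.length;
        try {
          sink.forward(batch);
        } finally {
          // The caller refills and reuses this batch, so revert the projection.
          batch.projectedColumns = savedColumns;
          batch.projectionSize = savedSize;
        }
      }
    }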

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java Thu Oct 30 16:22:33 2014
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
 /**
@@ -50,10 +51,22 @@ public class VectorFileSinkOperator exte
 
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
-    super.initializeOp(hconf);
-    valueWriters = VectorExpressionWriterFactory.getExpressionWriters(
-        (StructObjectInspector) inputObjInspectors[0]);
+    // We need an input object inspector that is for the row we will extract out of the
+    // vectorized row batch, not for example, an original inspector for an ORC table, etc.
+    VectorExpressionWriterFactory.processVectorInspector(
+            (StructObjectInspector) inputObjInspectors[0],
+            new VectorExpressionWriterFactory.SingleOIDClosure() {
+              @Override
+              public void assign(VectorExpressionWriter[] writers,
+                  ObjectInspector objectInspector) {
+                valueWriters = writers;
+                inputObjInspectors[0] = objectInspector;
+              }
+            });
     singleRow = new Object[valueWriters.length];
+
+    // Call FileSinkOperator with new input inspector.
+    super.initializeOp(hconf);
   }
 
   @Override

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java Thu Oct 30 16:22:33 2014
@@ -60,8 +60,6 @@ public class VectorFilterOperator extend
     try {
       heartbeatInterval = HiveConf.getIntVar(hconf,
           HiveConf.ConfVars.HIVESENDHEARTBEAT);
-      statsMap.put(Counter.FILTERED, filtered_count);
-      statsMap.put(Counter.PASSED, passed_count);
     } catch (Throwable e) {
       throw new HiveException(e);
     }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java Thu Oct 30 16:22:33 2014
@@ -32,11 +32,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
-import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.KeyWrapper;
-import org.apache.hadoop.hive.ql.exec.KeyWrapperFactory;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
@@ -46,12 +43,9 @@ import org.apache.hadoop.hive.ql.plan.Ag
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.DataOutputBuffer;
 
@@ -760,13 +754,7 @@ public class VectorGroupByOperator exten
     
     isVectorOutput = desc.getVectorDesc().isVectorOutput();
 
-    List<String> outColNames = desc.getOutputColumnNames();
-    Map<String, Integer> mapOutCols = new HashMap<String, Integer>(outColNames.size());
-    int outColIndex = 0;
-    for(String outCol: outColNames) {
-      mapOutCols.put(outCol,  outColIndex++);
-    }
-    vOutContext = new VectorizationContext(mapOutCols, outColIndex);
+    vOutContext = new VectorizationContext(desc.getOutputColumnNames());
     vOutContext.setFileKey(vContext.getFileKey() + "/_GROUPBY_");
     fileKey = vOutContext.getFileKey();
   }
@@ -811,7 +799,7 @@ public class VectorGroupByOperator exten
           vrbCtx.init(hconf, fileKey, (StructObjectInspector) outputObjInspector);
           outputBatch = vrbCtx.createVectorizedRowBatch();
           vectorColumnAssign = VectorColumnAssignFactory.buildAssigners(
-              outputBatch, outputObjInspector, vOutContext.getColumnMap(), conf.getOutputColumnNames());
+              outputBatch, outputObjInspector, vOutContext.getProjectionColumnMap(), conf.getOutputColumnNames());
       }
 
     } catch (HiveException he) {
@@ -851,11 +839,19 @@ public class VectorGroupByOperator exten
   @Override
   public void startGroup() throws HiveException {
     processingMode.startGroup();
+
+    // We do not call startGroup on operators below because we are batching rows in
+    // an output batch and the semantics will not work.
+    // super.startGroup();
   }
 
   @Override
   public void endGroup() throws HiveException {
     processingMode.endGroup();
+
+    // We do not call endGroup on operators below because we are batching rows in
+    // an output batch and the semantics will not work.
+    // super.endGroup();
   }
 
   @Override

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java Thu Oct 30 16:22:33 2014
@@ -112,7 +112,10 @@ public class VectorGroupKeyHelper extend
       DecimalColumnVector inputColumnVector = (DecimalColumnVector) inputBatch.cols[keyIndex];
       DecimalColumnVector outputColumnVector = (DecimalColumnVector) outputBatch.cols[keyIndex];
       if (inputColumnVector.noNulls || !inputColumnVector.isNull[0]) {
-        outputColumnVector.vector[outputBatch.size] = inputColumnVector.vector[0];
+
+        // Since we store references to Decimal128 instances, we must use the update method instead
+        // of plain assignment.
+        outputColumnVector.vector[outputBatch.size].update(inputColumnVector.vector[0]);
       } else {
         outputColumnVector.noNulls = false;
         outputColumnVector.isNull[outputBatch.size] = true;
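
The comment added above notes that DecimalColumnVector stores references to mutable Decimal128 objects, so copying a key by plain assignment would alias the source element, while update() copies the value into the destination object. A tiny illustration of the difference, assuming Decimal128's no-argument constructor; only update(Decimal128) is taken from the patch:

    import org.apache.hadoop.hive.common.type.Decimal128;

    public class DecimalCopyExample {
      public static void main(String[] args) {
        Decimal128 src = new Decimal128();   // incoming group key value
        Decimal128 dest = new Decimal128();  // slot already allocated in the output column vector

        // Plain assignment copies nothing: it is just another reference to the
        // same mutable object, so later changes to src leak into the "copy".
        Decimal128 aliased = src;

        // What the patch does instead: copy the value into the existing object.
        dest.update(src);

        System.out.println(aliased == src);  // true  -> aliasing
        System.out.println(dest == src);     // false -> independent copy of the value
      }
    }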

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java Thu Oct 30 16:22:33 2014
@@ -28,11 +28,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKeyObject;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer.ReusableGetAdaptor;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
@@ -41,8 +37,6 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
@@ -116,16 +110,8 @@ public class VectorMapJoinOperator exten
     Map<Byte, List<ExprNodeDesc>> exprs = desc.getExprs();
     bigTableValueExpressions = vContext.getVectorExpressions(exprs.get(posBigTable));
 
-    List<String> outColNames = desc.getOutputColumnNames();
-    
-    Map<String, Integer> mapOutCols = new HashMap<String, Integer>(outColNames.size());
-    
-    int outColIndex = 0;
-    for(String outCol: outColNames) {
-      mapOutCols.put(outCol,  outColIndex++);
-    }
-    
-    vOutContext = new VectorizationContext(mapOutCols, outColIndex);
+    // We are making a new output vectorized row batch.
+    vOutContext = new VectorizationContext(desc.getOutputColumnNames());
     vOutContext.setFileKey(vContext.getFileKey() + "/MAP_JOIN_" + desc.getBigTableAlias());
     this.fileKey = vOutContext.getFileKey();
   }
@@ -207,7 +193,7 @@ public class VectorMapJoinOperator exten
     Object[] values = (Object[]) row;
     VectorColumnAssign[] vcas = outputVectorAssigners.get(outputOI);
     if (null == vcas) {
-      Map<String, Map<String, Integer>> allColumnMaps = Utilities.getScratchColumnMap(hconf);
+      Map<String, Map<String, Integer>> allColumnMaps = Utilities.getAllColumnVectorMaps(hconf);
       Map<String, Integer> columnMap = allColumnMaps.get(fileKey);
       vcas = VectorColumnAssignFactory.buildAssigners(
           outputBatch, outputOI, columnMap, conf.getOutputColumnNames());

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java Thu Oct 30 16:22:33 2014
@@ -40,7 +40,14 @@ public class VectorMapOperator extends M
     // The row has been converted to comply with table schema, irrespective of partition schema.
     // So, use tblOI (and not partOI) for forwarding
     try {
-      forward(value, current.getRowObjectInspector());
+      int childrenDone = 0;
+      for (MapOpCtx current : currentCtxs) {
+        if (!current.forward(value)) {
+          childrenDone++;
+        }
+      }
+
+      rowsForwarded(childrenDone, ((VectorizedRowBatch)value).size);
     } catch (Exception e) {
       throw new HiveException("Hive Runtime Error while processing row ", e);
     }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java Thu Oct 30 16:22:33 2014
@@ -27,16 +27,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
-import org.apache.hadoop.hive.ql.exec.JoinUtil;
 import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKeyObject;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -116,19 +113,9 @@ public class VectorSMBMapJoinOperator ex
 
     Map<Byte, List<ExprNodeDesc>> exprs = desc.getExprs();
     bigTableValueExpressions = vContext.getVectorExpressions(exprs.get(posBigTable));
-    
-    // Vectorized join operators need to create a new vectorization region for child operators.
-
-    List<String> outColNames = desc.getOutputColumnNames();
-    
-    Map<String, Integer> mapOutCols = new HashMap<String, Integer>(outColNames.size());
-    
-    int outColIndex = 0;
-    for(String outCol: outColNames) {
-      mapOutCols.put(outCol,  outColIndex++);
-    }
 
-    vOutContext = new VectorizationContext(mapOutCols, outColIndex);
+    // We are making a new output vectorized row batch.
+    vOutContext = new VectorizationContext(desc.getOutputColumnNames());
     vOutContext.setFileKey(vContext.getFileKey() + "/SMB_JOIN_" + desc.getBigTableAlias());
     this.fileKey = vOutContext.getFileKey();
   }
@@ -285,7 +272,7 @@ public class VectorSMBMapJoinOperator ex
     Object[] values = (Object[]) row;
     VectorColumnAssign[] vcas = outputVectorAssigners.get(outputOI);
     if (null == vcas) {
-      Map<String, Map<String, Integer>> allColumnMaps = Utilities.getScratchColumnMap(hconf);
+      Map<String, Map<String, Integer>> allColumnMaps = Utilities.getAllColumnVectorMaps(hconf);
       Map<String, Integer> columnMap = allColumnMaps.get(fileKey);
       vcas = VectorColumnAssignFactory.buildAssigners(
           outputBatch, outputOI, columnMap, conf.getOutputColumnNames());

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java Thu Oct 30 16:22:33 2014
@@ -31,7 +31,6 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
-import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 
@@ -63,20 +62,20 @@ public class VectorSelectOperator extend
     }
 
     /**
-     * Create a new vectorization context to update the column map but same output column manager
-     * must be inherited to track the scratch the columns.
+     * Create a new vectorization context to hold the new projection, but keep the
+     * same output column manager inherited from the parent so the scratch columns are still tracked.
      */
     vOutContext = new VectorizationContext(vContext);
 
     // Set a fileKey, although this operator doesn't use it.
     vOutContext.setFileKey(vContext.getFileKey() + "/_SELECT_");
 
-    // Update column map
-    vOutContext.getColumnMap().clear();
+    vOutContext.resetProjectionColumns();
     for (int i=0; i < colList.size(); ++i) {
       String columnName = this.conf.getOutputColumnNames().get(i);
       VectorExpression ve = vExpressions[i];
-      vOutContext.addToColumnMap(columnName, ve.getOutputColumn());
+      vOutContext.addProjectionColumn(columnName, 
+              ve.getOutputColumn());
     }
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java Thu Oct 30 16:22:33 2014
@@ -23,11 +23,13 @@ import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
@@ -123,9 +125,98 @@ public class VectorizationContext {
 
   VectorExpressionDescriptor vMap;
 
+  private List<Integer> projectedColumns;
+  private List<String> projectionColumnNames;
+  private Map<String, Integer> projectionColumnMap;
+
   //columnName to column position map
-  private final Map<String, Integer> columnMap;
-  private final int firstOutputColumnIndex;
+  // private final Map<String, Integer> columnMap;
+  private int firstOutputColumnIndex;
+
+  // Convenient constructor for initial batch creation takes
+  // a list of column names and maps them to 0..n-1 indices.
+  public VectorizationContext(List<String> initialColumnNames) {
+    this.projectionColumnNames = initialColumnNames;
+
+    projectedColumns = new ArrayList<Integer>();
+    projectionColumnMap = new HashMap<String, Integer>();
+    for (int i = 0; i < this.projectionColumnNames.size(); i++) {
+      projectedColumns.add(i);
+      projectionColumnMap.put(projectionColumnNames.get(i), i);
+    }
+    int firstOutputColumnIndex = projectedColumns.size();
+    this.ocm = new OutputColumnManager(firstOutputColumnIndex);
+    this.firstOutputColumnIndex = firstOutputColumnIndex;
+    vMap = new VectorExpressionDescriptor();
+  }
+
+  // Constructor to be used with the individual addInitialColumn method
+  // followed by a call to finishedAddingInitialColumns.
+  public VectorizationContext() {
+    projectedColumns = new ArrayList<Integer>();
+    projectionColumnNames = new ArrayList<String>();
+    projectionColumnMap = new HashMap<String, Integer>();
+    this.ocm = new OutputColumnManager(0);
+    this.firstOutputColumnIndex = 0;
+    vMap = new VectorExpressionDescriptor();
+  }
+
+  // Constructor useful for making a projection vectorization context.
+  // Use with resetProjectionColumns and addProjectionColumn.
+  // Keeps existing output column map, etc.
+  public VectorizationContext(VectorizationContext vContext) {
+    this.projectedColumns = new ArrayList<Integer>();
+    this.projectionColumnNames = new ArrayList<String>();
+    this.projectionColumnMap = new HashMap<String, Integer>();
+
+    this.ocm = vContext.ocm;
+    this.firstOutputColumnIndex = vContext.firstOutputColumnIndex;
+    vMap = new VectorExpressionDescriptor();
+  }
+
+  // Add an initial column to a vectorization context when
+  // a vectorized row batch is being created.
+  public void addInitialColumn(String columnName) {
+    int index = projectedColumns.size();
+    projectedColumns.add(index);
+    projectionColumnNames.add(columnName);
+    projectionColumnMap.put(columnName, index);
+  }
+
+  // Finishes the vectorization context after all the initial
+  // columns have been added.
+  public void finishedAddingInitialColumns() {
+    int firstOutputColumnIndex = projectedColumns.size();
+    this.ocm = new OutputColumnManager(firstOutputColumnIndex);
+    this.firstOutputColumnIndex = firstOutputColumnIndex;
+  }
+
+  // Empties the projection columns.
+  public void resetProjectionColumns() {
+    projectedColumns = new ArrayList<Integer>();
+    projectionColumnNames = new ArrayList<String>();
+    projectionColumnMap = new HashMap<String, Integer>();
+  }
+
+  // Add a projection column to a projection vectorization context.
+  public void addProjectionColumn(String columnName, int vectorBatchColIndex) {
+    projectedColumns.add(vectorBatchColIndex);
+    projectionColumnNames.add(columnName);
+    projectionColumnMap.put(columnName, vectorBatchColIndex);
+  }
+
+  public List<Integer> getProjectedColumns() {
+    return projectedColumns;
+  }
+
+  public List<String> getProjectionColumnNames() {
+    return projectionColumnNames;
+  }
+
+  public Map<String, Integer> getProjectionColumnMap() {
+    return projectionColumnMap;
+  }
+
 
   public static final Pattern decimalTypePattern = Pattern.compile("decimal.*",
       Pattern.CASE_INSENSITIVE);
@@ -140,7 +231,7 @@ public class VectorizationContext {
       Pattern.CASE_INSENSITIVE);
 
   //Map column number to type
-  private final OutputColumnManager ocm;
+  private OutputColumnManager ocm;
 
   // File key is used by operators to retrieve the scratch vectors
   // from mapWork at runtime. The operators that modify the structure of
@@ -170,27 +261,6 @@ public class VectorizationContext {
     castExpressionUdfs.add(UDFToShort.class);
   }
 
-  public VectorizationContext(Map<String, Integer> columnMap,
-      int initialOutputCol) {
-    this.columnMap = columnMap;
-    this.ocm = new OutputColumnManager(initialOutputCol);
-    this.firstOutputColumnIndex = initialOutputCol;
-    vMap = new VectorExpressionDescriptor();
-  }
-
-  /**
-   * This constructor inherits the OutputColumnManger and from
-   * the 'parent' constructor, therefore this should be used only by operators
-   * that don't create a new vectorized row batch. This should be used only by
-   * operators that want to modify the columnName map without changing the row batch.
-   */
-  public VectorizationContext(VectorizationContext parent) {
-    this.columnMap = new HashMap<String, Integer>(parent.columnMap);
-    this.ocm = parent.ocm;
-    this.firstOutputColumnIndex = parent.firstOutputColumnIndex;
-    vMap = new VectorExpressionDescriptor();
-  }
-
   public String getFileKey() {
     return fileKey;
   }
@@ -199,16 +269,19 @@ public class VectorizationContext {
     this.fileKey = fileKey;
   }
 
-  protected int getInputColumnIndex(String name) {
-    if (!columnMap.containsKey(name)) {
-      LOG.error(String.format("The column %s is not in the vectorization context column map %s.", 
-                 name, columnMap.toString()));
+  protected int getInputColumnIndex(String name) throws HiveException {
+    if (name == null) {
+      throw new HiveException("Null column name");
+    }
+    if (!projectionColumnMap.containsKey(name)) {
+      throw new HiveException(String.format("The column %s is not in the vectorization context column map %s.", 
+                 name, projectionColumnMap.toString()));
     }
-    return columnMap.get(name);
+    return projectionColumnMap.get(name);
   }
 
   protected int getInputColumnIndex(ExprNodeColumnDesc colExpr) {
-    return columnMap.get(colExpr.getColumn());
+    return projectionColumnMap.get(colExpr.getColumn());
   }
 
   private static class OutputColumnManager {
@@ -280,7 +353,7 @@ public class VectorizationContext {
   }
 
   private VectorExpression getColumnVectorExpression(ExprNodeColumnDesc
-      exprDesc, Mode mode) {
+      exprDesc, Mode mode) throws HiveException {
     int columnNum = getInputColumnIndex(exprDesc.getColumn());
     VectorExpression expr = null;
     switch (mode) {
@@ -1988,7 +2061,7 @@ public class VectorizationContext {
         "\" for type: \"" + inputType.name() + " (reduce-side = " + isReduce + ")");
   }
 
-  public Map<Integer, String> getOutputColumnTypeMap() {
+  public Map<Integer, String> getScratchColumnTypeMap() {
     Map<Integer, String> map = new HashMap<Integer, String>();
     for (int i = 0; i < ocm.outputColCount; i++) {
       String type = ocm.outputColumnsTypes[i];
@@ -1997,15 +2070,26 @@ public class VectorizationContext {
     return map;
   }
 
-  public Map<String, Integer> getColumnMap() {
-    return columnMap;
-  }
+  public String toString() {
+    StringBuilder sb = new StringBuilder(32);
+    sb.append("Context key ").append(getFileKey()).append(", ");
+
+    Comparator<Integer> comparerInteger = new Comparator<Integer>() {
+        @Override
+        public int compare(Integer o1, Integer o2) {
+          return o1.compareTo(o2);
+        }};
 
-  public void addToColumnMap(String columnName, int outputColumn) throws HiveException {
-    if (columnMap.containsKey(columnName) && (columnMap.get(columnName) != outputColumn)) {
-      throw new HiveException(String.format("Column %s is already mapped to %d. Cannot remap to %d.",
-          columnName, columnMap.get(columnName), outputColumn));
+    Map<Integer, String> sortedColumnMap = new TreeMap<Integer, String>(comparerInteger);
+    for (Map.Entry<String, Integer> entry : projectionColumnMap.entrySet()) {
+      sortedColumnMap.put(entry.getValue(), entry.getKey());
     }
-    columnMap.put(columnName, outputColumn);
+    sb.append("sortedProjectionColumnMap ").append(sortedColumnMap).append(", ");
+
+    Map<Integer, String> sortedScratchColumnTypeMap = new TreeMap<Integer, String>(comparerInteger);
+    sortedScratchColumnTypeMap.putAll(getScratchColumnTypeMap());
+    sb.append("sortedScratchColumnTypeMap ").append(sortedScratchColumnTypeMap);
+
+    return sb.toString();
   }
- }
+}
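
For readers following the VectorizationContext rework above, here is a minimal
usage sketch (not part of this patch) of the new projection-based API. It only
exercises constructors and methods visible in the hunks above; the fully
qualified name org.apache.hadoop.hive.ql.exec.vector.VectorizationContext is
taken from this commit's file paths, and hive-exec is assumed on the classpath.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;

    public class VectorizationContextSketch {
      public static void main(String[] args) {
        // Initial context: the three names map to batch columns 0..2 and
        // scratch (output) columns start at index 3.
        List<String> cols = Arrays.asList("key", "value", "ds");
        VectorizationContext vc = new VectorizationContext(cols);
        System.out.println(vc.getProjectedColumns());   // [0, 1, 2]

        // Equivalent incremental construction.
        VectorizationContext vc2 = new VectorizationContext();
        for (String col : cols) {
          vc2.addInitialColumn(col);
        }
        vc2.finishedAddingInitialColumns();

        // Projection context: shares vc's OutputColumnManager and
        // firstOutputColumnIndex, but remaps names to new batch positions.
        VectorizationContext projected = new VectorizationContext(vc);
        projected.resetProjectionColumns();
        projected.addProjectionColumn("value", 1);
        System.out.println(projected.getProjectedColumns());   // [1]
      }
    }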

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java Thu Oct 30 16:22:33 2014
@@ -23,6 +23,8 @@ import java.sql.Timestamp;
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
@@ -50,6 +52,7 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.Text;
 
 public class VectorizedBatchUtil {
+  private static final Log LOG = LogFactory.getLog(VectorizedBatchUtil.class);
 
   /**
    * Sets the IsNull value for ColumnVector at specified index
@@ -232,169 +235,237 @@ public class VectorizedBatchUtil {
     final int off = colOffset;
     // Iterate thru the cols and load the batch
     for (int i = 0; i < fieldRefs.size(); i++) {
-      Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i));
-      ObjectInspector foi = fieldRefs.get(i).getFieldObjectInspector();
-
-      // Vectorization only supports PRIMITIVE data types. Assert the same
-      assert (foi.getCategory() == Category.PRIMITIVE);
+      setVector(row, oi, fieldRefs, batch, buffer, rowIndex, i, off);
+    }
+  }
 
-      // Get writable object
-      PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi;
-      Object writableCol = poi.getPrimitiveWritableObject(fieldData);
-
-      // NOTE: The default value for null fields in vectorization is 1 for int types, NaN for
-      // float/double. String types have no default value for null.
-      switch (poi.getPrimitiveCategory()) {
-      case BOOLEAN: {
-        LongColumnVector lcv = (LongColumnVector) batch.cols[off + i];
-        if (writableCol != null) {
-          lcv.vector[rowIndex] = ((BooleanWritable) writableCol).get() ? 1 : 0;
-          lcv.isNull[rowIndex] = false;
-        } else {
-          lcv.vector[rowIndex] = 1;
-          setNullColIsNullValue(lcv, rowIndex);
-        }
+  /**
+   * Iterates thru all the columns in a given row and populates the batch,
+   * skipping columns that were not projected by the underlying read and
+   * partition columns whose values have already been set.
+   *
+   * @param row Deserialized row object
+   * @param oi Object inspector for that row
+   * @param rowIndex index at which the row should be added to the batch
+   * @param batch Vectorized batch to which the row is added at rowIndex
+   * @param context context object for this vectorized batch
+   * @param buffer buffer backing the variable-length (bytes) column vectors
+   * @throws HiveException
+   */
+  public static void acidAddRowToBatch(Object row,
+                                       StructObjectInspector oi,
+                                       int rowIndex,
+                                       VectorizedRowBatch batch,
+                                       VectorizedRowBatchCtx context,
+                                       DataOutputBuffer buffer) throws HiveException {
+    List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
+    // Iterate thru the cols and load the batch
+    for (int i = 0; i < fieldRefs.size(); i++) {
+      if (batch.cols[i] == null) {
+        // This means the column was not included in the projection from the underlying read
+        continue;
+      }
+      if (context.isPartitionCol(i)) {
+        // The value will have already been set before we're called, so don't overwrite it
+        continue;
       }
-        break;
-      case BYTE: {
-        LongColumnVector lcv = (LongColumnVector) batch.cols[off + i];
-        if (writableCol != null) {
-          lcv.vector[rowIndex] = ((ByteWritable) writableCol).get();
-          lcv.isNull[rowIndex] = false;
-        } else {
-          lcv.vector[rowIndex] = 1;
-          setNullColIsNullValue(lcv, rowIndex);
-        }
+      setVector(row, oi, fieldRefs, batch, buffer, rowIndex, i, 0);
+    }
+  }
+
+  private static void setVector(Object row,
+                                StructObjectInspector oi,
+                                List<? extends StructField> fieldRefs,
+                                VectorizedRowBatch batch,
+                                DataOutputBuffer buffer,
+                                int rowIndex,
+                                int colIndex,
+                                int offset) throws HiveException {
+
+    Object fieldData = oi.getStructFieldData(row, fieldRefs.get(colIndex));
+    ObjectInspector foi = fieldRefs.get(colIndex).getFieldObjectInspector();
+
+    // Vectorization only supports PRIMITIVE data types. Assert the same
+    assert (foi.getCategory() == Category.PRIMITIVE);
+
+    // Get writable object
+    PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi;
+    Object writableCol = poi.getPrimitiveWritableObject(fieldData);
+
+    // NOTE: The default value for null fields in vectorization is 1 for int types, NaN for
+    // float/double. String types have no default value for null.
+    switch (poi.getPrimitiveCategory()) {
+    case BOOLEAN: {
+      LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
+      if (writableCol != null) {
+        lcv.vector[rowIndex] = ((BooleanWritable) writableCol).get() ? 1 : 0;
+        lcv.isNull[rowIndex] = false;
+      } else {
+        lcv.vector[rowIndex] = 1;
+        setNullColIsNullValue(lcv, rowIndex);
       }
-        break;
-      case SHORT: {
-        LongColumnVector lcv = (LongColumnVector) batch.cols[off + i];
-        if (writableCol != null) {
-          lcv.vector[rowIndex] = ((ShortWritable) writableCol).get();
-          lcv.isNull[rowIndex] = false;
-        } else {
-          lcv.vector[rowIndex] = 1;
-          setNullColIsNullValue(lcv, rowIndex);
-        }
+    }
+      break;
+    case BYTE: {
+      LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
+      if (writableCol != null) {
+        lcv.vector[rowIndex] = ((ByteWritable) writableCol).get();
+        lcv.isNull[rowIndex] = false;
+      } else {
+        lcv.vector[rowIndex] = 1;
+        setNullColIsNullValue(lcv, rowIndex);
       }
-        break;
-      case INT: {
-        LongColumnVector lcv = (LongColumnVector) batch.cols[off + i];
-        if (writableCol != null) {
-          lcv.vector[rowIndex] = ((IntWritable) writableCol).get();
-          lcv.isNull[rowIndex] = false;
-        } else {
-          lcv.vector[rowIndex] = 1;
-          setNullColIsNullValue(lcv, rowIndex);
-        }
+    }
+      break;
+    case SHORT: {
+      LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
+      if (writableCol != null) {
+        lcv.vector[rowIndex] = ((ShortWritable) writableCol).get();
+        lcv.isNull[rowIndex] = false;
+      } else {
+        lcv.vector[rowIndex] = 1;
+        setNullColIsNullValue(lcv, rowIndex);
       }
-        break;
-      case LONG: {
-        LongColumnVector lcv = (LongColumnVector) batch.cols[off + i];
-        if (writableCol != null) {
-          lcv.vector[rowIndex] = ((LongWritable) writableCol).get();
-          lcv.isNull[rowIndex] = false;
-        } else {
-          lcv.vector[rowIndex] = 1;
-          setNullColIsNullValue(lcv, rowIndex);
-        }
+    }
+      break;
+    case INT: {
+      LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
+      if (writableCol != null) {
+        lcv.vector[rowIndex] = ((IntWritable) writableCol).get();
+        lcv.isNull[rowIndex] = false;
+      } else {
+        lcv.vector[rowIndex] = 1;
+        setNullColIsNullValue(lcv, rowIndex);
       }
-        break;
-      case DATE: {
-        LongColumnVector lcv = (LongColumnVector) batch.cols[off + i];
-        if (writableCol != null) {
-          lcv.vector[rowIndex] = ((DateWritable) writableCol).getDays();
-          lcv.isNull[rowIndex] = false;
-        } else {
-          lcv.vector[rowIndex] = 1;
-          setNullColIsNullValue(lcv, rowIndex);
-        }
+    }
+      break;
+    case LONG: {
+      LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
+      if (writableCol != null) {
+        lcv.vector[rowIndex] = ((LongWritable) writableCol).get();
+        lcv.isNull[rowIndex] = false;
+      } else {
+        lcv.vector[rowIndex] = 1;
+        setNullColIsNullValue(lcv, rowIndex);
       }
-        break;
-      case FLOAT: {
-        DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[off + i];
-        if (writableCol != null) {
-          dcv.vector[rowIndex] = ((FloatWritable) writableCol).get();
-          dcv.isNull[rowIndex] = false;
-        } else {
-          dcv.vector[rowIndex] = Double.NaN;
-          setNullColIsNullValue(dcv, rowIndex);
-        }
+    }
+      break;
+    case DATE: {
+      LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
+      if (writableCol != null) {
+        lcv.vector[rowIndex] = ((DateWritable) writableCol).getDays();
+        lcv.isNull[rowIndex] = false;
+      } else {
+        lcv.vector[rowIndex] = 1;
+        setNullColIsNullValue(lcv, rowIndex);
       }
-        break;
-      case DOUBLE: {
-        DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[off + i];
-        if (writableCol != null) {
-          dcv.vector[rowIndex] = ((DoubleWritable) writableCol).get();
-          dcv.isNull[rowIndex] = false;
-        } else {
-          dcv.vector[rowIndex] = Double.NaN;
-          setNullColIsNullValue(dcv, rowIndex);
-        }
+    }
+      break;
+    case FLOAT: {
+      DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[offset + colIndex];
+      if (writableCol != null) {
+        dcv.vector[rowIndex] = ((FloatWritable) writableCol).get();
+        dcv.isNull[rowIndex] = false;
+      } else {
+        dcv.vector[rowIndex] = Double.NaN;
+        setNullColIsNullValue(dcv, rowIndex);
       }
-        break;
-      case TIMESTAMP: {
-        LongColumnVector lcv = (LongColumnVector) batch.cols[off + i];
-        if (writableCol != null) {
-          Timestamp t = ((TimestampWritable) writableCol).getTimestamp();
-          lcv.vector[rowIndex] = TimestampUtils.getTimeNanoSec(t);
-          lcv.isNull[rowIndex] = false;
-        } else {
-          lcv.vector[rowIndex] = 1;
-          setNullColIsNullValue(lcv, rowIndex);
-        }
+    }
+      break;
+    case DOUBLE: {
+      DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[offset + colIndex];
+      if (writableCol != null) {
+        dcv.vector[rowIndex] = ((DoubleWritable) writableCol).get();
+        dcv.isNull[rowIndex] = false;
+      } else {
+        dcv.vector[rowIndex] = Double.NaN;
+        setNullColIsNullValue(dcv, rowIndex);
       }
-        break;
-      case BINARY: {
-        BytesColumnVector bcv = (BytesColumnVector) batch.cols[off + i];
-        if (writableCol != null) {
-            bcv.isNull[rowIndex] = false;
-            BytesWritable bw = (BytesWritable) writableCol;
-            byte[] bytes = bw.getBytes();
-            int start = buffer.getLength();
-            int length = bytes.length;
-            try {
-              buffer.write(bytes, 0, length);
-            } catch (IOException ioe) {
-              throw new IllegalStateException("bad write", ioe);
-            }
-            bcv.setRef(rowIndex, buffer.getData(), start, length);
-        } else {
-          setNullColIsNullValue(bcv, rowIndex);
-        }
+    }
+      break;
+    case TIMESTAMP: {
+      LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
+      if (writableCol != null) {
+        Timestamp t = ((TimestampWritable) writableCol).getTimestamp();
+        lcv.vector[rowIndex] = TimestampUtils.getTimeNanoSec(t);
+        lcv.isNull[rowIndex] = false;
+      } else {
+        lcv.vector[rowIndex] = 1;
+        setNullColIsNullValue(lcv, rowIndex);
       }
-        break;
-      case STRING: {
-        BytesColumnVector bcv = (BytesColumnVector) batch.cols[off + i];
-        if (writableCol != null) {
+    }
+      break;
+    case BINARY: {
+      BytesColumnVector bcv = (BytesColumnVector) batch.cols[offset + colIndex];
+      if (writableCol != null) {
           bcv.isNull[rowIndex] = false;
-          Text colText = (Text) writableCol;
+          BytesWritable bw = (BytesWritable) writableCol;
+          byte[] bytes = bw.getBytes();
           int start = buffer.getLength();
-          int length = colText.getLength();
+          int length = bw.getLength();
           try {
-            buffer.write(colText.getBytes(), 0, length);
+            buffer.write(bytes, 0, length);
           } catch (IOException ioe) {
             throw new IllegalStateException("bad write", ioe);
           }
           bcv.setRef(rowIndex, buffer.getData(), start, length);
-        } else {
-          setNullColIsNullValue(bcv, rowIndex);
-        }
+      } else {
+        setNullColIsNullValue(bcv, rowIndex);
       }
-        break;
-      case CHAR: {
-        BytesColumnVector bcv = (BytesColumnVector) batch.cols[off + i];
+    }
+      break;
+    case STRING: {
+      BytesColumnVector bcv = (BytesColumnVector) batch.cols[offset + colIndex];
+      if (writableCol != null) {
+        bcv.isNull[rowIndex] = false;
+        Text colText = (Text) writableCol;
+        int start = buffer.getLength();
+        int length = colText.getLength();
+        try {
+          buffer.write(colText.getBytes(), 0, length);
+        } catch (IOException ioe) {
+          throw new IllegalStateException("bad write", ioe);
+        }
+        bcv.setRef(rowIndex, buffer.getData(), start, length);
+      } else {
+        setNullColIsNullValue(bcv, rowIndex);
+      }
+    }
+      break;
+    case CHAR: {
+      BytesColumnVector bcv = (BytesColumnVector) batch.cols[offset + colIndex];
+      if (writableCol != null) {
+        bcv.isNull[rowIndex] = false;
+        HiveChar colHiveChar = ((HiveCharWritable) writableCol).getHiveChar();
+        byte[] bytes = colHiveChar.getStrippedValue().getBytes();
+
+        // We assume the CHAR maximum length was enforced when the object was created.
+        int length = bytes.length;
+
+        int start = buffer.getLength();
+        try {
+          // In vector mode, we store CHAR as unpadded.
+          buffer.write(bytes, 0, length);
+        } catch (IOException ioe) {
+          throw new IllegalStateException("bad write", ioe);
+        }
+        bcv.setRef(rowIndex, buffer.getData(), start, length);
+      } else {
+        setNullColIsNullValue(bcv, rowIndex);
+      }
+    }
+      break;
+    case VARCHAR: {
+        BytesColumnVector bcv = (BytesColumnVector) batch.cols[offset + colIndex];
         if (writableCol != null) {
           bcv.isNull[rowIndex] = false;
-          HiveChar colHiveChar = ((HiveCharWritable) writableCol).getHiveChar();
-          byte[] bytes = colHiveChar.getStrippedValue().getBytes();
-          
-          // We assume the CHAR maximum length was enforced when the object was created.
+          HiveVarchar colHiveVarchar = ((HiveVarcharWritable) writableCol).getHiveVarchar();
+          byte[] bytes = colHiveVarchar.getValue().getBytes();
+
+          // We assume the VARCHAR maximum length was enforced when the object was created.
           int length = bytes.length;
 
           int start = buffer.getLength();
           try {
-            // In vector mode, we store CHAR as unpadded.
             buffer.write(bytes, 0, length);
           } catch (IOException ioe) {
             throw new IllegalStateException("bad write", ioe);
@@ -405,45 +476,21 @@ public class VectorizedBatchUtil {
         }
       }
         break;
-      case VARCHAR: {
-          BytesColumnVector bcv = (BytesColumnVector) batch.cols[off + i];
-          if (writableCol != null) {
-            bcv.isNull[rowIndex] = false;
-            HiveVarchar colHiveVarchar = ((HiveVarcharWritable) writableCol).getHiveVarchar();
-            byte[] bytes = colHiveVarchar.getValue().getBytes();
-
-            // We assume the VARCHAR maximum length was enforced when the object was created.
-            int length = bytes.length;
-
-            int start = buffer.getLength();
-            try {
-              buffer.write(bytes, 0, length);
-            } catch (IOException ioe) {
-              throw new IllegalStateException("bad write", ioe);
-            }
-            bcv.setRef(rowIndex, buffer.getData(), start, length);
-          } else {
-            setNullColIsNullValue(bcv, rowIndex);
-          }
-        }
-          break;
-      case DECIMAL:
-        DecimalColumnVector dcv = (DecimalColumnVector) batch.cols[off + i];
-        if (writableCol != null) {
-          dcv.isNull[rowIndex] = false;
-          HiveDecimalWritable wobj = (HiveDecimalWritable) writableCol;
-          dcv.vector[rowIndex].update(wobj.getHiveDecimal().unscaledValue(),
-              (short) wobj.getScale());
-        } else {
-          setNullColIsNullValue(dcv, rowIndex);
-        }
-        break;
-      default:
-        throw new HiveException("Vectorizaton is not supported for datatype:"
-            + poi.getPrimitiveCategory());
-      }
+    case DECIMAL:
+      DecimalColumnVector dcv = (DecimalColumnVector) batch.cols[offset + colIndex];
+      if (writableCol != null) {
+        dcv.isNull[rowIndex] = false;
+        HiveDecimalWritable wobj = (HiveDecimalWritable) writableCol;
+        dcv.vector[rowIndex].update(wobj.getHiveDecimal().unscaledValue(),
+            (short) wobj.getScale());
+      } else {
+        setNullColIsNullValue(dcv, rowIndex);
+      }
+      break;
+    default:
+      throw new HiveException("Vectorization is not supported for datatype: " +
+          poi.getPrimitiveCategory());
     }
   }
-
 }
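
The setVector refactoring above keeps the existing null convention for
vectorized columns: integer-family columns get a dummy value of 1, float and
double columns get NaN, and the null bookkeeping is updated via
setNullColIsNullValue. A small illustrative sketch follows (not part of this
patch); it assumes the single-int-argument constructors and the public
vector/isNull/noNulls fields of LongColumnVector and DoubleColumnVector, and
writes the null flags explicitly.

    import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

    public class NullDefaultSketch {
      public static void main(String[] args) {
        int size = 1024;
        LongColumnVector lcv = new LongColumnVector(size);
        DoubleColumnVector dcv = new DoubleColumnVector(size);

        // Mirrors the null branches in setVector(): a dummy value plus
        // explicit null flags, since the dummy value alone is meaningless.
        int rowIndex = 0;
        lcv.vector[rowIndex] = 1;
        lcv.isNull[rowIndex] = true;
        lcv.noNulls = false;

        dcv.vector[rowIndex] = Double.NaN;
        dcv.isNull[rowIndex] = true;
        dcv.noNulls = false;
      }
    }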
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Thu Oct 30 16:22:33 2014
@@ -22,10 +22,12 @@ import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -45,6 +47,7 @@ import org.apache.hadoop.hive.serde2.Col
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -83,7 +86,11 @@ public class VectorizedRowBatchCtx {
   private Map<String, Object> partitionValues;
   
   //partition types
-  private Map<String, PrimitiveCategory> partitionTypes;  
+  private Map<String, PrimitiveCategory> partitionTypes;
+
+  // partition column positions, for use by classes that need to know whether a given column is a
+  // partition column
+  private Set<Integer> partitionCols;
   
   // Column projection list - List of column indexes to include. This
   // list does not contain partition columns
@@ -132,7 +139,7 @@ public class VectorizedRowBatchCtx {
   public void init(Configuration hiveConf, String fileKey,
       StructObjectInspector rowOI) {
     Map<String, Map<Integer, String>> scratchColumnVectorTypes =
-            Utilities.getScratchColumnVectorTypes(hiveConf);
+            Utilities.getAllScratchColumnVectorTypeMaps(hiveConf);
     columnTypeMap = scratchColumnVectorTypes.get(fileKey);
     this.rowOI= rowOI;
     this.rawRowOI = rowOI;
@@ -183,7 +190,7 @@ public class VectorizedRowBatchCtx {
 
     String partitionPath = split.getPath().getParent().toString();
     columnTypeMap = Utilities
-        .getScratchColumnVectorTypes(hiveConf)
+        .getAllScratchColumnVectorTypeMaps(hiveConf)
         .get(partitionPath);
 
     Properties partProps =
@@ -202,12 +209,13 @@ public class VectorizedRowBatchCtx {
     // Check to see if this split is part of a partition of a table
     String pcols = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
 
+    String[] partKeys = null;
     if (pcols != null && pcols.length() > 0) {
 
       // Partitions exist for this table. Get the partition object inspector and
       // raw row object inspector (row with out partition col)
       LinkedHashMap<String, String> partSpec = part.getPartSpec();
-      String[] partKeys = pcols.trim().split("/");
+      partKeys = pcols.trim().split("/");
       String pcolTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);      
       String[] partKeyTypes = pcolTypes.trim().split(":");      
       
@@ -261,6 +269,15 @@ public class VectorizedRowBatchCtx {
               .asList(new StructObjectInspector[] {partRawRowObjectInspector, partObjectInspector}));
       rowOI = rowObjectInspector;
       rawRowOI = partRawRowObjectInspector;
+
+      // We have to do this after we've set rowOI, as getColIndexBasedOnColName uses it
+      partitionCols = new HashSet<Integer>();
+      if (pcols != null && pcols.length() > 0) {
+        for (int i = 0; i < partKeys.length; i++) {
+          partitionCols.add(getColIndexBasedOnColName(partKeys[i]));
+        }
+      }
+
     } else {
 
       // No partitions for this table, hence row OI equals raw row OI
@@ -487,7 +504,7 @@ public class VectorizedRowBatchCtx {
             lcv.isNull[0] = true;
             lcv.isRepeating = true;
           } else { 
-            lcv.fill(((Date) value).getTime());
+            lcv.fill(DateWritable.dateToDays((Date) value));
             lcv.isNull[0] = false;
           }          
         }
@@ -500,7 +517,7 @@ public class VectorizedRowBatchCtx {
             lcv.isNull[0] = true;
             lcv.isRepeating = true;
           } else { 
-            lcv.fill((long)(((Timestamp) value).getTime()));
+            lcv.fill(TimestampUtils.getTimeNanoSec((Timestamp) value));
             lcv.isNull[0] = false;
           }
         }
@@ -585,6 +602,16 @@ public class VectorizedRowBatchCtx {
     }
   }
 
+  /**
+   * Determine whether a given column is a partition column.
+   * @param colnum column number in
+   * {@link org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch}s created by this context.
+   * @return true if it is a partition column, false otherwise
+   */
+  public final boolean isPartitionCol(int colnum) {
+    return (partitionCols == null) ? false : partitionCols.contains(colnum);
+  }
+
   private void addScratchColumnsToBatch(VectorizedRowBatch vrb) throws HiveException {
     if (columnTypeMap != null && !columnTypeMap.isEmpty()) {
       int origNumCols = vrb.numCols;
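
Two behavioral fixes above in VectorizedRowBatchCtx change how partition
column values are encoded when a batch is filled: DATE partition values are
now stored as days since the epoch (DateWritable.dateToDays) and TIMESTAMP
values as nanoseconds (TimestampUtils.getTimeNanoSec), matching what
setVector writes for regular rows; the new isPartitionCol accessor then lets
row loaders such as acidAddRowToBatch skip those already-filled columns.
Below is a small sketch of the two conversions (not part of this patch); it
assumes TimestampUtils lives in org.apache.hadoop.hive.ql.exec.vector, as the
unqualified uses elsewhere in this commit suggest.

    import java.sql.Date;
    import java.sql.Timestamp;

    import org.apache.hadoop.hive.ql.exec.vector.TimestampUtils;
    import org.apache.hadoop.hive.serde2.io.DateWritable;

    public class PartitionValueEncodingSketch {
      public static void main(String[] args) {
        Date d = Date.valueOf("2014-10-30");
        Timestamp t = Timestamp.valueOf("2014-10-30 16:22:33");

        // Days since epoch, as written into a LongColumnVector for DATE.
        int days = DateWritable.dateToDays(d);

        // Nanoseconds, as written into a LongColumnVector for TIMESTAMP.
        long nanos = TimestampUtils.getTimeNanoSec(t);

        System.out.println(days + " days, " + nanos + " ns");
      }
    }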


