hive-commits mailing list archives

From: br...@apache.org
Subject: svn commit: r1629562 [5/38] - in /hive/branches/spark: ./ accumulo-handler/ beeline/ beeline/src/java/org/apache/hive/beeline/ bin/ext/ common/ common/src/java/org/apache/hadoop/hive/conf/ common/src/test/org/apache/hadoop/hive/common/type/ contrib/src...
Date: Mon, 06 Oct 2014 03:44:26 GMT
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Mon Oct  6 03:44:13 2014
@@ -17,7 +17,9 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -33,13 +35,31 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
+import org.apache.hadoop.hive.ql.exec.tez.tools.InputMerger;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -56,16 +76,39 @@ public class ReduceRecordProcessor  exte
   private static final String REDUCE_PLAN_KEY = "__REDUCE_PLAN__";
 
   public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class);
+  private final ExecMapperContext execContext = new ExecMapperContext();
+  private boolean abort = false;
+  private Deserializer inputKeyDeserializer;
+
+  // Input value serde needs to be an array to support different SerDe
+  // for different tags
+  private final SerDe[] inputValueDeserializer = new SerDe[Byte.MAX_VALUE];
 
-  private ReduceWork redWork;
+  TableDesc keyTableDesc;
+  TableDesc[] valueTableDesc;
 
+  ObjectInspector[] rowObjectInspector;
   private Operator<?> reducer;
+  private boolean isTagged = false;
+
+  private Object keyObject = null;
+  private BytesWritable groupKey;
+
+  private ReduceWork redWork;
 
-  private ReduceRecordSource[] sources;
+  private boolean vectorized = false;
 
-  private final byte position = 0;
+  List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
 
-  private boolean abort;
+  private DataOutputBuffer buffer;
+  private VectorizedRowBatch[] batches;
+  // number of columns pertaining to keys in a vectorized row batch
+  private int keysColumnOffset;
+  private final int BATCH_SIZE = VectorizedRowBatch.DEFAULT_SIZE;
+  private StructObjectInspector keyStructInspector;
+  private StructObjectInspector[] valueStructInspectors;
+  /* this is only used in the error code path */
+  private List<VectorExpressionWriter>[] valueStringWriters;
 
   @Override
   void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
@@ -75,6 +118,10 @@ public class ReduceRecordProcessor  exte
 
     ObjectCache cache = ObjectCacheFactory.getCache(jconf);
 
+    rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
+    ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
+    ObjectInspector keyObjectInspector;
+
     redWork = (ReduceWork) cache.retrieve(REDUCE_PLAN_KEY);
     if (redWork == null) {
       redWork = Utilities.getReduceWork(jconf);
@@ -84,36 +131,95 @@ public class ReduceRecordProcessor  exte
     }
 
     reducer = redWork.getReducer();
-    reducer.getParentOperators().clear();
-    reducer.setParentOperators(null); // clear out any parents as reducer is the root
+    reducer.setParentOperators(null); // clear out any parents as reducer is the
+    // root
+    isTagged = redWork.getNeedsTagging();
+    vectorized = redWork.getVectorMode();
 
-    int numTags = redWork.getTagToValueDesc().size();
+    try {
+      keyTableDesc = redWork.getKeyDesc();
+      inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc
+          .getDeserializerClass(), null);
+      SerDeUtils.initializeSerDe(inputKeyDeserializer, null, keyTableDesc.getProperties(), null);
+      keyObjectInspector = inputKeyDeserializer.getObjectInspector();
+      reducer.setGroupKeyObjectInspector(keyObjectInspector);
+      valueTableDesc = new TableDesc[redWork.getTagToValueDesc().size()];
+
+      if(vectorized) {
+        final int maxTags = redWork.getTagToValueDesc().size();
+        keyStructInspector = (StructObjectInspector)keyObjectInspector;
+        batches = new VectorizedRowBatch[maxTags];
+        valueStructInspectors = new StructObjectInspector[maxTags];
+        valueStringWriters = new List[maxTags];
+        keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
+        buffer = new DataOutputBuffer();
+      }
 
-    ObjectInspector[] ois = new ObjectInspector[numTags];
-    sources = new ReduceRecordSource[numTags];
+      for (int tag = 0; tag < redWork.getTagToValueDesc().size(); tag++) {
+        // We should initialize the SerDe with the TypeInfo when available.
+        valueTableDesc[tag] = redWork.getTagToValueDesc().get(tag);
+        inputValueDeserializer[tag] = (SerDe) ReflectionUtils.newInstance(
+            valueTableDesc[tag].getDeserializerClass(), null);
+        SerDeUtils.initializeSerDe(inputValueDeserializer[tag], null,
+                                   valueTableDesc[tag].getProperties(), null);
+        valueObjectInspector[tag] = inputValueDeserializer[tag]
+            .getObjectInspector();
+
+        ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+
+        if(vectorized) {
+          /* vectorization only works with struct object inspectors */
+          valueStructInspectors[tag] = (StructObjectInspector)valueObjectInspector[tag];
+
+          batches[tag] = VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector,
+              valueStructInspectors[tag]);
+          final int totalColumns = keysColumnOffset +
+              valueStructInspectors[tag].getAllStructFieldRefs().size();
+          valueStringWriters[tag] = new ArrayList<VectorExpressionWriter>(totalColumns);
+          valueStringWriters[tag].addAll(Arrays
+              .asList(VectorExpressionWriterFactory
+                  .genVectorStructExpressionWritables(keyStructInspector)));
+          valueStringWriters[tag].addAll(Arrays
+              .asList(VectorExpressionWriterFactory
+                  .genVectorStructExpressionWritables(valueStructInspectors[tag])));
+
+          /*
+           * The row object inspector used by ReduceWork needs to be a **standard**
+           * struct object inspector, not just any struct object inspector.
+           */
+          ArrayList<String> colNames = new ArrayList<String>();
+          List<? extends StructField> fields = keyStructInspector.getAllStructFieldRefs();
+          for (StructField field: fields) {
+            colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
+            ois.add(field.getFieldObjectInspector());
+          }
+          fields = valueStructInspectors[tag].getAllStructFieldRefs();
+          for (StructField field: fields) {
+            colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
+            ois.add(field.getFieldObjectInspector());
+          }
+          rowObjectInspector[tag] = ObjectInspectorFactory
+                  .getStandardStructObjectInspector(colNames, ois);
+        } else {
+          ois.add(keyObjectInspector);
+          ois.add(valueObjectInspector[tag]);
+          rowObjectInspector[tag] = ObjectInspectorFactory
+                  .getStandardStructObjectInspector(Utilities.reduceFieldNameList, ois);
+        }
 
-    for (int tag = 0; tag < redWork.getTagToValueDesc().size(); tag++) {
-      TableDesc keyTableDesc = redWork.getKeyDesc();
-      TableDesc valueTableDesc = redWork.getTagToValueDesc().get(tag);
-      KeyValuesReader reader =
-          (KeyValuesReader) inputs.get(redWork.getTagToInput().get(tag)).getReader();
-
-      sources[tag] = new ReduceRecordSource();
-      sources[tag].init(jconf, reducer, redWork.getVectorMode(), keyTableDesc, valueTableDesc,
-          reader, tag == position, (byte) tag,
-          redWork.getScratchColumnVectorTypes());
-      ois[tag] = sources[tag].getObjectInspector();
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
     }
 
     MapredContext.init(false, new JobConf(jconf));
     ((TezContext) MapredContext.get()).setInputs(inputs);
     ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext);
-    ((TezContext) MapredContext.get()).setRecordSources(sources);
 
     // initialize reduce operator tree
     try {
       l4j.info(reducer.dump(0));
-      reducer.initialize(jconf, ois);
+      reducer.initialize(jconf, rowObjectInspector);
 
       // Initialization isn't finished until all parents of all operators
       // are initialized. For broadcast joins that means initializing the
@@ -121,6 +227,7 @@ public class ReduceRecordProcessor  exte
       List<HashTableDummyOperator> dummyOps = redWork.getDummyOps();
       if (dummyOps != null) {
         for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
+          dummyOp.setExecContext(execContext);
           dummyOp.initialize(jconf, null);
         }
       }
@@ -164,12 +271,28 @@ public class ReduceRecordProcessor  exte
       ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
     }
 
-    // run the operator pipeline
-    while (sources[position].pushRecord()) {
-      if (isLogInfoEnabled) {
-        logProgress();
+    KeyValuesReader kvsReader;
+    try {
+      if(shuffleInputs.size() == 1){
+        //no merging of inputs required
+        kvsReader = (KeyValuesReader) shuffleInputs.get(0).getReader();
+      }else {
+        //get a sort merged input
+        kvsReader = new InputMerger(shuffleInputs);
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+
+    while(kvsReader.next()){
+      Object key = kvsReader.getCurrentKey();
+      Iterable<Object> values = kvsReader.getCurrentValues();
+      boolean needMore = processRows(key, values);
+      if(!needMore){
+        break;
       }
     }
+
   }
 
   /**
@@ -179,22 +302,209 @@ public class ReduceRecordProcessor  exte
    */
   private List<LogicalInput> getShuffleInputs(Map<String, LogicalInput> inputs) {
     //the reduce plan inputs have tags, add all inputs that have tags
-    Map<Integer, String> tagToinput = redWork.getTagToInput();
+    Map<Integer, String> tag2input = redWork.getTagToInput();
     ArrayList<LogicalInput> shuffleInputs = new ArrayList<LogicalInput>();
-    for(String inpStr : tagToinput.values()){
-      if (inputs.get(inpStr) == null) {
-        throw new AssertionError("Cound not find input: " + inpStr);
-      }
+    for(String inpStr : tag2input.values()){
       shuffleInputs.add(inputs.get(inpStr));
     }
     return shuffleInputs;
   }
 
+  /**
+   * @param key
+   * @param values
+   * @return true if it is not done and can take more inputs
+   */
+  private boolean processRows(Object key, Iterable<Object> values) {
+    if(reducer.getDone()){
+      //done - no more records needed
+      return false;
+    }
+
+    // reset the execContext for each new row
+    execContext.resetRow();
+
+    try {
+      BytesWritable keyWritable = (BytesWritable) key;
+      byte tag = 0;
+
+      if (isTagged) {
+        // remove the tag from the key coming out of the reducer
+        // and store it in a separate variable.
+        int size = keyWritable.getLength() - 1;
+        tag = keyWritable.getBytes()[size];
+        keyWritable.setSize(size);
+      }
+
+      //Set the key, check if this is a new group or same group
+      if (!keyWritable.equals(this.groupKey)) {
+        // If an operator wants to do some work at the beginning of a group
+        if (groupKey == null) { // the first group
+          this.groupKey = new BytesWritable();
+        } else {
+          // If an operator wants to do some work at the end of a group
+          if(isLogTraceEnabled) {
+            l4j.trace("End Group");
+          }
+          reducer.endGroup();
+        }
+
+        try {
+          this.keyObject = inputKeyDeserializer.deserialize(keyWritable);
+        } catch (Exception e) {
+          throw new HiveException(
+              "Hive Runtime Error: Unable to deserialize reduce input key from "
+              + Utilities.formatBinaryString(keyWritable.getBytes(), 0,
+              keyWritable.getLength()) + " with properties "
+              + keyTableDesc.getProperties(), e);
+        }
+        groupKey.set(keyWritable.getBytes(), 0, keyWritable.getLength());
+        if (isLogTraceEnabled) {
+          l4j.trace("Start Group");
+        }
+        reducer.setGroupKeyObject(keyObject);
+        reducer.startGroup();
+      }
+      /* this.keyObject passed via reference */
+      if(vectorized) {
+        return processVectors(values, tag);
+      } else {
+        return processKeyValues(values, tag);
+      }
+    } catch (Throwable e) {
+      abort = true;
+      if (e instanceof OutOfMemoryError) {
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
+      } else {
+        l4j.fatal(StringUtils.stringifyException(e));
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private Object deserializeValue(BytesWritable valueWritable, byte tag) throws HiveException {
+    try {
+      return inputValueDeserializer[tag].deserialize(valueWritable);
+    } catch (SerDeException e) {
+      throw new HiveException(
+          "Hive Runtime Error: Unable to deserialize reduce input value (tag="
+              + tag
+              + ") from "
+              + Utilities.formatBinaryString(valueWritable.getBytes(), 0,
+                  valueWritable.getLength()) + " with properties "
+              + valueTableDesc[tag].getProperties(), e);
+    }
+  }
+
+  /**
+   * @param values
+   * @return true if it is not done and can take more inputs
+   */
+  private boolean processKeyValues(Iterable<Object> values, byte tag) throws HiveException {
+
+    for (Object value : values) {
+      BytesWritable valueWritable = (BytesWritable) value;
+
+      row.clear();
+      row.add(this.keyObject);
+      row.add(deserializeValue(valueWritable, tag));
+
+      try {
+        reducer.processOp(row, tag);
+      } catch (Exception e) {
+        String rowString = null;
+        try {
+          rowString = SerDeUtils.getJSONString(row, rowObjectInspector[tag]);
+        } catch (Exception e2) {
+          rowString = "[Error getting row data with exception "
+              + StringUtils.stringifyException(e2) + " ]";
+        }
+        throw new HiveException("Hive Runtime Error while processing row (tag="
+            + tag + ") " + rowString, e);
+      }
+      if (isLogInfoEnabled) {
+        logProgress();
+      }
+    }
+    return true; //give me more
+  }
+
+  /**
+   * @param values
+   * @return true if it is not done and can take more inputs
+   */
+  private boolean processVectors(Iterable<Object> values, byte tag) throws HiveException {
+    VectorizedRowBatch batch = batches[tag];
+    batch.reset();
+
+    /* deserialize key into columns */
+    VectorizedBatchUtil.addRowToBatchFrom(keyObject, keyStructInspector,
+        0, 0, batch, buffer);
+    for(int i = 0; i < keysColumnOffset; i++) {
+      VectorizedBatchUtil.setRepeatingColumn(batch, i);
+    }
+
+    int rowIdx = 0;
+    try {
+      for (Object value : values) {
+        /* deserialize value into columns */
+        BytesWritable valueWritable = (BytesWritable) value;
+        Object valueObj = deserializeValue(valueWritable, tag);
+
+        VectorizedBatchUtil.addRowToBatchFrom(valueObj, valueStructInspectors[tag],
+            rowIdx, keysColumnOffset, batch, buffer);
+        rowIdx++;
+        if (rowIdx >= BATCH_SIZE) {
+          VectorizedBatchUtil.setBatchSize(batch, rowIdx);
+          reducer.processOp(batch, tag);
+          rowIdx = 0;
+          if (isLogInfoEnabled) {
+            logProgress();
+          }
+        }
+      }
+      if (rowIdx > 0) {
+        VectorizedBatchUtil.setBatchSize(batch, rowIdx);
+        reducer.processOp(batch, tag);
+      }
+      if (isLogInfoEnabled) {
+        logProgress();
+      }
+    } catch (Exception e) {
+      String rowString = null;
+      try {
+        /* batch.toString depends on this */
+        batch.setValueWriters(valueStringWriters[tag]
+            .toArray(new VectorExpressionWriter[0]));
+        rowString = batch.toString();
+      } catch (Exception e2) {
+        rowString = "[Error getting row data with exception "
+            + StringUtils.stringifyException(e2) + " ]";
+      }
+      throw new HiveException("Hive Runtime Error while processing vector batch (tag="
+          + tag + ") " + rowString, e);
+    }
+    return true; // give me more
+  }
+
   @Override
   void close(){
+    // check if there are IOExceptions
+    if (!abort) {
+      abort = execContext.getIoCxt().getIOExceptions();
+    }
+
     try {
-      for (ReduceRecordSource rs: sources) {
-        abort = abort && rs.close();
+      if (groupKey != null) {
+        // If an operator wants to do some work at the end of a group
+        if(isLogTraceEnabled) {
+          l4j.trace("End Group");
+        }
+        reducer.endGroup();
+      }
+      if (isLogInfoEnabled) {
+        logCloseInfo();
       }
 
       reducer.close(abort);
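
Note on the ReduceRecordProcessor change above: the record-pumping sources are replaced by a direct loop over a KeyValuesReader (a single shuffle input is read as-is, multiple inputs go through InputMerger), and processRows() detects group boundaries by comparing serialized keys before dispatching to the row-mode or vectorized path. A minimal, self-contained sketch of that group-boundary handling, using hypothetical stand-ins rather than the real Hive/Tez types:

    // Hypothetical sketch -- not the actual ReduceRecordProcessor code.
    import java.util.Arrays;

    class GroupTrackerSketch {
      private byte[] currentGroupKey;            // serialized key of the current group

      interface Reducer {                        // stand-in for the reduce operator
        void startGroup();
        void endGroup();
        void process(byte[] key, Iterable<byte[]> values);
      }

      void onKeyValues(byte[] key, Iterable<byte[]> values, Reducer reducer) {
        // A new group starts whenever the serialized key differs from the previous one.
        if (currentGroupKey == null || !Arrays.equals(currentGroupKey, key)) {
          if (currentGroupKey != null) {
            reducer.endGroup();                  // finish the previous group
          }
          currentGroupKey = key.clone();
          reducer.startGroup();
        }
        reducer.process(key, values);            // row-mode or vectorized processing
      }
    }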

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java Mon Oct  6 03:44:13 2014
@@ -37,8 +37,6 @@ public class TezContext extends MapredCo
 
   private ProcessorContext processorContext;
 
-  private RecordSource[] sources;
-
   public TezContext(boolean isMap, JobConf jobConf) {
     super(isMap, jobConf);
   }
@@ -72,12 +70,4 @@ public class TezContext extends MapredCo
   public ProcessorContext getTezProcessorContext() {
     return processorContext;
   }
-
-  public RecordSource[] getRecordSources() {
-    return sources;
-  }
-
-  public void setRecordSources(RecordSource[] sources) {
-    this.sources = sources;
-  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java Mon Oct  6 03:44:13 2014
@@ -78,7 +78,7 @@ public class TezJobMonitor {
         try {
           for (TezSessionState s: TezSessionState.getOpenSessions()) {
             System.err.println("Shutting down tez session.");
-            TezSessionPoolManager.getInstance().close(s, false);
+            TezSessionPoolManager.getInstance().close(s);
           }
         } catch (Exception e) {
           // ignore
@@ -113,7 +113,6 @@ public class TezJobMonitor {
     String lastReport = null;
     Set<StatusGetOpts> opts = new HashSet<StatusGetOpts>();
     Heartbeater heartbeater = new Heartbeater(txnMgr, conf);
-    long startTime = 0;
 
     shutdownList.add(dagClient);
 
@@ -146,7 +145,6 @@ public class TezJobMonitor {
               for (String s: progressMap.keySet()) {
                 perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
               }
-              startTime = System.currentTimeMillis();
               running = true;
             }
 
@@ -154,8 +152,7 @@ public class TezJobMonitor {
             break;
           case SUCCEEDED:
             lastReport = printStatus(progressMap, lastReport, console);
-            double duration = (System.currentTimeMillis() - startTime)/1000.0;
-            console.printInfo("Status: Finished successfully in " + String.format("%.2f seconds", duration));
+            console.printInfo("Status: Finished successfully");
             running = false;
             done = true;
             break;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Mon Oct  6 03:44:13 2014
@@ -17,14 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import java.io.IOException;
-import java.text.NumberFormat;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -34,7 +26,6 @@ import org.apache.hadoop.mapred.OutputCo
 import org.apache.hadoop.util.StringUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
-import org.apache.tez.mapreduce.input.MultiMRInput;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
@@ -43,6 +34,11 @@ import org.apache.tez.runtime.api.Logica
 import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.List;
+import java.util.Map;
+
 /**
  * Hive processor for Tez that forms the vertices in Tez and processes the data.
  * Does what ExecMapper and ExecReducer does for hive in MR framework.
@@ -94,8 +90,7 @@ public class TezProcessor extends Abstra
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR);
     Configuration conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
     this.jobConf = new JobConf(conf);
-    this.processorContext = getContext();
-    setupMRLegacyConfigs(processorContext);
+    setupMRLegacyConfigs(getContext());
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR);
   }
 
@@ -135,6 +130,12 @@ public class TezProcessor extends Abstra
 
       if (isMap) {
         rproc = new MapRecordProcessor(jobConf);
+        MRInputLegacy mrInput = getMRInput(inputs);
+        try {
+          mrInput.init();
+        } catch (IOException e) {
+          throw new RuntimeException("Failed while initializing MRInput", e);
+        }
       } else {
         rproc = new ReduceRecordProcessor();
       }
@@ -147,7 +148,6 @@ public class TezProcessor extends Abstra
       throws Exception {
     Throwable originalThrowable = null;
     try {
-      // Outputs will be started later by the individual Processors.
       TezCacheAccess cacheAccess = TezCacheAccess.createInstance(jobConf);
       // Start the actual Inputs. After MRInput initialization.
       for (Map.Entry<String, LogicalInput> inputEntry : inputs.entrySet()) {
@@ -155,10 +155,13 @@ public class TezProcessor extends Abstra
           LOG.info("Input: " + inputEntry.getKey() + " is not cached");
           inputEntry.getValue().start();
         } else {
-          LOG.info("Input: " + inputEntry.getKey() + " is already cached. Skipping start");
+          LOG.info("Input: " + inputEntry.getKey() +
+              " is already cached. Skipping start");
         }
       }
 
+      // Outputs will be started later by the individual Processors.
+
       MRTaskReporter mrReporter = new MRTaskReporter(getContext());
       rproc.init(jobConf, getContext(), mrReporter, inputs, outputs);
       rproc.run();
@@ -211,4 +214,19 @@ public class TezProcessor extends Abstra
       writer.write(key, value);
     }
   }
+
+  static  MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) {
+    //there should be only one MRInput
+    MRInputLegacy theMRInput = null;
+    for(LogicalInput inp : inputs.values()){
+      if(inp instanceof MRInputLegacy){
+        if(theMRInput != null){
+          throw new IllegalArgumentException("Only one MRInput is expected");
+        }
+        //a better approach would be to find the alias
+        theMRInput = (MRInputLegacy)inp;
+      }
+    }
+    return theMRInput;
+  }
 }
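
As a side note on the getMRInput() helper and the map-side init added above: the processor locates the single MRInputLegacy, initializes it before the remaining inputs are started, and fails if more than one is present. A rough sketch of that start-up ordering with hypothetical input types (not the real Tez LogicalInput API):

    // Hypothetical sketch: find the single primary input, init it, then start the rest.
    import java.io.IOException;
    import java.util.Map;

    class InputStartupSketch {
      interface Input { void start() throws IOException; }
      interface PrimaryInput extends Input { void init() throws IOException; }

      static PrimaryInput findPrimary(Map<String, Input> inputs) {
        PrimaryInput primary = null;
        for (Input in : inputs.values()) {
          if (in instanceof PrimaryInput) {
            if (primary != null) {
              throw new IllegalArgumentException("Only one primary input is expected");
            }
            primary = (PrimaryInput) in;
          }
        }
        return primary;
      }

      static void startAll(Map<String, Input> inputs) throws IOException {
        PrimaryInput primary = findPrimary(inputs);
        if (primary != null) {
          primary.init();                        // must complete before the other inputs start
        }
        for (Input in : inputs.values()) {
          in.start();
        }
      }
    }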

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java Mon Oct  6 03:44:13 2014
@@ -168,10 +168,10 @@ public class TezSessionPoolManager {
     // session in the SessionState
   }
 
-  public void close(TezSessionState tezSessionState, boolean keepTmpDir) throws Exception {
+  public void close(TezSessionState tezSessionState) throws Exception {
     LOG.info("Closing tez session default? " + tezSessionState.isDefault());
     if (!tezSessionState.isDefault()) {
-      tezSessionState.close(keepTmpDir);
+      tezSessionState.close(false);
     }
   }
 
@@ -262,24 +262,19 @@ public class TezSessionPoolManager {
     }
 
     if (session != null) {
-      close(session, false);
+      close(session);
     }
 
     return getSession(conf, doOpen, forceCreate);
   }
 
-  public void closeAndOpen(TezSessionState sessionState, HiveConf conf, boolean keepTmpDir)
+  public void closeAndOpen(TezSessionState sessionState, HiveConf conf)
       throws Exception {
-    closeAndOpen(sessionState, conf, null, keepTmpDir);
-  }
-
-  public void closeAndOpen(TezSessionState sessionState, HiveConf conf,
-      String[] additionalFiles, boolean keepTmpDir) throws Exception {
     HiveConf sessionConf = sessionState.getConf();
     if (sessionConf != null && sessionConf.get("tez.queue.name") != null) {
       conf.set("tez.queue.name", sessionConf.get("tez.queue.name"));
     }
-    close(sessionState, keepTmpDir);
-    sessionState.open(conf, additionalFiles);
+    close(sessionState);
+    sessionState.open(conf);
   }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Mon Oct  6 03:44:13 2014
@@ -37,7 +37,6 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
@@ -56,7 +55,6 @@ import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.SessionNotRunning;
-import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexGroup;
 import org.apache.tez.dag.api.client.DAGClient;
@@ -126,11 +124,14 @@ public class TezTask extends Task<TezWor
       // create the tez tmp dir
       scratchDir = utils.createTezDir(scratchDir, conf);
 
-      Map<String,LocalResource> inputOutputLocalResources =
-          getExtraLocalResources(jobConf, scratchDir, inputOutputJars);
-
-      // Ensure the session is open and has the necessary local resources
-      updateSession(session, jobConf, scratchDir, inputOutputJars, inputOutputLocalResources);
+      if (!session.isOpen()) {
+        // can happen if the user sets the tez flag after the session was
+        // established
+        LOG.info("Tez session hasn't been created yet. Opening session");
+        session.open(conf, inputOutputJars);
+      } else {
+        session.refreshLocalResourcesFromConf(conf);
+      }
 
       List<LocalResource> additionalLr = session.getLocalizedResources();
 
@@ -152,12 +153,8 @@ public class TezTask extends Task<TezWor
       // next we translate the TezWork to a Tez DAG
       DAG dag = build(jobConf, work, scratchDir, appJarLr, additionalLr, ctx);
 
-      // Add the extra resources to the dag
-      addExtraResourcesToDag(session, dag, inputOutputJars, inputOutputLocalResources);
-
       // submit will send the job to the cluster and start executing
-      client = submit(jobConf, dag, scratchDir, appJarLr, session,
-          additionalLr, inputOutputJars, inputOutputLocalResources);
+      client = submit(jobConf, dag, scratchDir, appJarLr, session, additionalLr);
 
       // finally monitor will print progress until the job is done
       TezJobMonitor monitor = new TezJobMonitor();
@@ -198,63 +195,6 @@ public class TezTask extends Task<TezWor
     return rc;
   }
 
-  /**
-   * Converted the list of jars into local resources
-   */
-  Map<String,LocalResource> getExtraLocalResources(JobConf jobConf, Path scratchDir,
-      String[] inputOutputJars) throws Exception {
-    final Map<String,LocalResource> resources = new HashMap<String,LocalResource>();
-    final List<LocalResource> localResources = utils.localizeTempFiles(
-        scratchDir.toString(), jobConf, inputOutputJars);
-    if (null != localResources) {
-      for (LocalResource lr : localResources) {
-        resources.put(utils.getBaseName(lr), lr);
-      }
-    }
-    return resources;
-  }
-
-  /**
-   * Ensures that the Tez Session is open and the AM has all necessary jars configured.
-   */
-  void updateSession(TezSessionState session,
-      JobConf jobConf, Path scratchDir, String[] inputOutputJars,
-      Map<String,LocalResource> extraResources) throws Exception {
-    final boolean missingLocalResources = !session
-        .hasResources(inputOutputJars);
-
-    if (!session.isOpen()) {
-      // can happen if the user sets the tez flag after the session was
-      // established
-      LOG.info("Tez session hasn't been created yet. Opening session");
-      session.open(conf, inputOutputJars);
-    } else {
-      LOG.info("Session is already open");
-
-      // Ensure the open session has the necessary resources (StorageHandler)
-      if (missingLocalResources) {
-        LOG.info("Tez session missing resources," +
-            " adding additional necessary resources");
-        session.getSession().addAppMasterLocalFiles(extraResources);
-      }
-
-      session.refreshLocalResourcesFromConf(conf);
-    }
-  }
-
-  /**
-   * Adds any necessary resources that must be localized in each vertex to the DAG.
-   */
-  void addExtraResourcesToDag(TezSessionState session, DAG dag,
-      String[] inputOutputJars,
-      Map<String,LocalResource> inputOutputLocalResources) throws Exception {
-    if (!session.hasResources(inputOutputJars)) {
-      if (null != inputOutputLocalResources) {
-        dag.addTaskLocalFiles(inputOutputLocalResources);
-      }
-    }
-  }
-
   DAG build(JobConf conf, TezWork work, Path scratchDir,
       LocalResource appJarLr, List<LocalResource> additionalLr, Context ctx)
       throws Exception {
@@ -314,16 +254,15 @@ public class TezTask extends Task<TezWor
         for (BaseWork v: children) {
           // finally we can create the grouped edge
           GroupInputEdge e = utils.createEdge(group, parentConf,
-               workToVertex.get(v), work.getEdgeProperty(w, v), work.getVertexType(v));
+               workToVertex.get(v), work.getEdgeProperty(w, v));
 
           dag.addEdge(e);
         }
       } else {
         // Regular vertices
         JobConf wxConf = utils.initializeVertexConf(conf, ctx, w);
-        Vertex wx =
-            utils.createVertex(wxConf, w, scratchDir, appJarLr, additionalLr, fs, ctx, !isFinal,
-                work, work.getVertexType(w));
+        Vertex wx = utils.createVertex(wxConf, w, scratchDir, appJarLr,
+          additionalLr, fs, ctx, !isFinal, work);
         dag.addVertex(wx);
         utils.addCredentials(w, dag);
         perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
@@ -337,7 +276,7 @@ public class TezTask extends Task<TezWor
 
           TezEdgeProperty edgeProp = work.getEdgeProperty(w, v);
 
-          e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp, work.getVertexType(v));
+          e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp);
           dag.addEdge(e);
         }
       }
@@ -348,8 +287,7 @@ public class TezTask extends Task<TezWor
 
   DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
       LocalResource appJarLr, TezSessionState sessionState,
-      List<LocalResource> additionalLr, String[] inputOutputJars,
-      Map<String,LocalResource> inputOutputLocalResources)
+      List<LocalResource> additionalLr)
       throws Exception {
 
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
@@ -370,7 +308,7 @@ public class TezTask extends Task<TezWor
       console.printInfo("Tez session was closed. Reopening...");
 
       // close the old one, but keep the tmp files around
-      TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf, inputOutputJars, true);
+      TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf);
       console.printInfo("Session re-established.");
 
       dagClient = sessionState.getSession().submitDAG(dag);
@@ -388,9 +326,6 @@ public class TezTask extends Task<TezWor
     try {
       List<BaseWork> ws = work.getAllWork();
       for (BaseWork w: ws) {
-        if (w instanceof MergeJoinWork) {
-          w = ((MergeJoinWork) w).getMainWork();
-        }
         for (Operator<?> op: w.getAllOperators()) {
           op.jobClose(conf, rc == 0);
         }
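
The TezTask hunks above drop the extra local-resource plumbing but keep the reopen-and-retry behaviour: when DAG submission finds the session closed, the session is re-established via closeAndOpen() and the DAG is submitted again. A hedged sketch of that pattern with stand-in types (the surrounding exception handling in TezTask is not shown in these hunks):

    // Hypothetical sketch -- the real code uses TezSessionState, DAGClient and SessionNotRunning.
    class SubmitRetrySketch {
      static class SessionClosedException extends Exception {}

      interface Session {
        Object submitDAG(Object dag) throws SessionClosedException;
        void reopen(Object conf) throws Exception;   // close + open, as closeAndOpen() does
      }

      static Object submit(Session session, Object dag, Object conf) throws Exception {
        try {
          return session.submitDAG(dag);
        } catch (SessionClosedException e) {
          // The session was closed underneath us: re-establish it and submit once more.
          session.reopen(conf);
          return session.submitDAG(dag);
        }
      }
    }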

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java Mon Oct  6 03:44:13 2014
@@ -40,7 +40,7 @@ public class TezMergedLogicalInput exten
  
   @Override
   public Reader getReader() throws Exception {
-    return new KeyValuesInputMerger(getInputs());
+    return new InputMerger(getInputs());
   }
 
   @Override

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java Mon Oct  6 03:44:13 2014
@@ -67,7 +67,6 @@ public class VectorExpressionDescriptor 
     DATE                    (0x040),
     TIMESTAMP               (0x080),
     DATETIME_FAMILY         (DATE.value | TIMESTAMP.value),
-    INT_TIMESTAMP_FAMILY    (INT_FAMILY.value | TIMESTAMP.value),
     INT_DATETIME_FAMILY     (INT_FAMILY.value | DATETIME_FAMILY.value),
     STRING_DATETIME_FAMILY  (STRING_FAMILY.value | DATETIME_FAMILY.value),
     ALL_FAMILY              (0xFFF);
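
The ArgumentType change above removes the INT_TIMESTAMP_FAMILY mask; the remaining families are still plain bit masks composed by OR-ing member flags. A small illustrative sketch of how such family masks combine and how a membership test presumably works (DATE and TIMESTAMP values come from the hunk above, the INT_FAMILY value is assumed):

    // Illustrative values and helper only, not the real VectorExpressionDescriptor code.
    class ArgumentTypeMaskSketch {
      static final int INT_FAMILY = 0x001;                    // assumed value
      static final int DATE       = 0x040;
      static final int TIMESTAMP  = 0x080;
      static final int DATETIME_FAMILY     = DATE | TIMESTAMP;
      static final int INT_DATETIME_FAMILY = INT_FAMILY | DATETIME_FAMILY;

      // A type is accepted by a family when the two masks share at least one bit.
      static boolean accepts(int familyMask, int typeMask) {
        return (familyMask & typeMask) != 0;
      }

      public static void main(String[] args) {
        System.out.println(accepts(INT_DATETIME_FAMILY, TIMESTAMP));  // true
        System.out.println(accepts(DATETIME_FAMILY, INT_FAMILY));     // false
      }
    }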

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java Mon Oct  6 03:44:13 2014
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.exec.vector;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -46,8 +45,7 @@ public class VectorExtractOperator exten
   private int keyColCount;
   private int valueColCount;
   
-  private transient VectorizedRowBatch outputBatch;
-  private transient int remainingColCount;
+  private transient int [] projectedColumns = null;
 
   public VectorExtractOperator(VectorizationContext vContext, OperatorDesc conf)
       throws HiveException {
@@ -59,25 +57,26 @@ public class VectorExtractOperator exten
     super();
   }
 
-  @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
-    StructObjectInspector structInputObjInspector = (StructObjectInspector) inputObjInspectors[0];
-    List<? extends StructField> fields = structInputObjInspector.getAllStructFieldRefs();
+  private StructObjectInspector makeStandardStructObjectInspector(StructObjectInspector structObjectInspector) {
+    List<? extends StructField> fields = structObjectInspector.getAllStructFieldRefs();
     ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
     ArrayList<String> colNames = new ArrayList<String>();
-    for (int i = keyColCount; i < fields.size(); i++) {
-      StructField field = fields.get(i);
-      String fieldName = field.getFieldName();
-
-      // Remove "VALUE." prefix.
-      int dotIndex = fieldName.indexOf(".");
-      colNames.add(fieldName.substring(dotIndex + 1));
+    for (StructField field: fields) {
+      colNames.add(field.getFieldName());
       ois.add(field.getFieldObjectInspector());
     }
-    outputObjInspector = ObjectInspectorFactory
+    return ObjectInspectorFactory
               .getStandardStructObjectInspector(colNames, ois);
-    remainingColCount = fields.size() - keyColCount;
-    outputBatch =  new VectorizedRowBatch(remainingColCount);
+    }
+ 
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    outputObjInspector = inputObjInspectors[0];
+    LOG.info("VectorExtractOperator class of outputObjInspector is " + outputObjInspector.getClass().getName());
+    projectedColumns = new int [valueColCount];
+    for (int i = 0; i < valueColCount; i++) {
+      projectedColumns[i] = keyColCount + i;
+    }
     initializeChildren(hconf);
   }
 
@@ -87,16 +86,20 @@ public class VectorExtractOperator exten
   }
   
   @Override
-  // Remove the key columns and forward the values (and scratch columns).
+  // Evaluate vectorized batches of rows and forward them.
   public void processOp(Object row, int tag) throws HiveException {
-    VectorizedRowBatch inputBatch = (VectorizedRowBatch) row;
-
-    // Copy references to the input columns array starting after the keys...
-    for (int i = 0; i < remainingColCount; i++) {
-      outputBatch.cols[i] = inputBatch.cols[keyColCount + i];
-    }
-    outputBatch.size = inputBatch.size;
+    VectorizedRowBatch vrg = (VectorizedRowBatch) row;
 
-    forward(outputBatch, outputObjInspector);
+    // Project away the key columns...
+    int[] originalProjections = vrg.projectedColumns;
+    int originalProjectionSize = vrg.projectionSize;
+    vrg.projectionSize = valueColCount;
+    vrg.projectedColumns = this.projectedColumns;
+
+    forward(vrg, outputObjInspector);
+
+    // Revert the projected columns back, because vrg will be re-used.
+    vrg.projectionSize = originalProjectionSize;
+    vrg.projectedColumns = originalProjections;
   }
 }
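
The VectorExtractOperator rewrite above stops copying columns into a scratch batch and instead narrows the batch's projection to the value columns, forwards it, then restores the original projection because the batch object is reused upstream. A minimal sketch of that save/swap/restore pattern with a hypothetical Batch type:

    // Hypothetical Batch type; the real code mutates VectorizedRowBatch.projectedColumns/projectionSize.
    class ProjectionSwapSketch {
      static class Batch {
        int[] projectedColumns;
        int projectionSize;
      }

      interface Downstream { void forward(Batch batch); }

      static void forwardValuesOnly(Batch batch, int keyColCount, int valueColCount,
                                    Downstream next) {
        int[] savedColumns = batch.projectedColumns;
        int savedSize = batch.projectionSize;

        int[] valueProjection = new int[valueColCount];
        for (int i = 0; i < valueColCount; i++) {
          valueProjection[i] = keyColCount + i;     // skip the leading key columns
        }
        batch.projectedColumns = valueProjection;
        batch.projectionSize = valueColCount;

        next.forward(batch);

        // Restore the original projection: the same batch instance is reused by the caller.
        batch.projectedColumns = savedColumns;
        batch.projectionSize = savedSize;
      }
    }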

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java Mon Oct  6 03:44:13 2014
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
+import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
@@ -25,7 +27,16 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 
 /**
  * File Sink operator implementation.
@@ -58,10 +69,113 @@ public class VectorFileSinkOperator exte
 
   @Override
   public void processOp(Object data, int tag) throws HiveException {
+
     VectorizedRowBatch vrg = (VectorizedRowBatch)data;
+
+    Writable [] records = null;
+    boolean vectorizedSerde = false;
+    try {
+      if (serializer instanceof VectorizedSerde) {
+        recordValue = ((VectorizedSerde) serializer).serializeVector(vrg,
+            inputObjInspectors[0]);
+        records = (Writable[]) ((ObjectWritable) recordValue).get();
+        vectorizedSerde = true;
+      }
+    } catch (SerDeException e1) {
+      throw new HiveException(e1);
+    }
+
     for (int i = 0; i < vrg.size; i++) {
-      Object[] row = getRowObject(vrg, i);
-      super.processOp(row, tag);
+      Writable row = null;
+      if (vectorizedSerde) {
+        row = records[i];
+      } else {
+        if (vrg.valueWriters == null) {
+          vrg.setValueWriters(this.valueWriters);
+        }
+        try {
+          row = serializer.serialize(getRowObject(vrg, i), inputObjInspectors[0]);
+        } catch (SerDeException ex) {
+          throw new HiveException(ex);
+        }
+      }
+    /* Create list bucketing sub-directory only if stored-as-directories is on. */
+    String lbDirName = null;
+    lbDirName = (lbCtx == null) ? null : generateListBucketingDirName(row);
+
+    FSPaths fpaths;
+
+    if (!bDynParts && !filesCreated) {
+      if (lbDirName != null) {
+        FSPaths fsp2 = lookupListBucketingPaths(lbDirName);
+      } else {
+        createBucketFiles(fsp);
+      }
+    }
+
+    try {
+      updateProgress();
+
+      // if DP is enabled, get the final output writers and prepare the real output row
+      assert inputObjInspectors[0].getCategory() == ObjectInspector.Category.STRUCT : "input object inspector is not struct";
+
+      if (bDynParts) {
+        // copy the DP column values from the input row to dpVals
+        dpVals.clear();
+        dpWritables.clear();
+        ObjectInspectorUtils.partialCopyToStandardObject(dpWritables, row, dpStartCol, numDynParts,
+            (StructObjectInspector) inputObjInspectors[0], ObjectInspectorCopyOption.WRITABLE);
+        // get a set of RecordWriter based on the DP column values
+        // pass the null value along to the escaping process to determine what the dir should be
+        for (Object o : dpWritables) {
+          if (o == null || o.toString().length() == 0) {
+            dpVals.add(dpCtx.getDefaultPartitionName());
+          } else {
+            dpVals.add(o.toString());
+          }
+        }
+        fpaths = getDynOutPaths(dpVals, lbDirName);
+
+      } else {
+        if (lbDirName != null) {
+          fpaths = lookupListBucketingPaths(lbDirName);
+        } else {
+          fpaths = fsp;
+        }
+      }
+
+      rowOutWriters = fpaths.getOutWriters();
+      // check if all record writers implement statistics. if at least one RW
+      // doesn't implement the stats interface we will fall back to the conventional way
+      // of gathering stats
+      isCollectRWStats = areAllTrue(statsFromRecordWriter);
+      if (conf.isGatherStats() && !isCollectRWStats) {
+        if (statsCollectRawDataSize) {
+          SerDeStats stats = serializer.getSerDeStats();
+          if (stats != null) {
+            fpaths.getStat().addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
+          }
+        }
+        fpaths.getStat().addToStat(StatsSetupConst.ROW_COUNT, 1);
+      }
+
+
+      if (row_count != null) {
+        row_count.set(row_count.get() + 1);
+      }
+
+      if (!multiFileSpray) {
+        rowOutWriters[0].write(row);
+      } else {
+        int keyHashCode = 0;
+        key.setHashCode(keyHashCode);
+        int bucketNum = prtner.getBucket(key, null, totalFiles);
+        int idx = bucketMap.get(bucketNum);
+        rowOutWriters[idx].write(row);
+      }
+    } catch (IOException e) {
+      throw new HiveException(e);
+    }
     }
   }
 
@@ -73,7 +187,7 @@ public class VectorFileSinkOperator exte
     }
     for (int i = 0; i < vrg.projectionSize; i++) {
       ColumnVector vectorColumn = vrg.cols[vrg.projectedColumns[i]];
-      singleRow[i] = valueWriters[i].writeValue(vectorColumn, batchIndex);
+      singleRow[i] = vrg.valueWriters[i].writeValue(vectorColumn, batchIndex);
     }
     return singleRow;
   }
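
The VectorFileSinkOperator change above inlines the row-sink logic and adds a fast path: when the serializer is a VectorizedSerde, the whole batch is serialized in one call, otherwise each row is serialized individually. A rough sketch of that dispatch with hypothetical serde interfaces (not the real Hive SerDe API):

    // Hypothetical interfaces standing in for SerDe / VectorizedSerde.
    class BatchSerializeSketch {
      interface RowSerde { Object serializeRow(Object[] row); }
      interface BatchSerde extends RowSerde { Object[] serializeBatch(Object batch); }

      static Object[] serialize(Object batch, Object[][] rows, RowSerde serde) {
        if (serde instanceof BatchSerde) {
          // Fast path: one call serializes every row of the vectorized batch.
          return ((BatchSerde) serde).serializeBatch(batch);
        }
        Object[] out = new Object[rows.length];
        for (int i = 0; i < rows.length; i++) {
          out[i] = serde.serializeRow(rows[i]);     // per-row fallback
        }
        return out;
      }
    }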

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java Mon Oct  6 03:44:13 2014
@@ -1889,47 +1889,47 @@ public class VectorizationContext {
   // TODO:   And, investigate if different reduce-side versions are needed for var* and std*, or if map-side aggregate can be used..  Right now they are conservatively
   //         marked map-side (HASH).
   static ArrayList<AggregateDefinition> aggregatesDefinition = new ArrayList<AggregateDefinition>() {{
-    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.INT_DATETIME_FAMILY,    null,                          VectorUDAFMinLong.class));
-    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           null,                          VectorUDAFMinDouble.class));
-    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.STRING_FAMILY,          null,                          VectorUDAFMinString.class));
-    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.DECIMAL,                null,                          VectorUDAFMinDecimal.class));
-    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.INT_DATETIME_FAMILY,    null,                          VectorUDAFMaxLong.class));
-    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           null,                          VectorUDAFMaxDouble.class));
-    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.STRING_FAMILY,          null,                          VectorUDAFMaxString.class));
-    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.DECIMAL,                null,                          VectorUDAFMaxDecimal.class));
-    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.NONE,                   GroupByDesc.Mode.HASH,         VectorUDAFCountStar.class));
-    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.INT_DATETIME_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
-    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.INT_FAMILY,             GroupByDesc.Mode.MERGEPARTIAL, VectorUDAFCountMerge.class));
-    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
-    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.STRING_FAMILY,          GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
-    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.DECIMAL,                GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
-    add(new AggregateDefinition("sum",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,             null,                          VectorUDAFSumLong.class));
-    add(new AggregateDefinition("sum",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           null,                          VectorUDAFSumDouble.class));
-    add(new AggregateDefinition("sum",         VectorExpressionDescriptor.ArgumentType.DECIMAL,                null,                          VectorUDAFSumDecimal.class));
-    add(new AggregateDefinition("avg",         VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY,   GroupByDesc.Mode.HASH,         VectorUDAFAvgLong.class));
-    add(new AggregateDefinition("avg",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           GroupByDesc.Mode.HASH,         VectorUDAFAvgDouble.class));
-    add(new AggregateDefinition("avg",         VectorExpressionDescriptor.ArgumentType.DECIMAL,                GroupByDesc.Mode.HASH,         VectorUDAFAvgDecimal.class));
-    add(new AggregateDefinition("variance",    VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY,   GroupByDesc.Mode.HASH,         VectorUDAFVarPopLong.class));
-    add(new AggregateDefinition("var_pop",     VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY,   GroupByDesc.Mode.HASH,         VectorUDAFVarPopLong.class));
-    add(new AggregateDefinition("variance",    VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           GroupByDesc.Mode.HASH,         VectorUDAFVarPopDouble.class));
-    add(new AggregateDefinition("var_pop",     VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           GroupByDesc.Mode.HASH,         VectorUDAFVarPopDouble.class));
-    add(new AggregateDefinition("variance",    VectorExpressionDescriptor.ArgumentType.DECIMAL,                GroupByDesc.Mode.HASH,         VectorUDAFVarPopDecimal.class));
-    add(new AggregateDefinition("var_pop",     VectorExpressionDescriptor.ArgumentType.DECIMAL,                GroupByDesc.Mode.HASH,         VectorUDAFVarPopDecimal.class));
-    add(new AggregateDefinition("var_samp",    VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY,   GroupByDesc.Mode.HASH,         VectorUDAFVarSampLong.class));
-    add(new AggregateDefinition("var_samp" ,   VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           GroupByDesc.Mode.HASH,         VectorUDAFVarSampDouble.class));
-    add(new AggregateDefinition("var_samp" ,   VectorExpressionDescriptor.ArgumentType.DECIMAL,                GroupByDesc.Mode.HASH,         VectorUDAFVarSampDecimal.class));
-    add(new AggregateDefinition("std",         VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY,   GroupByDesc.Mode.HASH,         VectorUDAFStdPopLong.class));
-    add(new AggregateDefinition("stddev",      VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY,   GroupByDesc.Mode.HASH,         VectorUDAFStdPopLong.class));
-    add(new AggregateDefinition("stddev_pop",  VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY,   GroupByDesc.Mode.HASH,         VectorUDAFStdPopLong.class));
-    add(new AggregateDefinition("std",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           GroupByDesc.Mode.HASH,         VectorUDAFStdPopDouble.class));
-    add(new AggregateDefinition("stddev",      VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           GroupByDesc.Mode.HASH,         VectorUDAFStdPopDouble.class));
-    add(new AggregateDefinition("stddev_pop",  VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           GroupByDesc.Mode.HASH,         VectorUDAFStdPopDouble.class));
-    add(new AggregateDefinition("std",         VectorExpressionDescriptor.ArgumentType.DECIMAL,                GroupByDesc.Mode.HASH,         VectorUDAFStdPopDecimal.class));
-    add(new AggregateDefinition("stddev",      VectorExpressionDescriptor.ArgumentType.DECIMAL,                GroupByDesc.Mode.HASH,         VectorUDAFStdPopDecimal.class));
-    add(new AggregateDefinition("stddev_pop",  VectorExpressionDescriptor.ArgumentType.DECIMAL,                GroupByDesc.Mode.HASH,         VectorUDAFStdPopDecimal.class));
-    add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY,   GroupByDesc.Mode.HASH,         VectorUDAFStdSampLong.class));
-    add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           GroupByDesc.Mode.HASH,         VectorUDAFStdSampDouble.class));
-    add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.DECIMAL,                GroupByDesc.Mode.HASH,         VectorUDAFStdSampDecimal.class));
+    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    null,                          VectorUDAFMinLong.class));
+    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  null,                          VectorUDAFMinDouble.class));
+    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null,                          VectorUDAFMinString.class));
+    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       null,                          VectorUDAFMinDecimal.class));
+    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    null,                          VectorUDAFMaxLong.class));
+    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  null,                          VectorUDAFMaxDouble.class));
+    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null,                          VectorUDAFMaxString.class));
+    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       null,                          VectorUDAFMaxDecimal.class));
+    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.NONE,          GroupByDesc.Mode.HASH,         VectorUDAFCountStar.class));
+    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
+    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.MERGEPARTIAL, VectorUDAFCountMerge.class));
+    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
+    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
+    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
+    add(new AggregateDefinition("sum",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    null,                          VectorUDAFSumLong.class));
+    add(new AggregateDefinition("sum",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  null,                          VectorUDAFSumDouble.class));
+    add(new AggregateDefinition("sum",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       null,                          VectorUDAFSumDecimal.class));
+    add(new AggregateDefinition("avg",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFAvgLong.class));
+    add(new AggregateDefinition("avg",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFAvgDouble.class));
+    add(new AggregateDefinition("avg",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFAvgDecimal.class));
+    add(new AggregateDefinition("variance",    VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFVarPopLong.class));
+    add(new AggregateDefinition("var_pop",     VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFVarPopLong.class));
+    add(new AggregateDefinition("variance",    VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFVarPopDouble.class));
+    add(new AggregateDefinition("var_pop",     VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFVarPopDouble.class));
+    add(new AggregateDefinition("variance",    VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFVarPopDecimal.class));
+    add(new AggregateDefinition("var_pop",     VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFVarPopDecimal.class));
+    add(new AggregateDefinition("var_samp",    VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFVarSampLong.class));
+    add(new AggregateDefinition("var_samp" ,   VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFVarSampDouble.class));
+    add(new AggregateDefinition("var_samp" ,   VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFVarSampDecimal.class));
+    add(new AggregateDefinition("std",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFStdPopLong.class));
+    add(new AggregateDefinition("stddev",      VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFStdPopLong.class));
+    add(new AggregateDefinition("stddev_pop",  VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFStdPopLong.class));
+    add(new AggregateDefinition("std",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFStdPopDouble.class));
+    add(new AggregateDefinition("stddev",      VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFStdPopDouble.class));
+    add(new AggregateDefinition("stddev_pop",  VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFStdPopDouble.class));
+    add(new AggregateDefinition("std",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFStdPopDecimal.class));
+    add(new AggregateDefinition("stddev",      VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFStdPopDecimal.class));
+    add(new AggregateDefinition("stddev_pop",  VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFStdPopDecimal.class));
+    add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFStdSampLong.class));
+    add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFStdSampDouble.class));
+    add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFStdSampDecimal.class));
   }};
 
   public VectorAggregateExpression getAggregatorExpression(AggregationDesc desc, boolean isReduce)

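[Editor's note] The replaced table above maps an aggregate name, an argument-type family and an optional GroupByDesc.Mode to a VectorUDAF implementation class, with a null mode meaning "any mode". A minimal sketch of how such a table could be scanned for a match; the AggregateDefinition accessor names (getName, getType, getMode, getAggregationClass) are assumed here for illustration and may not match the actual fields:

    // Hypothetical lookup over a table like the one above: the first entry whose
    // name, argument type and (null = any) mode all match wins.
    static Class<? extends VectorAggregateExpression> lookup(
        List<AggregateDefinition> table, String name,
        VectorExpressionDescriptor.ArgumentType argType, GroupByDesc.Mode mode) {
      for (AggregateDefinition def : table) {
        if (def.getName().equalsIgnoreCase(name)
            && def.getType() == argType
            && (def.getMode() == null || def.getMode() == mode)) {
          return def.getAggregationClass();
        }
      }
      return null;  // no vectorized implementation registered for this combination
    }

Entries with a null mode (min/max/sum above) would therefore match both HASH and MERGEPARTIAL group-bys, while count and avg are registered per mode.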
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Mon Oct  6 03:44:13 2014
@@ -140,20 +140,6 @@ public class VectorizedRowBatchCtx {
   
 
   /**
-   * Initializes the VectorizedRowBatch context based on an scratch column type map and
-   * object inspector.
-   * @param columnTypeMap
-   * @param rowOI
-   *          Object inspector that shapes the column types
-   */
-  public void init(Map<Integer, String> columnTypeMap,
-      StructObjectInspector rowOI) {
-    this.columnTypeMap = columnTypeMap;
-    this.rowOI= rowOI;
-    this.rawRowOI = rowOI;
-  }
-
-  /**
    * Initializes VectorizedRowBatch context based on the
    * split and Hive configuration (Job conf with hive Plan).
    *

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java Mon Oct  6 03:44:13 2014
@@ -40,8 +40,6 @@ import java.util.regex.Pattern;
  * are used by the compactor and cleaner and thus must be format agnostic.
  */
 public class AcidUtils {
-  // This key will be put in the conf file when planning an acid operation
-  public static final String CONF_ACID_KEY = "hive.doing.acid";
   public static final String BASE_PREFIX = "base_";
   public static final String DELTA_PREFIX = "delta_";
   public static final PathFilter deltaFileFilter = new PathFilter() {

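[Editor's note] The surviving context shows the base/delta directory prefixes and the start of a PathFilter for delta files; the accept body lies outside this hunk. A minimal sketch of such a filter, assuming it simply matches directory names that begin with DELTA_PREFIX (imports from org.apache.hadoop.fs assumed):

    // Sketch only: accept any path whose final component starts with "delta_".
    // The real deltaFileFilter body is not shown in this hunk.
    PathFilter deltaDirFilter = new PathFilter() {
      @Override
      public boolean accept(Path path) {
        return path.getName().startsWith(AcidUtils.DELTA_PREFIX);
      }
    };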
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java Mon Oct  6 03:44:13 2014
@@ -161,11 +161,10 @@ public abstract class HiveContextAwareRe
   }
 
   public IOContext getIOContext() {
-    return IOContext.get(jobConf.get(Utilities.INPUT_NAME));
+    return IOContext.get();
   }
 
-  private void initIOContext(long startPos, boolean isBlockPointer,
-      Path inputPath) {
+  public void initIOContext(long startPos, boolean isBlockPointer, Path inputPath) {
     ioCxtRef = this.getIOContext();
     ioCxtRef.currentBlockStart = startPos;
     ioCxtRef.isBlockPointer = isBlockPointer;
@@ -184,7 +183,7 @@ public abstract class HiveContextAwareRe
 
     boolean blockPointer = false;
     long blockStart = -1;
-    FileSplit fileSplit = split;
+    FileSplit fileSplit = (FileSplit) split;
     Path path = fileSplit.getPath();
     FileSystem fs = path.getFileSystem(job);
     if (inputFormatClass.getName().contains("SequenceFile")) {
@@ -203,15 +202,12 @@ public abstract class HiveContextAwareRe
       blockStart = in.getPosition();
       in.close();
     }
-    this.jobConf = job;
     this.initIOContext(blockStart, blockPointer, path.makeQualified(fs));
 
     this.initIOContextSortedProps(split, recordReader, job);
   }
 
   public void initIOContextSortedProps(FileSplit split, RecordReader recordReader, JobConf job) {
-    this.jobConf = job;
-
     this.getIOContext().resetSortingValues();
     this.isSorted = jobConf.getBoolean("hive.input.format.sorted", false);
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Mon Oct  6 03:44:13 2014
@@ -45,7 +45,6 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -254,14 +253,7 @@ public class HiveInputFormat<K extends W
   }
 
   protected void init(JobConf job) {
-    if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
-      mrwork = (MapWork) Utilities.getMergeWork(job);
-      if (mrwork == null) {
-        mrwork = Utilities.getMapWork(job);
-      }
-    } else {
-      mrwork = Utilities.getMapWork(job);
-    }
+    mrwork = Utilities.getMapWork(job);
     pathToPartitionInfo = mrwork.getPathToPartitionInfo();
   }
 
@@ -428,9 +420,6 @@ public class HiveInputFormat<K extends W
 
   public static void pushFilters(JobConf jobConf, TableScanOperator tableScan) {
 
-    // ensure filters are not set from previous pushFilters
-    jobConf.unset(TableScanDesc.FILTER_TEXT_CONF_STR);
-    jobConf.unset(TableScanDesc.FILTER_EXPR_CONF_STR);
     TableScanDesc scanDesc = tableScan.getConf();
     if (scanDesc == null) {
       return;

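[Editor's note] The lines removed from pushFilters implemented a clear-then-set pattern: filter properties left over from an earlier pushFilters call are dropped before being repopulated for the current table scan. A minimal sketch of that pattern, with the serialized filter values assumed for illustration:

    // Sketch of the removed behavior: drop stale filter properties, then
    // (conditionally) set fresh ones for the current scan.
    void refreshFilterProps(JobConf jobConf, String filterText, String filterExprSerialized) {
      jobConf.unset(TableScanDesc.FILTER_TEXT_CONF_STR);
      jobConf.unset(TableScanDesc.FILTER_EXPR_CONF_STR);
      if (filterText != null) {
        jobConf.set(TableScanDesc.FILTER_TEXT_CONF_STR, filterText);
      }
      if (filterExprSerialized != null) {
        jobConf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExprSerialized);
      }
    }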
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java Mon Oct  6 03:44:13 2014
@@ -18,9 +18,6 @@
 
 package org.apache.hadoop.hive.ql.io;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.hadoop.fs.Path;
 
 
@@ -34,6 +31,7 @@ import org.apache.hadoop.fs.Path;
  */
 public class IOContext {
 
+
   private static ThreadLocal<IOContext> threadLocal = new ThreadLocal<IOContext>(){
     @Override
     protected synchronized IOContext initialValue() { return new IOContext(); }
@@ -43,21 +41,6 @@ public class IOContext {
     return IOContext.threadLocal.get();
   }
 
-  private static Map<String, IOContext> inputNameIOContextMap = new HashMap<String, IOContext>();
-
-  public static Map<String, IOContext> getMap() {
-    return inputNameIOContextMap;
-  }
-
-  public static IOContext get(String inputName) {
-    if (inputNameIOContextMap.containsKey(inputName) == false) {
-      IOContext ioContext = new IOContext();
-      inputNameIOContextMap.put(inputName, ioContext);
-    }
-
-    return inputNameIOContextMap.get(inputName);
-  }
-
   public static void clear() {
     IOContext.threadLocal.remove();
   }

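[Editor's note] With the per-input-name map removed, IOContext reverts to a single thread-local instance: IOContext.get() returns the calling thread's context and clear() drops it. A standalone sketch of the same pattern; "Ctx" is a stand-in name, not the real class:

    // Illustration of the thread-local context pattern used above.
    class Ctx {
      private static final ThreadLocal<Ctx> LOCAL = new ThreadLocal<Ctx>() {
        @Override
        protected Ctx initialValue() {
          return new Ctx();   // one lazily created instance per thread
        }
      };

      static Ctx get()    { return LOCAL.get(); }    // same instance for a given thread
      static void clear() { LOCAL.remove(); }        // next get() creates a fresh one
    }

Every thread that calls get() sees its own instance, so record readers on different task threads no longer need a name-keyed lookup.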
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Mon Oct  6 03:44:13 2014
@@ -132,7 +132,7 @@ public class OrcInputFormat  implements 
   @Override
   public boolean shouldSkipCombine(Path path,
                                    Configuration conf) throws IOException {
-    return (conf.get(AcidUtils.CONF_ACID_KEY) != null) || AcidUtils.isAcid(path, conf);
+    return AcidUtils.isAcid(path, conf);
   }
 
   private static class OrcRecordReader

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java Mon Oct  6 03:44:13 2014
@@ -118,11 +118,13 @@ public class OrcNewInputFormat extends I
   public List<InputSplit> getSplits(JobContext jobContext)
       throws IOException, InterruptedException {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
+    Configuration conf =
+        ShimLoader.getHadoopShims().getConfiguration(jobContext);
     List<OrcSplit> splits =
         OrcInputFormat.generateSplitsInfo(ShimLoader.getHadoopShims()
         .getConfiguration(jobContext));
-    List<InputSplit> result = new ArrayList<InputSplit>(splits.size());
-    for(OrcSplit split: splits) {
+    List<InputSplit> result = new ArrayList<InputSplit>();
+    for(OrcSplit split: OrcInputFormat.generateSplitsInfo(conf)) {
       result.add(new OrcNewSplit(split));
     }
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java Mon Oct  6 03:44:13 2014
@@ -418,120 +418,138 @@ class RunLengthIntegerWriterV2 implement
 
   private void determineEncoding() {
 
-    // we need to compute zigzag values for DIRECT encoding if we decide to
-    // break early for delta overflows or for shorter runs
-    computeZigZagLiterals();
+    int idx = 0;
 
-    zzBits100p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 1.0);
+    // for identifying monotonic sequences
+    boolean isIncreasing = false;
+    int increasingCount = 1;
+    boolean isDecreasing = false;
+    int decreasingCount = 1;
 
-    // not a big win for shorter runs to determine encoding
-    if (numLiterals <= MIN_REPEAT) {
-      encoding = EncodingType.DIRECT;
-      return;
-    }
+    // for identifying type of delta encoding
+    min = literals[0];
+    long max = literals[0];
+    isFixedDelta = true;
+    long currDelta = 0;
 
-    // DELTA encoding check
+    min = literals[0];
+    long deltaMax = 0;
 
-    // for identifying monotonic sequences
-    boolean isIncreasing = true;
-    boolean isDecreasing = true;
-    this.isFixedDelta = true;
+    // populate all variables to identify the encoding type
+    if (numLiterals >= 1) {
+      currDelta = literals[1] - literals[0];
+      for(int i = 0; i < numLiterals; i++) {
+        if (i > 0 && literals[i] >= max) {
+          max = literals[i];
+          increasingCount++;
+        }
 
-    this.min = literals[0];
-    long max = literals[0];
-    final long initialDelta = literals[1] - literals[0];
-    long currDelta = initialDelta;
-    long deltaMax = initialDelta;
-    this.adjDeltas[0] = initialDelta;
-
-    for (int i = 1; i < numLiterals; i++) {
-      final long l1 = literals[i];
-      final long l0 = literals[i - 1];
-      currDelta = l1 - l0;
-      min = Math.min(min, l1);
-      max = Math.max(max, l1);
-
-      isIncreasing &= (l0 <= l1);
-      isDecreasing &= (l0 >= l1);
-
-      isFixedDelta &= (currDelta == initialDelta);
-      if (i > 1) {
-        adjDeltas[i - 1] = Math.abs(currDelta);
-        deltaMax = Math.max(deltaMax, adjDeltas[i - 1]);
-      }
-    }
+        if (i > 0 && literals[i] <= min) {
+          min = literals[i];
+          decreasingCount++;
+        }
 
-    // its faster to exit under delta overflow condition without checking for
-    // PATCHED_BASE condition as encoding using DIRECT is faster and has less
-    // overhead than PATCHED_BASE
-    if (!utils.isSafeSubtract(max, min)) {
-      encoding = EncodingType.DIRECT;
-      return;
-    }
+        // if delta doesn't changes then mark it as fixed delta
+        if (i > 0 && isFixedDelta) {
+          if (literals[i] - literals[i - 1] != currDelta) {
+            isFixedDelta = false;
+          }
 
-    // invariant - subtracting any number from any other in the literals after
-    // this point won't overflow
+          fixedDelta = currDelta;
+        }
 
-    // if initialDelta is 0 then we cannot delta encode as we cannot identify
-    // the sign of deltas (increasing or decreasing)
-    if (initialDelta != 0) {
-
-      // if min is equal to max then the delta is 0, this condition happens for
-      // fixed values run >10 which cannot be encoded with SHORT_REPEAT
-      if (min == max) {
-        assert isFixedDelta : min + "==" + max +
-            ", isFixedDelta cannot be false";
-        assert currDelta == 0 : min + "==" + max + ", currDelta should be zero";
-        fixedDelta = 0;
-        encoding = EncodingType.DELTA;
-        return;
-      }
+        // populate zigzag encoded literals
+        long zzEncVal = 0;
+        if (signed) {
+          zzEncVal = utils.zigzagEncode(literals[i]);
+        } else {
+          zzEncVal = literals[i];
+        }
+        zigzagLiterals[idx] = zzEncVal;
+        idx++;
 
-      if (isFixedDelta) {
-        assert currDelta == initialDelta
-            : "currDelta should be equal to initialDelta for fixed delta encoding";
-        encoding = EncodingType.DELTA;
-        fixedDelta = currDelta;
-        return;
+        // max delta value is required for computing the fixed bits
+        // required for delta blob in delta encoding
+        if (i > 0) {
+          if (i == 1) {
+            // first value preserve the sign
+            adjDeltas[i - 1] = literals[i] - literals[i - 1];
+          } else {
+            adjDeltas[i - 1] = Math.abs(literals[i] - literals[i - 1]);
+            if (adjDeltas[i - 1] > deltaMax) {
+              deltaMax = adjDeltas[i - 1];
+            }
+          }
+        }
       }
 
       // stores the number of bits required for packing delta blob in
       // delta encoding
       bitsDeltaMax = utils.findClosestNumBits(deltaMax);
 
-      // monotonic condition
-      if (isIncreasing || isDecreasing) {
-        encoding = EncodingType.DELTA;
-        return;
+      // if decreasing count equals total number of literals then the
+      // sequence is monotonically decreasing
+      if (increasingCount == 1 && decreasingCount == numLiterals) {
+        isDecreasing = true;
+      }
+
+      // if increasing count equals total number of literals then the
+      // sequence is monotonically increasing
+      if (decreasingCount == 1 && increasingCount == numLiterals) {
+        isIncreasing = true;
       }
     }
 
-    // PATCHED_BASE encoding check
+    // if the sequence is both increasing and decreasing then it is not
+    // monotonic
+    if (isDecreasing && isIncreasing) {
+      isDecreasing = false;
+      isIncreasing = false;
+    }
+
+    // fixed delta condition
+    if (isIncreasing == false && isDecreasing == false && isFixedDelta == true) {
+      encoding = EncodingType.DELTA;
+      return;
+    }
+
+    // monotonic condition
+    if (isIncreasing || isDecreasing) {
+      encoding = EncodingType.DELTA;
+      return;
+    }
 
     // percentile values are computed for the zigzag encoded values. if the
     // number of bit requirement between 90th and 100th percentile varies
     // beyond a threshold then we need to patch the values. if the variation
-    // is not significant then we can use direct encoding
+    // is not significant then we can use direct or delta encoding
+
+    double p = 0.9;
+    zzBits90p = utils.percentileBits(zigzagLiterals, 0, numLiterals, p);
+
+    p = 1.0;
+    zzBits100p = utils.percentileBits(zigzagLiterals, 0, numLiterals, p);
 
-    zzBits90p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 0.9);
     int diffBitsLH = zzBits100p - zzBits90p;
 
     // if the difference between 90th percentile and 100th percentile fixed
     // bits is > 1 then we need patch the values
-    if (diffBitsLH > 1) {
-
+    if (isIncreasing == false && isDecreasing == false && diffBitsLH > 1
+        && isFixedDelta == false) {
       // patching is done only on base reduced values.
       // remove base from literals
-      for (int i = 0; i < numLiterals; i++) {
+      for(int i = 0; i < numLiterals; i++) {
         baseRedLiterals[i] = literals[i] - min;
       }
 
       // 95th percentile width is used to determine max allowed value
       // after which patching will be done
-      brBits95p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 0.95);
+      p = 0.95;
+      brBits95p = utils.percentileBits(baseRedLiterals, 0, numLiterals, p);
 
       // 100th percentile is used to compute the max patch width
-      brBits100p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 1.0);
+      p = 1.0;
+      brBits100p = utils.percentileBits(baseRedLiterals, 0, numLiterals, p);
 
       // after base reducing the values, if the difference in bits between
       // 95th percentile and 100th percentile value is zero then there
@@ -547,24 +565,19 @@ class RunLengthIntegerWriterV2 implement
         encoding = EncodingType.DIRECT;
         return;
       }
-    } else {
-      // if difference in bits between 95th percentile and 100th percentile is
-      // 0, then patch length will become 0. Hence we will fallback to direct
+    }
+
+    // if difference in bits between 95th percentile and 100th percentile is
+    // 0, then patch length will become 0. Hence we will fallback to direct
+    if (isIncreasing == false && isDecreasing == false && diffBitsLH <= 1
+        && isFixedDelta == false) {
       encoding = EncodingType.DIRECT;
       return;
     }
-  }
 
-  private void computeZigZagLiterals() {
-    // populate zigzag encoded literals
-    long zzEncVal = 0;
-    for (int i = 0; i < numLiterals; i++) {
-      if (signed) {
-        zzEncVal = utils.zigzagEncode(literals[i]);
-      } else {
-        zzEncVal = literals[i];
-      }
-      zigzagLiterals[i] = zzEncVal;
+    // this should not happen
+    if (encoding == null) {
+      throw new RuntimeException("Integer encoding cannot be determined.");
     }
   }
 
@@ -687,7 +700,7 @@ class RunLengthIntegerWriterV2 implement
     patchWidth = 0;
     gapVsPatchList = null;
     min = 0;
-    isFixedDelta = true;
+    isFixedDelta = false;
   }
 
   @Override

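[Editor's note] determineEncoding() works on zigzag-encoded values for signed columns (utils.zigzagEncode above) so that small negative numbers also pack into few bits, then compares 90th- vs 100th-percentile bit widths to choose between DIRECT and PATCHED_BASE. The transform itself is outside this hunk; a minimal sketch of the standard form it is generally assumed to take:

    // Standard zigzag transform (as used by e.g. protobuf varints); the exact
    // SerializationUtils implementation is not part of this hunk.
    static long zigzagEncode(long n) {
      return (n << 1) ^ (n >> 63);   // -1 -> 1, 1 -> 2, -2 -> 3, 2 -> 4, ...
    }

    static long zigzagDecode(long z) {
      return (z >>> 1) ^ -(z & 1);   // inverse of the mapping above
    }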
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java Mon Oct  6 03:44:13 2014
@@ -1283,9 +1283,4 @@ final class SerializationUtils {
         + ((readBuffer[rbOffset + 7] & 255) << 0));
   }
 
-  // Do not want to use Guava LongMath.checkedSubtract() here as it will throw
-  // ArithmeticException in case of overflow
-  public boolean isSafeSubtract(long left, long right) {
-    return (left ^ right) >= 0 | (left ^ (left - right)) >= 0;
-  }
 }

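[Editor's note] The helper removed here guarded (max - min) against 64-bit overflow without Guava's checked arithmetic: subtraction is safe when the operands share a sign, or when the difference keeps the sign of the left operand. A standalone sketch of the same check, copied from the removed body with illustrative usage:

    // left - right is representable iff the operands share a sign, or the
    // difference keeps the sign of left. Mirrors the removed isSafeSubtract().
    static boolean isSafeSubtract(long left, long right) {
      return (left ^ right) >= 0 | (left ^ (left - right)) >= 0;
    }

    // e.g. isSafeSubtract(Long.MAX_VALUE, -1L) == false   (would overflow)
    //      isSafeSubtract(Long.MAX_VALUE,  1L) == true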
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java Mon Oct  6 03:44:13 2014
@@ -13,6 +13,9 @@
  */
 package org.apache.hadoop.hive.ql.io.parquet.convert;
 
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
 
@@ -30,7 +33,7 @@ public class ArrayWritableGroupConverter
   private Writable[] mapPairContainer;
 
   public ArrayWritableGroupConverter(final GroupType groupType, final HiveGroupConverter parent,
-      final int index) {
+      final int index, List<TypeInfo> hiveSchemaTypeInfos) {
     this.parent = parent;
     this.index = index;
     int count = groupType.getFieldCount();
@@ -40,7 +43,8 @@ public class ArrayWritableGroupConverter
     isMap = count == 2;
     converters = new Converter[count];
     for (int i = 0; i < count; i++) {
-      converters[i] = getConverterFromDescription(groupType.getType(i), i, this);
+      converters[i] = getConverterFromDescription(groupType.getType(i), i, this,
+          hiveSchemaTypeInfos);
     }
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableGroupConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableGroupConverter.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableGroupConverter.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableGroupConverter.java Mon Oct  6 03:44:13 2014
@@ -16,6 +16,7 @@ package org.apache.hadoop.hive.ql.io.par
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
 
@@ -36,19 +37,21 @@ public class DataWritableGroupConverter 
   private final Object[] currentArr;
   private Writable[] rootMap;
 
-  public DataWritableGroupConverter(final GroupType requestedSchema, final GroupType tableSchema) {
-    this(requestedSchema, null, 0, tableSchema);
+  public DataWritableGroupConverter(final GroupType requestedSchema, final GroupType tableSchema,
+      final List<TypeInfo> hiveSchemaTypeInfos) {
+    this(requestedSchema, null, 0, tableSchema, hiveSchemaTypeInfos);
     final int fieldCount = tableSchema.getFieldCount();
     this.rootMap = new Writable[fieldCount];
   }
 
   public DataWritableGroupConverter(final GroupType groupType, final HiveGroupConverter parent,
-      final int index) {
-    this(groupType, parent, index, groupType);
+      final int index, final List<TypeInfo> hiveSchemaTypeInfos) {
+    this(groupType, parent, index, groupType, hiveSchemaTypeInfos);
   }
 
   public DataWritableGroupConverter(final GroupType selectedGroupType,
-      final HiveGroupConverter parent, final int index, final GroupType containingGroupType) {
+      final HiveGroupConverter parent, final int index, final GroupType containingGroupType,
+      final List<TypeInfo> hiveSchemaTypeInfos) {
     this.parent = parent;
     this.index = index;
     final int totalFieldCount = containingGroupType.getFieldCount();
@@ -62,7 +65,8 @@ public class DataWritableGroupConverter 
       Type subtype = selectedFields.get(i);
       if (containingGroupType.getFields().contains(subtype)) {
         converters[i] = getConverterFromDescription(subtype,
-            containingGroupType.getFieldIndex(subtype.getName()), this);
+            containingGroupType.getFieldIndex(subtype.getName()), this,
+            hiveSchemaTypeInfos);
       } else {
         throw new IllegalStateException("Group type [" + containingGroupType +
             "] does not contain requested field: " + subtype);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java Mon Oct  6 03:44:13 2014
@@ -31,8 +31,10 @@ public class DataWritableRecordConverter
 
   private final DataWritableGroupConverter root;
 
-  public DataWritableRecordConverter(final GroupType requestedSchema, final GroupType tableSchema) {
-    this.root = new DataWritableGroupConverter(requestedSchema, tableSchema);
+  public DataWritableRecordConverter(final GroupType requestedSchema, final GroupType tableSchema,
+      final List<TypeInfo> hiveColumnTypeInfos) {
+    this.root = new DataWritableGroupConverter(requestedSchema, tableSchema,
+        hiveColumnTypeInfos);
   }
 
   @Override

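[Editor's note] The Parquet converter constructors now thread a List<TypeInfo> for the Hive columns down to getConverterFromDescription. How that list is built lies outside this diff; one plausible way to derive it from a colon-separated columns-types string, shown purely as an assumption, is Hive's TypeInfoUtils:

    // Assumed construction of the TypeInfo list passed into the converters above;
    // the actual call site is not part of this diff.
    import java.util.List;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

    public class TypeInfoListExample {
      public static void main(String[] args) {
        List<TypeInfo> hiveColumnTypeInfos =
            TypeInfoUtils.getTypeInfosFromTypeString("int:string:double");
        System.out.println(hiveColumnTypeInfos);
      }
    }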

