Subject: svn commit: r1629562 [5/38] - in /hive/branches/spark: ./ accumulo-handler/ beeline/ beeline/src/java/org/apache/hive/beeline/ bin/ext/ common/ common/src/java/org/apache/hadoop/hive/conf/ common/src/test/org/apache/hadoop/hive/common/type/ contrib/src...
Date: Mon, 06 Oct 2014 03:44:26 -0000
To: commits@hive.apache.org
From: brock@apache.org
Message-Id: <20141006034457.733742388B75@eris.apache.org>

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Mon Oct 6 03:44:13 2014
@@ -17,7 +17,9 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -33,13 +35,31 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
+import org.apache.hadoop.hive.ql.exec.tez.tools.InputMerger;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import
org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; import org.apache.tez.mapreduce.processor.MRTaskReporter; import org.apache.tez.runtime.api.Input; import org.apache.tez.runtime.api.LogicalInput; @@ -56,16 +76,39 @@ public class ReduceRecordProcessor exte private static final String REDUCE_PLAN_KEY = "__REDUCE_PLAN__"; public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class); + private final ExecMapperContext execContext = new ExecMapperContext(); + private boolean abort = false; + private Deserializer inputKeyDeserializer; + + // Input value serde needs to be an array to support different SerDe + // for different tags + private final SerDe[] inputValueDeserializer = new SerDe[Byte.MAX_VALUE]; - private ReduceWork redWork; + TableDesc keyTableDesc; + TableDesc[] valueTableDesc; + ObjectInspector[] rowObjectInspector; private Operator reducer; + private boolean isTagged = false; + + private Object keyObject = null; + private BytesWritable groupKey; + + private ReduceWork redWork; - private ReduceRecordSource[] sources; + private boolean vectorized = false; - private final byte position = 0; + List row = new ArrayList(Utilities.reduceFieldNameList.size()); - private boolean abort; + private DataOutputBuffer buffer; + private VectorizedRowBatch[] batches; + // number of columns pertaining to keys in a vectorized row batch + private int keysColumnOffset; + private final int BATCH_SIZE = VectorizedRowBatch.DEFAULT_SIZE; + private StructObjectInspector keyStructInspector; + private StructObjectInspector[] valueStructInspectors; + /* this is only used in the error code path */ + private List[] valueStringWriters; @Override void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter, @@ -75,6 +118,10 @@ public class ReduceRecordProcessor exte ObjectCache cache = ObjectCacheFactory.getCache(jconf); + rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE]; + ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE]; + ObjectInspector keyObjectInspector; + redWork = (ReduceWork) cache.retrieve(REDUCE_PLAN_KEY); if (redWork == null) { redWork = Utilities.getReduceWork(jconf); @@ -84,36 +131,95 @@ public class ReduceRecordProcessor exte } reducer = redWork.getReducer(); - reducer.getParentOperators().clear(); - reducer.setParentOperators(null); // clear out any parents as reducer is the root + reducer.setParentOperators(null); // clear out any parents as reducer is the + // root + isTagged = redWork.getNeedsTagging(); + vectorized = redWork.getVectorMode(); - int numTags = redWork.getTagToValueDesc().size(); + try { + keyTableDesc = redWork.getKeyDesc(); + inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc + 
.getDeserializerClass(), null); + SerDeUtils.initializeSerDe(inputKeyDeserializer, null, keyTableDesc.getProperties(), null); + keyObjectInspector = inputKeyDeserializer.getObjectInspector(); + reducer.setGroupKeyObjectInspector(keyObjectInspector); + valueTableDesc = new TableDesc[redWork.getTagToValueDesc().size()]; + + if(vectorized) { + final int maxTags = redWork.getTagToValueDesc().size(); + keyStructInspector = (StructObjectInspector)keyObjectInspector; + batches = new VectorizedRowBatch[maxTags]; + valueStructInspectors = new StructObjectInspector[maxTags]; + valueStringWriters = new List[maxTags]; + keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size(); + buffer = new DataOutputBuffer(); + } - ObjectInspector[] ois = new ObjectInspector[numTags]; - sources = new ReduceRecordSource[numTags]; + for (int tag = 0; tag < redWork.getTagToValueDesc().size(); tag++) { + // We should initialize the SerDe with the TypeInfo when available. + valueTableDesc[tag] = redWork.getTagToValueDesc().get(tag); + inputValueDeserializer[tag] = (SerDe) ReflectionUtils.newInstance( + valueTableDesc[tag].getDeserializerClass(), null); + SerDeUtils.initializeSerDe(inputValueDeserializer[tag], null, + valueTableDesc[tag].getProperties(), null); + valueObjectInspector[tag] = inputValueDeserializer[tag] + .getObjectInspector(); + + ArrayList ois = new ArrayList(); + + if(vectorized) { + /* vectorization only works with struct object inspectors */ + valueStructInspectors[tag] = (StructObjectInspector)valueObjectInspector[tag]; + + batches[tag] = VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector, + valueStructInspectors[tag]); + final int totalColumns = keysColumnOffset + + valueStructInspectors[tag].getAllStructFieldRefs().size(); + valueStringWriters[tag] = new ArrayList(totalColumns); + valueStringWriters[tag].addAll(Arrays + .asList(VectorExpressionWriterFactory + .genVectorStructExpressionWritables(keyStructInspector))); + valueStringWriters[tag].addAll(Arrays + .asList(VectorExpressionWriterFactory + .genVectorStructExpressionWritables(valueStructInspectors[tag]))); + + /* + * The row object inspector used by ReduceWork needs to be a **standard** + * struct object inspector, not just any struct object inspector. + */ + ArrayList colNames = new ArrayList(); + List fields = keyStructInspector.getAllStructFieldRefs(); + for (StructField field: fields) { + colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName()); + ois.add(field.getFieldObjectInspector()); + } + fields = valueStructInspectors[tag].getAllStructFieldRefs(); + for (StructField field: fields) { + colNames.add(Utilities.ReduceField.VALUE.toString() + "." 
+ field.getFieldName()); + ois.add(field.getFieldObjectInspector()); + } + rowObjectInspector[tag] = ObjectInspectorFactory + .getStandardStructObjectInspector(colNames, ois); + } else { + ois.add(keyObjectInspector); + ois.add(valueObjectInspector[tag]); + rowObjectInspector[tag] = ObjectInspectorFactory + .getStandardStructObjectInspector(Utilities.reduceFieldNameList, ois); + } - for (int tag = 0; tag < redWork.getTagToValueDesc().size(); tag++) { - TableDesc keyTableDesc = redWork.getKeyDesc(); - TableDesc valueTableDesc = redWork.getTagToValueDesc().get(tag); - KeyValuesReader reader = - (KeyValuesReader) inputs.get(redWork.getTagToInput().get(tag)).getReader(); - - sources[tag] = new ReduceRecordSource(); - sources[tag].init(jconf, reducer, redWork.getVectorMode(), keyTableDesc, valueTableDesc, - reader, tag == position, (byte) tag, - redWork.getScratchColumnVectorTypes()); - ois[tag] = sources[tag].getObjectInspector(); + } + } catch (Exception e) { + throw new RuntimeException(e); } MapredContext.init(false, new JobConf(jconf)); ((TezContext) MapredContext.get()).setInputs(inputs); ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext); - ((TezContext) MapredContext.get()).setRecordSources(sources); // initialize reduce operator tree try { l4j.info(reducer.dump(0)); - reducer.initialize(jconf, ois); + reducer.initialize(jconf, rowObjectInspector); // Initialization isn't finished until all parents of all operators // are initialized. For broadcast joins that means initializing the @@ -121,6 +227,7 @@ public class ReduceRecordProcessor exte List dummyOps = redWork.getDummyOps(); if (dummyOps != null) { for (Operator dummyOp : dummyOps){ + dummyOp.setExecContext(execContext); dummyOp.initialize(jconf, null); } } @@ -164,12 +271,28 @@ public class ReduceRecordProcessor exte ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize(); } - // run the operator pipeline - while (sources[position].pushRecord()) { - if (isLogInfoEnabled) { - logProgress(); + KeyValuesReader kvsReader; + try { + if(shuffleInputs.size() == 1){ + //no merging of inputs required + kvsReader = (KeyValuesReader) shuffleInputs.get(0).getReader(); + }else { + //get a sort merged input + kvsReader = new InputMerger(shuffleInputs); + } + } catch (Exception e) { + throw new IOException(e); + } + + while(kvsReader.next()){ + Object key = kvsReader.getCurrentKey(); + Iterable values = kvsReader.getCurrentValues(); + boolean needMore = processRows(key, values); + if(!needMore){ + break; } } + } /** @@ -179,22 +302,209 @@ public class ReduceRecordProcessor exte */ private List getShuffleInputs(Map inputs) { //the reduce plan inputs have tags, add all inputs that have tags - Map tagToinput = redWork.getTagToInput(); + Map tag2input = redWork.getTagToInput(); ArrayList shuffleInputs = new ArrayList(); - for(String inpStr : tagToinput.values()){ - if (inputs.get(inpStr) == null) { - throw new AssertionError("Cound not find input: " + inpStr); - } + for(String inpStr : tag2input.values()){ shuffleInputs.add(inputs.get(inpStr)); } return shuffleInputs; } + /** + * @param key + * @param values + * @return true if it is not done and can take more inputs + */ + private boolean processRows(Object key, Iterable values) { + if(reducer.getDone()){ + //done - no more records needed + return false; + } + + // reset the execContext for each new row + execContext.resetRow(); + + try { + BytesWritable keyWritable = (BytesWritable) key; + byte tag = 0; + + if (isTagged) { + // remove the tag from key coming 
out of reducer + // and store it in separate variable. + int size = keyWritable.getLength() - 1; + tag = keyWritable.getBytes()[size]; + keyWritable.setSize(size); + } + + //Set the key, check if this is a new group or same group + if (!keyWritable.equals(this.groupKey)) { + // If a operator wants to do some work at the beginning of a group + if (groupKey == null) { // the first group + this.groupKey = new BytesWritable(); + } else { + // If a operator wants to do some work at the end of a group + if(isLogTraceEnabled) { + l4j.trace("End Group"); + } + reducer.endGroup(); + } + + try { + this.keyObject = inputKeyDeserializer.deserialize(keyWritable); + } catch (Exception e) { + throw new HiveException( + "Hive Runtime Error: Unable to deserialize reduce input key from " + + Utilities.formatBinaryString(keyWritable.getBytes(), 0, + keyWritable.getLength()) + " with properties " + + keyTableDesc.getProperties(), e); + } + groupKey.set(keyWritable.getBytes(), 0, keyWritable.getLength()); + if (isLogTraceEnabled) { + l4j.trace("Start Group"); + } + reducer.setGroupKeyObject(keyObject); + reducer.startGroup(); + } + /* this.keyObject passed via reference */ + if(vectorized) { + return processVectors(values, tag); + } else { + return processKeyValues(values, tag); + } + } catch (Throwable e) { + abort = true; + if (e instanceof OutOfMemoryError) { + // Don't create a new object if we are already out of memory + throw (OutOfMemoryError) e; + } else { + l4j.fatal(StringUtils.stringifyException(e)); + throw new RuntimeException(e); + } + } + } + + private Object deserializeValue(BytesWritable valueWritable, byte tag) throws HiveException { + try { + return inputValueDeserializer[tag].deserialize(valueWritable); + } catch (SerDeException e) { + throw new HiveException( + "Hive Runtime Error: Unable to deserialize reduce input value (tag=" + + tag + + ") from " + + Utilities.formatBinaryString(valueWritable.getBytes(), 0, + valueWritable.getLength()) + " with properties " + + valueTableDesc[tag].getProperties(), e); + } + } + + /** + * @param values + * @return true if it is not done and can take more inputs + */ + private boolean processKeyValues(Iterable values, byte tag) throws HiveException { + + for (Object value : values) { + BytesWritable valueWritable = (BytesWritable) value; + + row.clear(); + row.add(this.keyObject); + row.add(deserializeValue(valueWritable, tag)); + + try { + reducer.processOp(row, tag); + } catch (Exception e) { + String rowString = null; + try { + rowString = SerDeUtils.getJSONString(row, rowObjectInspector[tag]); + } catch (Exception e2) { + rowString = "[Error getting row data with exception " + + StringUtils.stringifyException(e2) + " ]"; + } + throw new HiveException("Hive Runtime Error while processing row (tag=" + + tag + ") " + rowString, e); + } + if (isLogInfoEnabled) { + logProgress(); + } + } + return true; //give me more + } + + /** + * @param values + * @return true if it is not done and can take more inputs + */ + private boolean processVectors(Iterable values, byte tag) throws HiveException { + VectorizedRowBatch batch = batches[tag]; + batch.reset(); + + /* deserialize key into columns */ + VectorizedBatchUtil.addRowToBatchFrom(keyObject, keyStructInspector, + 0, 0, batch, buffer); + for(int i = 0; i < keysColumnOffset; i++) { + VectorizedBatchUtil.setRepeatingColumn(batch, i); + } + + int rowIdx = 0; + try { + for (Object value : values) { + /* deserialize value into columns */ + BytesWritable valueWritable = (BytesWritable) value; + Object valueObj = 
deserializeValue(valueWritable, tag); + + VectorizedBatchUtil.addRowToBatchFrom(valueObj, valueStructInspectors[tag], + rowIdx, keysColumnOffset, batch, buffer); + rowIdx++; + if (rowIdx >= BATCH_SIZE) { + VectorizedBatchUtil.setBatchSize(batch, rowIdx); + reducer.processOp(batch, tag); + rowIdx = 0; + if (isLogInfoEnabled) { + logProgress(); + } + } + } + if (rowIdx > 0) { + VectorizedBatchUtil.setBatchSize(batch, rowIdx); + reducer.processOp(batch, tag); + } + if (isLogInfoEnabled) { + logProgress(); + } + } catch (Exception e) { + String rowString = null; + try { + /* batch.toString depends on this */ + batch.setValueWriters(valueStringWriters[tag] + .toArray(new VectorExpressionWriter[0])); + rowString = batch.toString(); + } catch (Exception e2) { + rowString = "[Error getting row data with exception " + + StringUtils.stringifyException(e2) + " ]"; + } + throw new HiveException("Hive Runtime Error while processing vector batch (tag=" + + tag + ") " + rowString, e); + } + return true; // give me more + } + @Override void close(){ + // check if there are IOExceptions + if (!abort) { + abort = execContext.getIoCxt().getIOExceptions(); + } + try { - for (ReduceRecordSource rs: sources) { - abort = abort && rs.close(); + if (groupKey != null) { + // If a operator wants to do some work at the end of a group + if(isLogTraceEnabled) { + l4j.trace("End Group"); + } + reducer.endGroup(); + } + if (isLogInfoEnabled) { + logCloseInfo(); } reducer.close(abort); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java?rev=1629562&r1=1629561&r2=1629562&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java Mon Oct 6 03:44:13 2014 @@ -37,8 +37,6 @@ public class TezContext extends MapredCo private ProcessorContext processorContext; - private RecordSource[] sources; - public TezContext(boolean isMap, JobConf jobConf) { super(isMap, jobConf); } @@ -72,12 +70,4 @@ public class TezContext extends MapredCo public ProcessorContext getTezProcessorContext() { return processorContext; } - - public RecordSource[] getRecordSources() { - return sources; - } - - public void setRecordSources(RecordSource[] sources) { - this.sources = sources; - } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java?rev=1629562&r1=1629561&r2=1629562&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java Mon Oct 6 03:44:13 2014 @@ -78,7 +78,7 @@ public class TezJobMonitor { try { for (TezSessionState s: TezSessionState.getOpenSessions()) { System.err.println("Shutting down tez session."); - TezSessionPoolManager.getInstance().close(s, false); + TezSessionPoolManager.getInstance().close(s); } } catch (Exception e) { // ignore @@ -113,7 +113,6 @@ public class TezJobMonitor { String lastReport = null; Set opts = new HashSet(); Heartbeater heartbeater = new Heartbeater(txnMgr, conf); - long 
startTime = 0; shutdownList.add(dagClient); @@ -146,7 +145,6 @@ public class TezJobMonitor { for (String s: progressMap.keySet()) { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); } - startTime = System.currentTimeMillis(); running = true; } @@ -154,8 +152,7 @@ public class TezJobMonitor { break; case SUCCEEDED: lastReport = printStatus(progressMap, lastReport, console); - double duration = (System.currentTimeMillis() - startTime)/1000.0; - console.printInfo("Status: Finished successfully in " + String.format("%.2f seconds", duration)); + console.printInfo("Status: Finished successfully"); running = false; done = true; break; Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1629562&r1=1629561&r2=1629562&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Mon Oct 6 03:44:13 2014 @@ -17,14 +17,6 @@ */ package org.apache.hadoop.hive.ql.exec.tez; -import java.io.IOException; -import java.text.NumberFormat; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -34,7 +26,6 @@ import org.apache.hadoop.mapred.OutputCo import org.apache.hadoop.util.StringUtils; import org.apache.tez.common.TezUtils; import org.apache.tez.mapreduce.input.MRInputLegacy; -import org.apache.tez.mapreduce.input.MultiMRInput; import org.apache.tez.mapreduce.processor.MRTaskReporter; import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; import org.apache.tez.runtime.api.Event; @@ -43,6 +34,11 @@ import org.apache.tez.runtime.api.Logica import org.apache.tez.runtime.api.ProcessorContext; import org.apache.tez.runtime.library.api.KeyValueWriter; +import java.io.IOException; +import java.text.NumberFormat; +import java.util.List; +import java.util.Map; + /** * Hive processor for Tez that forms the vertices in Tez and processes the data. * Does what ExecMapper and ExecReducer does for hive in MR framework. @@ -94,8 +90,7 @@ public class TezProcessor extends Abstra perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR); Configuration conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload()); this.jobConf = new JobConf(conf); - this.processorContext = getContext(); - setupMRLegacyConfigs(processorContext); + setupMRLegacyConfigs(getContext()); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR); } @@ -135,6 +130,12 @@ public class TezProcessor extends Abstra if (isMap) { rproc = new MapRecordProcessor(jobConf); + MRInputLegacy mrInput = getMRInput(inputs); + try { + mrInput.init(); + } catch (IOException e) { + throw new RuntimeException("Failed while initializing MRInput", e); + } } else { rproc = new ReduceRecordProcessor(); } @@ -147,7 +148,6 @@ public class TezProcessor extends Abstra throws Exception { Throwable originalThrowable = null; try { - // Outputs will be started later by the individual Processors. TezCacheAccess cacheAccess = TezCacheAccess.createInstance(jobConf); // Start the actual Inputs. After MRInput initialization. 
for (Map.Entry inputEntry : inputs.entrySet()) { @@ -155,10 +155,13 @@ public class TezProcessor extends Abstra LOG.info("Input: " + inputEntry.getKey() + " is not cached"); inputEntry.getValue().start(); } else { - LOG.info("Input: " + inputEntry.getKey() + " is already cached. Skipping start"); + LOG.info("Input: " + inputEntry.getKey() + + " is already cached. Skipping start"); } } + // Outputs will be started later by the individual Processors. + MRTaskReporter mrReporter = new MRTaskReporter(getContext()); rproc.init(jobConf, getContext(), mrReporter, inputs, outputs); rproc.run(); @@ -211,4 +214,19 @@ public class TezProcessor extends Abstra writer.write(key, value); } } + + static MRInputLegacy getMRInput(Map inputs) { + //there should be only one MRInput + MRInputLegacy theMRInput = null; + for(LogicalInput inp : inputs.values()){ + if(inp instanceof MRInputLegacy){ + if(theMRInput != null){ + throw new IllegalArgumentException("Only one MRInput is expected"); + } + //a better logic would be to find the alias + theMRInput = (MRInputLegacy)inp; + } + } + return theMRInput; + } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java?rev=1629562&r1=1629561&r2=1629562&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java Mon Oct 6 03:44:13 2014 @@ -168,10 +168,10 @@ public class TezSessionPoolManager { // session in the SessionState } - public void close(TezSessionState tezSessionState, boolean keepTmpDir) throws Exception { + public void close(TezSessionState tezSessionState) throws Exception { LOG.info("Closing tez session default? 
" + tezSessionState.isDefault()); if (!tezSessionState.isDefault()) { - tezSessionState.close(keepTmpDir); + tezSessionState.close(false); } } @@ -262,24 +262,19 @@ public class TezSessionPoolManager { } if (session != null) { - close(session, false); + close(session); } return getSession(conf, doOpen, forceCreate); } - public void closeAndOpen(TezSessionState sessionState, HiveConf conf, boolean keepTmpDir) + public void closeAndOpen(TezSessionState sessionState, HiveConf conf) throws Exception { - closeAndOpen(sessionState, conf, null, keepTmpDir); - } - - public void closeAndOpen(TezSessionState sessionState, HiveConf conf, - String[] additionalFiles, boolean keepTmpDir) throws Exception { HiveConf sessionConf = sessionState.getConf(); if (sessionConf != null && sessionConf.get("tez.queue.name") != null) { conf.set("tez.queue.name", sessionConf.get("tez.queue.name")); } - close(sessionState, keepTmpDir); - sessionState.open(conf, additionalFiles); + close(sessionState); + sessionState.open(conf); } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1629562&r1=1629561&r2=1629562&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Mon Oct 6 03:44:13 2014 @@ -37,7 +37,6 @@ import org.apache.hadoop.hive.ql.exec.Ut import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.MapWork; -import org.apache.hadoop.hive.ql.plan.MergeJoinWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; @@ -56,7 +55,6 @@ import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.GroupInputEdge; import org.apache.tez.dag.api.SessionNotRunning; -import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.VertexGroup; import org.apache.tez.dag.api.client.DAGClient; @@ -126,11 +124,14 @@ public class TezTask extends Task inputOutputLocalResources = - getExtraLocalResources(jobConf, scratchDir, inputOutputJars); - - // Ensure the session is open and has the necessary local resources - updateSession(session, jobConf, scratchDir, inputOutputJars, inputOutputLocalResources); + if (!session.isOpen()) { + // can happen if the user sets the tez flag after the session was + // established + LOG.info("Tez session hasn't been created yet. Opening session"); + session.open(conf, inputOutputJars); + } else { + session.refreshLocalResourcesFromConf(conf); + } List additionalLr = session.getLocalizedResources(); @@ -152,12 +153,8 @@ public class TezTask extends Task getExtraLocalResources(JobConf jobConf, Path scratchDir, - String[] inputOutputJars) throws Exception { - final Map resources = new HashMap(); - final List localResources = utils.localizeTempFiles( - scratchDir.toString(), jobConf, inputOutputJars); - if (null != localResources) { - for (LocalResource lr : localResources) { - resources.put(utils.getBaseName(lr), lr); - } - } - return resources; - } - - /** - * Ensures that the Tez Session is open and the AM has all necessary jars configured. 
- */ - void updateSession(TezSessionState session, - JobConf jobConf, Path scratchDir, String[] inputOutputJars, - Map extraResources) throws Exception { - final boolean missingLocalResources = !session - .hasResources(inputOutputJars); - - if (!session.isOpen()) { - // can happen if the user sets the tez flag after the session was - // established - LOG.info("Tez session hasn't been created yet. Opening session"); - session.open(conf, inputOutputJars); - } else { - LOG.info("Session is already open"); - - // Ensure the open session has the necessary resources (StorageHandler) - if (missingLocalResources) { - LOG.info("Tez session missing resources," + - " adding additional necessary resources"); - session.getSession().addAppMasterLocalFiles(extraResources); - } - - session.refreshLocalResourcesFromConf(conf); - } - } - - /** - * Adds any necessary resources that must be localized in each vertex to the DAG. - */ - void addExtraResourcesToDag(TezSessionState session, DAG dag, - String[] inputOutputJars, - Map inputOutputLocalResources) throws Exception { - if (!session.hasResources(inputOutputJars)) { - if (null != inputOutputLocalResources) { - dag.addTaskLocalFiles(inputOutputLocalResources); - } - } - } - DAG build(JobConf conf, TezWork work, Path scratchDir, LocalResource appJarLr, List additionalLr, Context ctx) throws Exception { @@ -314,16 +254,15 @@ public class TezTask extends Task additionalLr, String[] inputOutputJars, - Map inputOutputLocalResources) + List additionalLr) throws Exception { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG); @@ -370,7 +308,7 @@ public class TezTask extends Task ws = work.getAllWork(); for (BaseWork w: ws) { - if (w instanceof MergeJoinWork) { - w = ((MergeJoinWork) w).getMainWork(); - } for (Operator op: w.getAllOperators()) { op.jobClose(conf, rc == 0); } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java?rev=1629562&r1=1629561&r2=1629562&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java Mon Oct 6 03:44:13 2014 @@ -40,7 +40,7 @@ public class TezMergedLogicalInput exten @Override public Reader getReader() throws Exception { - return new KeyValuesInputMerger(getInputs()); + return new InputMerger(getInputs()); } @Override Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java?rev=1629562&r1=1629561&r2=1629562&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java Mon Oct 6 03:44:13 2014 @@ -67,7 +67,6 @@ public class VectorExpressionDescriptor DATE (0x040), TIMESTAMP (0x080), DATETIME_FAMILY (DATE.value | TIMESTAMP.value), - INT_TIMESTAMP_FAMILY (INT_FAMILY.value | TIMESTAMP.value), INT_DATETIME_FAMILY (INT_FAMILY.value | DATETIME_FAMILY.value), 
STRING_DATETIME_FAMILY (STRING_FAMILY.value | DATETIME_FAMILY.value), ALL_FAMILY (0xFFF); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java?rev=1629562&r1=1629561&r2=1629562&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java Mon Oct 6 03:44:13 2014 @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.exec.vector; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -46,8 +45,7 @@ public class VectorExtractOperator exten private int keyColCount; private int valueColCount; - private transient VectorizedRowBatch outputBatch; - private transient int remainingColCount; + private transient int [] projectedColumns = null; public VectorExtractOperator(VectorizationContext vContext, OperatorDesc conf) throws HiveException { @@ -59,25 +57,26 @@ public class VectorExtractOperator exten super(); } - @Override - protected void initializeOp(Configuration hconf) throws HiveException { - StructObjectInspector structInputObjInspector = (StructObjectInspector) inputObjInspectors[0]; - List fields = structInputObjInspector.getAllStructFieldRefs(); + private StructObjectInspector makeStandardStructObjectInspector(StructObjectInspector structObjectInspector) { + List fields = structObjectInspector.getAllStructFieldRefs(); ArrayList ois = new ArrayList(); ArrayList colNames = new ArrayList(); - for (int i = keyColCount; i < fields.size(); i++) { - StructField field = fields.get(i); - String fieldName = field.getFieldName(); - - // Remove "VALUE." prefix. - int dotIndex = fieldName.indexOf("."); - colNames.add(fieldName.substring(dotIndex + 1)); + for (StructField field: fields) { + colNames.add(field.getFieldName()); ois.add(field.getFieldObjectInspector()); } - outputObjInspector = ObjectInspectorFactory + return ObjectInspectorFactory .getStandardStructObjectInspector(colNames, ois); - remainingColCount = fields.size() - keyColCount; - outputBatch = new VectorizedRowBatch(remainingColCount); + } + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + outputObjInspector = inputObjInspectors[0]; + LOG.info("VectorExtractOperator class of outputObjInspector is " + outputObjInspector.getClass().getName()); + projectedColumns = new int [valueColCount]; + for (int i = 0; i < valueColCount; i++) { + projectedColumns[i] = keyColCount + i; + } initializeChildren(hconf); } @@ -87,16 +86,20 @@ public class VectorExtractOperator exten } @Override - // Remove the key columns and forward the values (and scratch columns). + // Evaluate vectorized batches of rows and forward them. public void processOp(Object row, int tag) throws HiveException { - VectorizedRowBatch inputBatch = (VectorizedRowBatch) row; - - // Copy references to the input columns array starting after the keys... - for (int i = 0; i < remainingColCount; i++) { - outputBatch.cols[i] = inputBatch.cols[keyColCount + i]; - } - outputBatch.size = inputBatch.size; + VectorizedRowBatch vrg = (VectorizedRowBatch) row; - forward(outputBatch, outputObjInspector); + // Project away the key columns... 
+ int[] originalProjections = vrg.projectedColumns; + int originalProjectionSize = vrg.projectionSize; + vrg.projectionSize = valueColCount; + vrg.projectedColumns = this.projectedColumns; + + forward(vrg, outputObjInspector); + + // Revert the projected columns back, because vrg will be re-used. + vrg.projectionSize = originalProjectionSize; + vrg.projectedColumns = originalProjections; } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java?rev=1629562&r1=1629561&r2=1629562&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java Mon Oct 6 03:44:13 2014 @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.exec.vector; +import java.io.IOException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; @@ -25,7 +27,16 @@ import org.apache.hadoop.hive.ql.exec.ve import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.ObjectWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; /** * File Sink operator implementation. @@ -58,10 +69,113 @@ public class VectorFileSinkOperator exte @Override public void processOp(Object data, int tag) throws HiveException { + VectorizedRowBatch vrg = (VectorizedRowBatch)data; + + Writable [] records = null; + boolean vectorizedSerde = false; + try { + if (serializer instanceof VectorizedSerde) { + recordValue = ((VectorizedSerde) serializer).serializeVector(vrg, + inputObjInspectors[0]); + records = (Writable[]) ((ObjectWritable) recordValue).get(); + vectorizedSerde = true; + } + } catch (SerDeException e1) { + throw new HiveException(e1); + } + for (int i = 0; i < vrg.size; i++) { - Object[] row = getRowObject(vrg, i); - super.processOp(row, tag); + Writable row = null; + if (vectorizedSerde) { + row = records[i]; + } else { + if (vrg.valueWriters == null) { + vrg.setValueWriters(this.valueWriters); + } + try { + row = serializer.serialize(getRowObject(vrg, i), inputObjInspectors[0]); + } catch (SerDeException ex) { + throw new HiveException(ex); + } + } + /* Create list bucketing sub-directory only if stored-as-directories is on. */ + String lbDirName = null; + lbDirName = (lbCtx == null) ? 
null : generateListBucketingDirName(row); + + FSPaths fpaths; + + if (!bDynParts && !filesCreated) { + if (lbDirName != null) { + FSPaths fsp2 = lookupListBucketingPaths(lbDirName); + } else { + createBucketFiles(fsp); + } + } + + try { + updateProgress(); + + // if DP is enabled, get the final output writers and prepare the real output row + assert inputObjInspectors[0].getCategory() == ObjectInspector.Category.STRUCT : "input object inspector is not struct"; + + if (bDynParts) { + // copy the DP column values from the input row to dpVals + dpVals.clear(); + dpWritables.clear(); + ObjectInspectorUtils.partialCopyToStandardObject(dpWritables, row, dpStartCol, numDynParts, + (StructObjectInspector) inputObjInspectors[0], ObjectInspectorCopyOption.WRITABLE); + // get a set of RecordWriter based on the DP column values + // pass the null value along to the escaping process to determine what the dir should be + for (Object o : dpWritables) { + if (o == null || o.toString().length() == 0) { + dpVals.add(dpCtx.getDefaultPartitionName()); + } else { + dpVals.add(o.toString()); + } + } + fpaths = getDynOutPaths(dpVals, lbDirName); + + } else { + if (lbDirName != null) { + fpaths = lookupListBucketingPaths(lbDirName); + } else { + fpaths = fsp; + } + } + + rowOutWriters = fpaths.getOutWriters(); + // check if all record writers implement statistics. if atleast one RW + // doesn't implement stats interface we will fallback to conventional way + // of gathering stats + isCollectRWStats = areAllTrue(statsFromRecordWriter); + if (conf.isGatherStats() && !isCollectRWStats) { + if (statsCollectRawDataSize) { + SerDeStats stats = serializer.getSerDeStats(); + if (stats != null) { + fpaths.getStat().addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize()); + } + } + fpaths.getStat().addToStat(StatsSetupConst.ROW_COUNT, 1); + } + + + if (row_count != null) { + row_count.set(row_count.get() + 1); + } + + if (!multiFileSpray) { + rowOutWriters[0].write(row); + } else { + int keyHashCode = 0; + key.setHashCode(keyHashCode); + int bucketNum = prtner.getBucket(key, null, totalFiles); + int idx = bucketMap.get(bucketNum); + rowOutWriters[idx].write(row); + } + } catch (IOException e) { + throw new HiveException(e); + } } } @@ -73,7 +187,7 @@ public class VectorFileSinkOperator exte } for (int i = 0; i < vrg.projectionSize; i++) { ColumnVector vectorColumn = vrg.cols[vrg.projectedColumns[i]]; - singleRow[i] = valueWriters[i].writeValue(vectorColumn, batchIndex); + singleRow[i] = vrg.valueWriters[i].writeValue(vectorColumn, batchIndex); } return singleRow; } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java?rev=1629562&r1=1629561&r2=1629562&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java Mon Oct 6 03:44:13 2014 @@ -1889,47 +1889,47 @@ public class VectorizationContext { // TODO: And, investigate if different reduce-side versions are needed for var* and std*, or if map-side aggregate can be used.. Right now they are conservatively // marked map-side (HASH). 
static ArrayList aggregatesDefinition = new ArrayList() {{ - add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.INT_DATETIME_FAMILY, null, VectorUDAFMinLong.class)); - add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFMinDouble.class)); - add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null, VectorUDAFMinString.class)); - add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMinDecimal.class)); - add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.INT_DATETIME_FAMILY, null, VectorUDAFMaxLong.class)); - add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFMaxDouble.class)); - add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null, VectorUDAFMaxString.class)); - add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMaxDecimal.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.NONE, GroupByDesc.Mode.HASH, VectorUDAFCountStar.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_DATETIME_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.MERGEPARTIAL, VectorUDAFCountMerge.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); - add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); - add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, null, VectorUDAFSumLong.class)); - add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFSumDouble.class)); - add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFSumDecimal.class)); - add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFAvgLong.class)); - add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFAvgDouble.class)); - add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFAvgDecimal.class)); - add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopLong.class)); - add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopLong.class)); - add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopDouble.class)); - add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopDouble.class)); - add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarPopDecimal.class)); - add(new AggregateDefinition("var_pop", 
VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarPopDecimal.class)); - add(new AggregateDefinition("var_samp", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarSampLong.class)); - add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarSampDouble.class)); - add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarSampDecimal.class)); - add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class)); - add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class)); - add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class)); - add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class)); - add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class)); - add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class)); - add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class)); - add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class)); - add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class)); - add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdSampLong.class)); - add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdSampDouble.class)); - add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdSampDecimal.class)); + add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, null, VectorUDAFMinLong.class)); + add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFMinDouble.class)); + add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null, VectorUDAFMinString.class)); + add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMinDecimal.class)); + add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, null, VectorUDAFMaxLong.class)); + add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFMaxDouble.class)); + add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null, VectorUDAFMaxString.class)); + add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMaxDecimal.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.NONE, GroupByDesc.Mode.HASH, VectorUDAFCountStar.class)); + add(new AggregateDefinition("count", 
VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.MERGEPARTIAL, VectorUDAFCountMerge.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); + add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFCount.class)); + add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, null, VectorUDAFSumLong.class)); + add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFSumDouble.class)); + add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFSumDecimal.class)); + add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFAvgLong.class)); + add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFAvgDouble.class)); + add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFAvgDecimal.class)); + add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopLong.class)); + add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopLong.class)); + add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopDouble.class)); + add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopDouble.class)); + add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarPopDecimal.class)); + add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarPopDecimal.class)); + add(new AggregateDefinition("var_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarSampLong.class)); + add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarSampDouble.class)); + add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarSampDecimal.class)); + add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class)); + add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class)); + add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class)); + add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class)); + add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class)); + add(new 
AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class)); + add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class)); + add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class)); + add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class)); + add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdSampLong.class)); + add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdSampDouble.class)); + add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdSampDecimal.class)); }}; public VectorAggregateExpression getAggregatorExpression(AggregationDesc desc, boolean isReduce) Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1629562&r1=1629561&r2=1629562&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Mon Oct 6 03:44:13 2014 @@ -140,20 +140,6 @@ public class VectorizedRowBatchCtx { /** - * Initializes the VectorizedRowBatch context based on an scratch column type map and - * object inspector. - * @param columnTypeMap - * @param rowOI - * Object inspector that shapes the column types - */ - public void init(Map columnTypeMap, - StructObjectInspector rowOI) { - this.columnTypeMap = columnTypeMap; - this.rowOI= rowOI; - this.rawRowOI = rowOI; - } - - /** * Initializes VectorizedRowBatch context based on the * split and Hive configuration (Job conf with hive Plan). * Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java?rev=1629562&r1=1629561&r2=1629562&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java Mon Oct 6 03:44:13 2014 @@ -40,8 +40,6 @@ import java.util.regex.Pattern; * are used by the compactor and cleaner and thus must be format agnostic. 
*/ public class AcidUtils { - // This key will be put in the conf file when planning an acid operation - public static final String CONF_ACID_KEY = "hive.doing.acid"; public static final String BASE_PREFIX = "base_"; public static final String DELTA_PREFIX = "delta_"; public static final PathFilter deltaFileFilter = new PathFilter() { Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java?rev=1629562&r1=1629561&r2=1629562&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java Mon Oct 6 03:44:13 2014 @@ -161,11 +161,10 @@ public abstract class HiveContextAwareRe } public IOContext getIOContext() { - return IOContext.get(jobConf.get(Utilities.INPUT_NAME)); + return IOContext.get(); } - private void initIOContext(long startPos, boolean isBlockPointer, - Path inputPath) { + public void initIOContext(long startPos, boolean isBlockPointer, Path inputPath) { ioCxtRef = this.getIOContext(); ioCxtRef.currentBlockStart = startPos; ioCxtRef.isBlockPointer = isBlockPointer; @@ -184,7 +183,7 @@ public abstract class HiveContextAwareRe boolean blockPointer = false; long blockStart = -1; - FileSplit fileSplit = split; + FileSplit fileSplit = (FileSplit) split; Path path = fileSplit.getPath(); FileSystem fs = path.getFileSystem(job); if (inputFormatClass.getName().contains("SequenceFile")) { @@ -203,15 +202,12 @@ public abstract class HiveContextAwareRe blockStart = in.getPosition(); in.close(); } - this.jobConf = job; this.initIOContext(blockStart, blockPointer, path.makeQualified(fs)); this.initIOContextSortedProps(split, recordReader, job); } public void initIOContextSortedProps(FileSplit split, RecordReader recordReader, JobConf job) { - this.jobConf = job; - this.getIOContext().resetSortingValues(); this.isSorted = jobConf.getBoolean("hive.input.format.sorted", false); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1629562&r1=1629561&r2=1629562&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Mon Oct 6 03:44:13 2014 @@ -45,7 +45,6 @@ import org.apache.hadoop.hive.ql.exec.Ut import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.MapWork; -import org.apache.hadoop.hive.ql.plan.MergeJoinWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; @@ -254,14 +253,7 @@ public class HiveInputFormat threadLocal = new ThreadLocal(){ @Override protected synchronized IOContext initialValue() { return new IOContext(); } @@ -43,21 +41,6 @@ public class IOContext { return IOContext.threadLocal.get(); } - private static Map inputNameIOContextMap = new HashMap(); - - public static Map getMap() { - return inputNameIOContextMap; 
- } - - public static IOContext get(String inputName) { - if (inputNameIOContextMap.containsKey(inputName) == false) { - IOContext ioContext = new IOContext(); - inputNameIOContextMap.put(inputName, ioContext); - } - - return inputNameIOContextMap.get(inputName); - } - public static void clear() { IOContext.threadLocal.remove(); } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1629562&r1=1629561&r2=1629562&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Mon Oct 6 03:44:13 2014 @@ -132,7 +132,7 @@ public class OrcInputFormat implements @Override public boolean shouldSkipCombine(Path path, Configuration conf) throws IOException { - return (conf.get(AcidUtils.CONF_ACID_KEY) != null) || AcidUtils.isAcid(path, conf); + return AcidUtils.isAcid(path, conf); } private static class OrcRecordReader Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java?rev=1629562&r1=1629561&r2=1629562&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java Mon Oct 6 03:44:13 2014 @@ -118,11 +118,13 @@ public class OrcNewInputFormat extends I public List getSplits(JobContext jobContext) throws IOException, InterruptedException { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ORC_GET_SPLITS); + Configuration conf = + ShimLoader.getHadoopShims().getConfiguration(jobContext); List splits = OrcInputFormat.generateSplitsInfo(ShimLoader.getHadoopShims() .getConfiguration(jobContext)); - List result = new ArrayList(splits.size()); - for(OrcSplit split: splits) { + List result = new ArrayList(); + for(OrcSplit split: OrcInputFormat.generateSplitsInfo(conf)) { result.add(new OrcNewSplit(split)); } perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ORC_GET_SPLITS); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java?rev=1629562&r1=1629561&r2=1629562&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java Mon Oct 6 03:44:13 2014 @@ -418,120 +418,138 @@ class RunLengthIntegerWriterV2 implement private void determineEncoding() { - // we need to compute zigzag values for DIRECT encoding if we decide to - // break early for delta overflows or for shorter runs - computeZigZagLiterals(); + int idx = 0; - zzBits100p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 1.0); + // for identifying monotonic sequences + boolean isIncreasing = false; + int increasingCount = 1; + boolean isDecreasing = false; + int decreasingCount = 1; - // not a 
big win for shorter runs to determine encoding - if (numLiterals <= MIN_REPEAT) { - encoding = EncodingType.DIRECT; - return; - } + // for identifying type of delta encoding + min = literals[0]; + long max = literals[0]; + isFixedDelta = true; + long currDelta = 0; - // DELTA encoding check + min = literals[0]; + long deltaMax = 0; - // for identifying monotonic sequences - boolean isIncreasing = true; - boolean isDecreasing = true; - this.isFixedDelta = true; + // populate all variables to identify the encoding type + if (numLiterals >= 1) { + currDelta = literals[1] - literals[0]; + for(int i = 0; i < numLiterals; i++) { + if (i > 0 && literals[i] >= max) { + max = literals[i]; + increasingCount++; + } - this.min = literals[0]; - long max = literals[0]; - final long initialDelta = literals[1] - literals[0]; - long currDelta = initialDelta; - long deltaMax = initialDelta; - this.adjDeltas[0] = initialDelta; - - for (int i = 1; i < numLiterals; i++) { - final long l1 = literals[i]; - final long l0 = literals[i - 1]; - currDelta = l1 - l0; - min = Math.min(min, l1); - max = Math.max(max, l1); - - isIncreasing &= (l0 <= l1); - isDecreasing &= (l0 >= l1); - - isFixedDelta &= (currDelta == initialDelta); - if (i > 1) { - adjDeltas[i - 1] = Math.abs(currDelta); - deltaMax = Math.max(deltaMax, adjDeltas[i - 1]); - } - } + if (i > 0 && literals[i] <= min) { + min = literals[i]; + decreasingCount++; + } - // its faster to exit under delta overflow condition without checking for - // PATCHED_BASE condition as encoding using DIRECT is faster and has less - // overhead than PATCHED_BASE - if (!utils.isSafeSubtract(max, min)) { - encoding = EncodingType.DIRECT; - return; - } + // if delta doesn't changes then mark it as fixed delta + if (i > 0 && isFixedDelta) { + if (literals[i] - literals[i - 1] != currDelta) { + isFixedDelta = false; + } - // invariant - subtracting any number from any other in the literals after - // this point won't overflow + fixedDelta = currDelta; + } - // if initialDelta is 0 then we cannot delta encode as we cannot identify - // the sign of deltas (increasing or decreasing) - if (initialDelta != 0) { - - // if min is equal to max then the delta is 0, this condition happens for - // fixed values run >10 which cannot be encoded with SHORT_REPEAT - if (min == max) { - assert isFixedDelta : min + "==" + max + - ", isFixedDelta cannot be false"; - assert currDelta == 0 : min + "==" + max + ", currDelta should be zero"; - fixedDelta = 0; - encoding = EncodingType.DELTA; - return; - } + // populate zigzag encoded literals + long zzEncVal = 0; + if (signed) { + zzEncVal = utils.zigzagEncode(literals[i]); + } else { + zzEncVal = literals[i]; + } + zigzagLiterals[idx] = zzEncVal; + idx++; - if (isFixedDelta) { - assert currDelta == initialDelta - : "currDelta should be equal to initialDelta for fixed delta encoding"; - encoding = EncodingType.DELTA; - fixedDelta = currDelta; - return; + // max delta value is required for computing the fixed bits + // required for delta blob in delta encoding + if (i > 0) { + if (i == 1) { + // first value preserve the sign + adjDeltas[i - 1] = literals[i] - literals[i - 1]; + } else { + adjDeltas[i - 1] = Math.abs(literals[i] - literals[i - 1]); + if (adjDeltas[i - 1] > deltaMax) { + deltaMax = adjDeltas[i - 1]; + } + } + } } // stores the number of bits required for packing delta blob in // delta encoding bitsDeltaMax = utils.findClosestNumBits(deltaMax); - // monotonic condition - if (isIncreasing || isDecreasing) { - encoding = EncodingType.DELTA; 
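// Illustrative sketch (editor's aside, not part of this commit): the loop
// above stores utils.zigzagEncode(literals[i]) so that signed values pack
// into few bits regardless of sign. The standard zigzag transform, which
// that call is assumed to implement, and its inverse:
static long zigzagEncode(long val) {
  // 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, 2 -> 4, ...
  return (val << 1) ^ (val >> 63);
}

static long zigzagDecode(long val) {
  return (val >>> 1) ^ -(val & 1);
}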
- return; + // if decreasing count equals total number of literals then the + // sequence is monotonically decreasing + if (increasingCount == 1 && decreasingCount == numLiterals) { + isDecreasing = true; + } + + // if increasing count equals total number of literals then the + // sequence is monotonically increasing + if (decreasingCount == 1 && increasingCount == numLiterals) { + isIncreasing = true; } } - // PATCHED_BASE encoding check + // if the sequence is both increasing and decreasing then it is not + // monotonic + if (isDecreasing && isIncreasing) { + isDecreasing = false; + isIncreasing = false; + } + + // fixed delta condition + if (isIncreasing == false && isDecreasing == false && isFixedDelta == true) { + encoding = EncodingType.DELTA; + return; + } + + // monotonic condition + if (isIncreasing || isDecreasing) { + encoding = EncodingType.DELTA; + return; + } // percentile values are computed for the zigzag encoded values. if the // number of bit requirement between 90th and 100th percentile varies // beyond a threshold then we need to patch the values. if the variation - // is not significant then we can use direct encoding + // is not significant then we can use direct or delta encoding + + double p = 0.9; + zzBits90p = utils.percentileBits(zigzagLiterals, 0, numLiterals, p); + + p = 1.0; + zzBits100p = utils.percentileBits(zigzagLiterals, 0, numLiterals, p); - zzBits90p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 0.9); int diffBitsLH = zzBits100p - zzBits90p; // if the difference between 90th percentile and 100th percentile fixed // bits is > 1 then we need patch the values - if (diffBitsLH > 1) { - + if (isIncreasing == false && isDecreasing == false && diffBitsLH > 1 + && isFixedDelta == false) { // patching is done only on base reduced values. // remove base from literals - for (int i = 0; i < numLiterals; i++) { + for(int i = 0; i < numLiterals; i++) { baseRedLiterals[i] = literals[i] - min; } // 95th percentile width is used to determine max allowed value // after which patching will be done - brBits95p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 0.95); + p = 0.95; + brBits95p = utils.percentileBits(baseRedLiterals, 0, numLiterals, p); // 100th percentile is used to compute the max patch width - brBits100p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 1.0); + p = 1.0; + brBits100p = utils.percentileBits(baseRedLiterals, 0, numLiterals, p); // after base reducing the values, if the difference in bits between // 95th percentile and 100th percentile value is zero then there @@ -547,24 +565,19 @@ class RunLengthIntegerWriterV2 implement encoding = EncodingType.DIRECT; return; } - } else { - // if difference in bits between 95th percentile and 100th percentile is - // 0, then patch length will become 0. Hence we will fallback to direct + } + + // if difference in bits between 95th percentile and 100th percentile is + // 0, then patch length will become 0. 
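// Illustrative sketch (editor's aside, not part of this commit): the
// percentileBits(values, offset, length, p) calls above answer "how many
// bits are needed to cover the p-th percentile of these values?". A
// simplified standalone version of that idea, assuming non-negative
// (zigzag- or base-reduced) inputs; the real SerializationUtils method
// additionally snaps widths to ORC's fixed set of bit sizes:
static int percentileBits(long[] values, int offset, int length, double p) {
  int[] countPerWidth = new int[65];
  for (int i = offset; i < offset + length; i++) {
    int bits = 64 - Long.numberOfLeadingZeros(values[i]);
    countPerWidth[bits == 0 ? 1 : bits]++;
  }
  // the widest (1 - p) fraction of the values may be ignored
  int allowedToSkip = (int) (length * (1.0 - p));
  for (int width = 64; width >= 1; width--) {
    allowedToSkip -= countPerWidth[width];
    if (allowedToSkip < 0) {
      return width;
    }
  }
  return 1;
}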
Hence we will fallback to direct + if (isIncreasing == false && isDecreasing == false && diffBitsLH <= 1 + && isFixedDelta == false) { encoding = EncodingType.DIRECT; return; } - } - private void computeZigZagLiterals() { - // populate zigzag encoded literals - long zzEncVal = 0; - for (int i = 0; i < numLiterals; i++) { - if (signed) { - zzEncVal = utils.zigzagEncode(literals[i]); - } else { - zzEncVal = literals[i]; - } - zigzagLiterals[i] = zzEncVal; + // this should not happen + if (encoding == null) { + throw new RuntimeException("Integer encoding cannot be determined."); } } @@ -687,7 +700,7 @@ class RunLengthIntegerWriterV2 implement patchWidth = 0; gapVsPatchList = null; min = 0; - isFixedDelta = true; + isFixedDelta = false; } @Override Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java?rev=1629562&r1=1629561&r2=1629562&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java Mon Oct 6 03:44:13 2014 @@ -1283,9 +1283,4 @@ final class SerializationUtils { + ((readBuffer[rbOffset + 7] & 255) << 0)); } - // Do not want to use Guava LongMath.checkedSubtract() here as it will throw - // ArithmeticException in case of overflow - public boolean isSafeSubtract(long left, long right) { - return (left ^ right) >= 0 | (left ^ (left - right)) >= 0; - } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java?rev=1629562&r1=1629561&r2=1629562&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java Mon Oct 6 03:44:13 2014 @@ -13,6 +13,9 @@ */ package org.apache.hadoop.hive.ql.io.parquet.convert; +import java.util.List; + +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.Writable; @@ -30,7 +33,7 @@ public class ArrayWritableGroupConverter private Writable[] mapPairContainer; public ArrayWritableGroupConverter(final GroupType groupType, final HiveGroupConverter parent, - final int index) { + final int index, List hiveSchemaTypeInfos) { this.parent = parent; this.index = index; int count = groupType.getFieldCount(); @@ -40,7 +43,8 @@ public class ArrayWritableGroupConverter isMap = count == 2; converters = new Converter[count]; for (int i = 0; i < count; i++) { - converters[i] = getConverterFromDescription(groupType.getType(i), i, this); + converters[i] = getConverterFromDescription(groupType.getType(i), i, this, + hiveSchemaTypeInfos); } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableGroupConverter.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableGroupConverter.java?rev=1629562&r1=1629561&r2=1629562&view=diff 
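// Illustrative sketch (editor's aside, not part of this commit): the
// SerializationUtils.isSafeSubtract() helper removed earlier in this diff
// detects long-subtraction overflow with two XORs instead of wider
// arithmetic. left - right can only overflow when the operands have
// opposite signs and the difference loses left's sign, so:
static boolean isSafeSubtract(long left, long right) {
  // safe if the operands share a sign, or the difference keeps left's sign
  return (left ^ right) >= 0 | (left ^ (left - right)) >= 0;
}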
============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableGroupConverter.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableGroupConverter.java Mon Oct 6 03:44:13 2014 @@ -16,6 +16,7 @@ package org.apache.hadoop.hive.ql.io.par import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.Writable; @@ -36,19 +37,21 @@ public class DataWritableGroupConverter private final Object[] currentArr; private Writable[] rootMap; - public DataWritableGroupConverter(final GroupType requestedSchema, final GroupType tableSchema) { - this(requestedSchema, null, 0, tableSchema); + public DataWritableGroupConverter(final GroupType requestedSchema, final GroupType tableSchema, + final List hiveSchemaTypeInfos) { + this(requestedSchema, null, 0, tableSchema, hiveSchemaTypeInfos); final int fieldCount = tableSchema.getFieldCount(); this.rootMap = new Writable[fieldCount]; } public DataWritableGroupConverter(final GroupType groupType, final HiveGroupConverter parent, - final int index) { - this(groupType, parent, index, groupType); + final int index, final List hiveSchemaTypeInfos) { + this(groupType, parent, index, groupType, hiveSchemaTypeInfos); } public DataWritableGroupConverter(final GroupType selectedGroupType, - final HiveGroupConverter parent, final int index, final GroupType containingGroupType) { + final HiveGroupConverter parent, final int index, final GroupType containingGroupType, + final List hiveSchemaTypeInfos) { this.parent = parent; this.index = index; final int totalFieldCount = containingGroupType.getFieldCount(); @@ -62,7 +65,8 @@ public class DataWritableGroupConverter Type subtype = selectedFields.get(i); if (containingGroupType.getFields().contains(subtype)) { converters[i] = getConverterFromDescription(subtype, - containingGroupType.getFieldIndex(subtype.getName()), this); + containingGroupType.getFieldIndex(subtype.getName()), this, + hiveSchemaTypeInfos); } else { throw new IllegalStateException("Group type [" + containingGroupType + "] does not contain requested field: " + subtype); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java?rev=1629562&r1=1629561&r2=1629562&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java Mon Oct 6 03:44:13 2014 @@ -31,8 +31,10 @@ public class DataWritableRecordConverter private final DataWritableGroupConverter root; - public DataWritableRecordConverter(final GroupType requestedSchema, final GroupType tableSchema) { - this.root = new DataWritableGroupConverter(requestedSchema, tableSchema); + public DataWritableRecordConverter(final GroupType requestedSchema, final GroupType tableSchema, + final List hiveColumnTypeInfos) { + this.root = new DataWritableGroupConverter(requestedSchema, tableSchema, + hiveColumnTypeInfos); } @Override
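A short usage sketch for the widened DataWritableRecordConverter constructor above, assuming the mail archive stripped the generic parameter List<TypeInfo> from the signature and that the column type infos come from the table's Hive type string; the helper class below is an editor's illustration, not code from this commit:

import java.util.List;

import org.apache.hadoop.hive.ql.io.parquet.convert.DataWritableRecordConverter;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

import parquet.schema.GroupType;

public class DataWritableRecordConverterExample {
  // Builds the record converter for a table whose column types are given as
  // a Hive type string, e.g. "int:string:bigint"; the two GroupType
  // arguments are assumed to come from the Parquet read support, as
  // elsewhere in this patch.
  static DataWritableRecordConverter buildConverter(GroupType requestedSchema,
      GroupType tableSchema, String columnTypeString) {
    List<TypeInfo> hiveColumnTypeInfos =
        TypeInfoUtils.getTypeInfosFromTypeString(columnTypeString);
    return new DataWritableRecordConverter(requestedSchema, tableSchema,
        hiveColumnTypeInfos);
  }
}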