Return-Path: X-Original-To: apmail-hive-commits-archive@www.apache.org Delivered-To: apmail-hive-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DD08A17886 for ; Tue, 17 Feb 2015 06:49:51 +0000 (UTC) Received: (qmail 41181 invoked by uid 500); 17 Feb 2015 06:49:49 -0000 Delivered-To: apmail-hive-commits-archive@hive.apache.org Received: (qmail 41079 invoked by uid 500); 17 Feb 2015 06:49:49 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 39348 invoked by uid 99); 17 Feb 2015 06:49:48 -0000 Received: from eris.apache.org (HELO hades.apache.org) (140.211.11.105) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Feb 2015 06:49:48 +0000 Received: from hades.apache.org (localhost [127.0.0.1]) by hades.apache.org (ASF Mail Server at hades.apache.org) with ESMTP id 41D54AC0590 for ; Tue, 17 Feb 2015 06:49:48 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1660293 [14/48] - in /hive/branches/spark: ./ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/ accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/ accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/ accumul... Date: Tue, 17 Feb 2015 06:49:34 -0000 To: commits@hive.apache.org From: xuefu@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20150217064948.41D54AC0590@hades.apache.org> Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1660293&r1=1660292&r2=1660293&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Tue Feb 17 06:49:27 2015 @@ -20,10 +20,17 @@ package org.apache.hadoop.hive.ql.io.orc import static com.google.common.base.Preconditions.checkArgument; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; -import com.google.protobuf.ByteString; -import com.google.protobuf.CodedOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.lang.management.ManagementFactory; +import java.nio.ByteBuffer; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,6 +41,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.filters.BloomFilter; import org.apache.hadoop.hive.ql.io.orc.CompressionCodec.Modifier; import org.apache.hadoop.hive.ql.io.orc.OrcFile.CompressionStrategy; import org.apache.hadoop.hive.ql.io.orc.OrcFile.EncodingStrategy; @@ -70,16 +78,12 @@ import org.apache.hadoop.hive.serde2.typ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; -import java.io.IOException; -import java.io.OutputStream; 
-import java.lang.management.ManagementFactory; -import java.nio.ByteBuffer; -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import com.google.common.primitives.Longs; +import com.google.protobuf.ByteString; +import com.google.protobuf.CodedOutputStream; /** * An ORC file writer. The file is divided into stripes, which is the natural @@ -145,23 +149,27 @@ class WriterImpl implements Writer, Memo private final OrcFile.WriterContext callbackContext; private final OrcFile.EncodingStrategy encodingStrategy; private final OrcFile.CompressionStrategy compressionStrategy; + private final boolean[] bloomFilterColumns; + private final double bloomFilterFpp; WriterImpl(FileSystem fs, - Path path, - Configuration conf, - ObjectInspector inspector, - long stripeSize, - CompressionKind compress, - int bufferSize, - int rowIndexStride, - MemoryManager memoryManager, - boolean addBlockPadding, - OrcFile.Version version, - OrcFile.WriterCallback callback, - OrcFile.EncodingStrategy encodingStrategy, - CompressionStrategy compressionStrategy, - float paddingTolerance, - long blockSizeValue) throws IOException { + Path path, + Configuration conf, + ObjectInspector inspector, + long stripeSize, + CompressionKind compress, + int bufferSize, + int rowIndexStride, + MemoryManager memoryManager, + boolean addBlockPadding, + OrcFile.Version version, + OrcFile.WriterCallback callback, + EncodingStrategy encodingStrategy, + CompressionStrategy compressionStrategy, + float paddingTolerance, + long blockSizeValue, + String bloomFilterColumnNames, + double bloomFilterFpp) throws IOException { this.fs = fs; this.path = path; this.conf = conf; @@ -190,7 +198,20 @@ class WriterImpl implements Writer, Memo this.memoryManager = memoryManager; buildIndex = rowIndexStride > 0; codec = createCodec(compress); - this.bufferSize = getEstimatedBufferSize(bufferSize); + String allColumns = conf.get(IOConstants.COLUMNS); + if (allColumns == null) { + allColumns = getColumnNamesFromInspector(inspector); + } + this.bufferSize = getEstimatedBufferSize(allColumns, bufferSize); + if (version == OrcFile.Version.V_0_11) { + /* do not write bloom filters for ORC v11 */ + this.bloomFilterColumns = + OrcUtils.includeColumns(null, allColumns, inspector); + } else { + this.bloomFilterColumns = + OrcUtils.includeColumns(bloomFilterColumnNames, allColumns, inspector); + } + this.bloomFilterFpp = bloomFilterFpp; treeWriter = createTreeWriter(inspector, streamFactory, false); if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) { throw new IllegalArgumentException("Row stride must be at least " + @@ -201,8 +222,25 @@ class WriterImpl implements Writer, Memo memoryManager.addWriter(path, stripeSize, this); } + private String getColumnNamesFromInspector(ObjectInspector inspector) { + List fieldNames = Lists.newArrayList(); + Joiner joiner = Joiner.on(","); + if (inspector instanceof StructObjectInspector) { + StructObjectInspector soi = (StructObjectInspector) inspector; + List fields = soi.getAllStructFieldRefs(); + for(StructField sf : fields) { + fieldNames.add(sf.getFieldName()); + } + } + return joiner.join(fieldNames); + } + + @VisibleForTesting int getEstimatedBufferSize(int bs) { - String colNames = conf.get(IOConstants.COLUMNS); + return getEstimatedBufferSize(conf.get(IOConstants.COLUMNS), 
bs); + } + + int getEstimatedBufferSize(String colNames, int bs) { long availableMem = getMemoryAvailableForORC(); if (colNames != null) { final int numCols = colNames.split(",").length; @@ -459,26 +497,27 @@ class WriterImpl implements Writer, Memo final EnumSet modifiers; switch (kind) { - case DATA: - case DICTIONARY_DATA: - if (getCompressionStrategy() == CompressionStrategy.SPEED) { - modifiers = EnumSet.of(Modifier.FAST, Modifier.TEXT); - } else { - modifiers = EnumSet.of(Modifier.DEFAULT, Modifier.TEXT); - } - break; - case LENGTH: - case DICTIONARY_COUNT: - case PRESENT: - case ROW_INDEX: - case SECONDARY: - // easily compressed using the fastest modes - modifiers = EnumSet.of(Modifier.FASTEST, Modifier.BINARY); - break; - default: - LOG.warn("Missing ORC compression modifiers for " + kind); - modifiers = null; - break; + case BLOOM_FILTER: + case DATA: + case DICTIONARY_DATA: + if (getCompressionStrategy() == CompressionStrategy.SPEED) { + modifiers = EnumSet.of(Modifier.FAST, Modifier.TEXT); + } else { + modifiers = EnumSet.of(Modifier.DEFAULT, Modifier.TEXT); + } + break; + case LENGTH: + case DICTIONARY_COUNT: + case PRESENT: + case ROW_INDEX: + case SECONDARY: + // easily compressed using the fastest modes + modifiers = EnumSet.of(Modifier.FASTEST, Modifier.BINARY); + break; + default: + LOG.warn("Missing ORC compression modifiers for " + kind); + modifiers = null; + break; } BufferedStream result = streams.get(name); @@ -499,6 +538,15 @@ class WriterImpl implements Writer, Memo } /** + * Get the current column id. After creating all tree writers this count should tell how many + * columns (including columns within nested complex objects) are created in total. + * @return current column id + */ + public int getCurrentColumnId() { + return columnCount; + } + + /** * Get the stride rate of the row index. */ public int getRowIndexStride() { @@ -538,6 +586,22 @@ class WriterImpl implements Writer, Memo } /** + * Get the bloom filter columns + * @return bloom filter columns + */ + public boolean[] getBloomFilterColumns() { + return bloomFilterColumns; + } + + /** + * Get bloom filter false positive percentage. + * @return fpp + */ + public double getBloomFilterFPP() { + return bloomFilterFpp; + } + + /** * Get the writer's configuration. 
* @return configuration */ @@ -572,6 +636,11 @@ class WriterImpl implements Writer, Memo private final OrcProto.RowIndex.Builder rowIndex; private final OrcProto.RowIndexEntry.Builder rowIndexEntry; private final PositionedOutputStream rowIndexStream; + private final PositionedOutputStream bloomFilterStream; + protected final BloomFilter bloomFilter; + protected final boolean createBloomFilter; + private final OrcProto.BloomFilterIndex.Builder bloomFilterIndex; + private final OrcProto.BloomFilter.Builder bloomFilterEntry; private boolean foundNulls; private OutStream isPresentOutStream; private final List stripeStatsBuilders; @@ -598,6 +667,7 @@ class WriterImpl implements Writer, Memo isPresent = null; } this.foundNulls = false; + createBloomFilter = streamFactory.getBloomFilterColumns()[columnId]; indexStatistics = ColumnStatisticsImpl.create(inspector); stripeColStatistics = ColumnStatisticsImpl.create(inspector); fileStatistics = ColumnStatisticsImpl.create(inspector); @@ -607,11 +677,22 @@ class WriterImpl implements Writer, Memo rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry); stripeStatsBuilders = Lists.newArrayList(); if (streamFactory.buildIndex()) { - rowIndexStream = streamFactory.createStream(id, - OrcProto.Stream.Kind.ROW_INDEX); + rowIndexStream = streamFactory.createStream(id, OrcProto.Stream.Kind.ROW_INDEX); } else { rowIndexStream = null; } + if (createBloomFilter) { + bloomFilterEntry = OrcProto.BloomFilter.newBuilder(); + bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder(); + bloomFilterStream = streamFactory.createStream(id, OrcProto.Stream.Kind.BLOOM_FILTER); + bloomFilter = new BloomFilter(streamFactory.getRowIndexStride(), + streamFactory.getBloomFilterFPP()); + } else { + bloomFilterEntry = null; + bloomFilterIndex = null; + bloomFilterStream = null; + bloomFilter = null; + } } protected OrcProto.RowIndex.Builder getRowIndex() { @@ -630,7 +711,7 @@ class WriterImpl implements Writer, Memo return rowIndexEntry; } - IntegerWriter createIntegerWriter(PositionedOutputStream output, + IntegerWriter createIntegerWriter(OutStream output, boolean signed, boolean isDirectV2, StreamFactory writer) { if (isDirectV2) { @@ -725,6 +806,14 @@ class WriterImpl implements Writer, Memo } rowIndex.clear(); rowIndexEntry.clear(); + + // write the bloom filter to out stream + if (bloomFilterStream != null) { + bloomFilterIndex.build().writeTo(bloomFilterStream); + bloomFilterStream.flush(); + bloomFilterIndex.clear(); + bloomFilterEntry.clear(); + } } private void writeStripeStatistics(OrcProto.StripeStatistics.Builder builder, @@ -763,12 +852,23 @@ class WriterImpl implements Writer, Memo indexStatistics.reset(); rowIndex.addEntry(rowIndexEntry); rowIndexEntry.clear(); + addBloomFilterEntry(); recordPosition(rowIndexPosition); for(TreeWriter child: childrenWriters) { child.createRowIndexEntry(); } } + void addBloomFilterEntry() { + if (createBloomFilter) { + bloomFilterEntry.setNumHashFunctions(bloomFilter.getNumHashFunctions()); + bloomFilterEntry.addAllBitset(Longs.asList(bloomFilter.getBitSet())); + bloomFilterIndex.addBloomFilter(bloomFilterEntry.build()); + bloomFilter.reset(); + bloomFilterEntry.clear(); + } + } + /** * Record the current position in each of this column's streams. 
* @param recorder where should the locations be recorded @@ -851,6 +951,9 @@ class WriterImpl implements Writer, Memo if (obj != null) { byte val = ((ByteObjectInspector) inspector).get(obj); indexStatistics.updateInteger(val); + if (createBloomFilter) { + bloomFilter.addLong(val); + } writer.write(val); } } @@ -882,7 +985,7 @@ class WriterImpl implements Writer, Memo StreamFactory writer, boolean nullable) throws IOException { super(columnId, inspector, writer, nullable); - PositionedOutputStream out = writer.createStream(id, + OutStream out = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.isDirectV2 = isNewWriteFormat(writer); this.writer = createIntegerWriter(out, true, isDirectV2, writer); @@ -926,6 +1029,10 @@ class WriterImpl implements Writer, Memo val = shortInspector.get(obj); } indexStatistics.updateInteger(val); + if (createBloomFilter) { + // integers are converted to longs in column statistics and during SARG evaluation + bloomFilter.addLong(val); + } writer.write(val); } } @@ -966,6 +1073,10 @@ class WriterImpl implements Writer, Memo if (obj != null) { float val = ((FloatObjectInspector) inspector).get(obj); indexStatistics.updateDouble(val); + if (createBloomFilter) { + // floats are converted to doubles in column statistics and during SARG evaluation + bloomFilter.addDouble(val); + } utils.writeFloat(stream, val); } } @@ -1006,6 +1117,9 @@ class WriterImpl implements Writer, Memo if (obj != null) { double val = ((DoubleObjectInspector) inspector).get(obj); indexStatistics.updateDouble(val); + if (createBloomFilter) { + bloomFilter.addDouble(val); + } utils.writeDouble(stream, val); } } @@ -1099,6 +1213,9 @@ class WriterImpl implements Writer, Memo directLengthOutput.write(val.getLength()); } indexStatistics.updateString(val); + if (createBloomFilter) { + bloomFilter.addBytes(val.getBytes(), val.getLength()); + } } } @@ -1162,6 +1279,14 @@ class WriterImpl implements Writer, Memo // Write the dictionary by traversing the red-black tree writing out // the bytes and lengths; and creating the map from the original order // to the final sorted order. + if (dictionary.size() == 0) { + if (LOG.isWarnEnabled()) { + LOG.warn("Empty dictionary. 
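      // Descriptive note (added for readability, not part of the patch): for complex types the
      // writer only feeds the container size -- list length, map entry count, union tag -- into
      // this column's bloom filter; the actual keys, values and elements are hashed by the child
      // TreeWriters' own bloom filters when those columns are selected in bloomFilterColumns.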
Suppressing dictionary stream."); + } + stringOutput.suppress(); + lengthOutput.suppress(); + } + dictionary.visit(new StringRedBlackTree.Visitor() { private int currentId = 0; @Override @@ -1250,6 +1375,7 @@ class WriterImpl implements Writer, Memo OrcProto.RowIndexEntry base = rowIndexEntry.build(); savedRowIndex.add(base); rowIndexEntry.clear(); + addBloomFilterEntry(); recordPosition(rowIndexPosition); rowIndexValueCount.add(Long.valueOf(rows.size())); if (strideDictionaryCheck) { @@ -1360,6 +1486,9 @@ class WriterImpl implements Writer, Memo stream.write(val.getBytes(), 0, val.getLength()); length.write(val.getLength()); indexStatistics.updateBinary(val); + if (createBloomFilter) { + bloomFilter.addBytes(val.getBytes(), val.getLength()); + } } } @@ -1422,6 +1551,9 @@ class WriterImpl implements Writer, Memo indexStatistics.updateTimestamp(val); seconds.write((val.getTime() / MILLIS_PER_SECOND) - BASE_TIMESTAMP); nanos.write(formatNanos(val.getNanos())); + if (createBloomFilter) { + bloomFilter.addLong(val.getTime()); + } } } @@ -1467,7 +1599,7 @@ class WriterImpl implements Writer, Memo StreamFactory writer, boolean nullable) throws IOException { super(columnId, inspector, writer, nullable); - PositionedOutputStream out = writer.createStream(id, + OutStream out = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.isDirectV2 = isNewWriteFormat(writer); this.writer = createIntegerWriter(out, true, isDirectV2, writer); @@ -1482,6 +1614,9 @@ class WriterImpl implements Writer, Memo DateWritable val = ((DateObjectInspector) inspector).getPrimitiveWritableObject(obj); indexStatistics.updateDate(val); writer.write(val.getDays()); + if (createBloomFilter) { + bloomFilter.addLong(val.getDays()); + } } } @@ -1550,6 +1685,9 @@ class WriterImpl implements Writer, Memo decimal.unscaledValue()); scaleStream.write(decimal.scale()); indexStatistics.updateDecimal(decimal); + if (createBloomFilter) { + bloomFilter.addString(decimal.toString()); + } } } @@ -1649,6 +1787,9 @@ class WriterImpl implements Writer, Memo ListObjectInspector insp = (ListObjectInspector) inspector; int len = insp.getListLength(obj); lengths.write(len); + if (createBloomFilter) { + bloomFilter.addLong(len); + } for(int i=0; i < len; ++i) { childrenWriters[0].write(insp.getListElement(obj, i)); } @@ -1713,6 +1854,9 @@ class WriterImpl implements Writer, Memo // accessor in the MapObjectInspector. 
Map valueMap = insp.getMap(obj); lengths.write(valueMap.size()); + if (createBloomFilter) { + bloomFilter.addLong(valueMap.size()); + } for(Map.Entry entry: valueMap.entrySet()) { childrenWriters[0].write(entry.getKey()); childrenWriters[1].write(entry.getValue()); @@ -1765,6 +1909,9 @@ class WriterImpl implements Writer, Memo UnionObjectInspector insp = (UnionObjectInspector) inspector; byte tag = insp.getTag(obj); tags.write(tag); + if (createBloomFilter) { + bloomFilter.addLong(tag); + } childrenWriters[tag].write(insp.getField(obj)); } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java?rev=1660293&r1=1660292&r2=1660293&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java Tue Feb 17 06:49:27 2015 @@ -21,7 +21,6 @@ import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; @@ -29,10 +28,10 @@ import org.apache.hadoop.hive.ql.io.IOCo import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter; import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport; import org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper; +import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobConf; @@ -41,27 +40,25 @@ import org.apache.hadoop.mapreduce.Outpu import org.apache.hadoop.util.Progressable; import parquet.hadoop.ParquetOutputFormat; -import parquet.hadoop.metadata.CompressionCodecName; -import parquet.hadoop.util.ContextUtil; /** * * A Parquet OutputFormat for Hive (with the deprecated package mapred) * */ -public class MapredParquetOutputFormat extends FileOutputFormat implements - HiveOutputFormat { +public class MapredParquetOutputFormat extends FileOutputFormat implements + HiveOutputFormat { private static final Log LOG = LogFactory.getLog(MapredParquetOutputFormat.class); - protected ParquetOutputFormat realOutputFormat; + protected ParquetOutputFormat realOutputFormat; public MapredParquetOutputFormat() { - realOutputFormat = new ParquetOutputFormat(new DataWritableWriteSupport()); + realOutputFormat = new ParquetOutputFormat(new DataWritableWriteSupport()); } - public MapredParquetOutputFormat(final OutputFormat mapreduceOutputFormat) { - realOutputFormat = (ParquetOutputFormat) mapreduceOutputFormat; + public MapredParquetOutputFormat(final OutputFormat mapreduceOutputFormat) { + realOutputFormat = (ParquetOutputFormat) mapreduceOutputFormat; } @Override @@ -70,7 +67,7 @@ public class MapredParquetOutputFormat e } @Override - public RecordWriter getRecordWriter( + public RecordWriter getRecordWriter( final FileSystem ignored, final JobConf job, final 
String name, @@ -119,7 +116,7 @@ public class MapredParquetOutputFormat e } protected ParquetRecordWriterWrapper getParquerRecordWriterWrapper( - ParquetOutputFormat realOutputFormat, + ParquetOutputFormat realOutputFormat, JobConf jobConf, String finalOutPath, Progressable progress, Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java?rev=1660293&r1=1660292&r2=1660293&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java Tue Feb 17 06:49:27 2015 @@ -13,61 +13,31 @@ */ package org.apache.hadoop.hive.ql.io.parquet.serde; -import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; import java.util.Properties; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeSpec; import org.apache.hadoop.hive.serde2.SerDeStats; -import org.apache.hadoop.hive.serde2.io.ByteWritable; -import org.apache.hadoop.hive.serde2.io.DoubleWritable; -import org.apache.hadoop.hive.serde2.io.ShortWritable; -import org.apache.hadoop.hive.serde2.io.TimestampWritable; -import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; +import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; 
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.BooleanWritable; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.FloatWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import parquet.hadoop.ParquetOutputFormat; import parquet.hadoop.ParquetWriter; -import parquet.io.api.Binary; /** * @@ -110,6 +80,13 @@ public class ParquetHiveSerDe extends Ab private long deserializedSize; private String compressionType; + private ParquetHiveRecord parquetRow; + + public ParquetHiveSerDe() { + parquetRow = new ParquetHiveRecord(); + stats = new SerDeStats(); + } + @Override public final void initialize(final Configuration conf, final Properties tbl) throws SerDeException { @@ -144,7 +121,6 @@ public class ParquetHiveSerDe extends Ab this.objInspector = new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo); // Stats part - stats = new SerDeStats(); serializedSize = 0; deserializedSize = 0; status = LAST_OPERATION.UNKNOWN; @@ -169,7 +145,7 @@ public class ParquetHiveSerDe extends Ab @Override public Class getSerializedClass() { - return ArrayWritable.class; + return ParquetHiveRecord.class; } @Override @@ -178,150 +154,11 @@ public class ParquetHiveSerDe extends Ab if (!objInspector.getCategory().equals(Category.STRUCT)) { throw new SerDeException("Cannot serialize " + objInspector.getCategory() + ". Can only serialize a struct"); } - final ArrayWritable serializeData = createStruct(obj, (StructObjectInspector) objInspector); - serializedSize = serializeData.get().length; + serializedSize = ((StructObjectInspector)objInspector).getAllStructFieldRefs().size(); status = LAST_OPERATION.SERIALIZE; - return serializeData; - } - - private ArrayWritable createStruct(final Object obj, final StructObjectInspector inspector) - throws SerDeException { - final List fields = inspector.getAllStructFieldRefs(); - final Writable[] arr = new Writable[fields.size()]; - for (int i = 0; i < fields.size(); i++) { - final StructField field = fields.get(i); - final Object subObj = inspector.getStructFieldData(obj, field); - final ObjectInspector subInspector = field.getFieldObjectInspector(); - arr[i] = createObject(subObj, subInspector); - } - return new ArrayWritable(Writable.class, arr); - } - - private Writable createMap(final Object obj, final MapObjectInspector inspector) - throws SerDeException { - final Map sourceMap = inspector.getMap(obj); - final ObjectInspector keyInspector = inspector.getMapKeyObjectInspector(); - final ObjectInspector valueInspector = inspector.getMapValueObjectInspector(); - final List array = new ArrayList(); - - if (sourceMap != null) { - for (final Entry keyValue : sourceMap.entrySet()) { - final Writable key = createObject(keyValue.getKey(), keyInspector); - final Writable value = createObject(keyValue.getValue(), valueInspector); - if (key != null) { - Writable[] arr = new Writable[2]; - arr[0] = key; - arr[1] = value; - array.add(new ArrayWritable(Writable.class, arr)); - } - } - } - if (array.size() > 0) { - final ArrayWritable subArray = new ArrayWritable(ArrayWritable.class, - array.toArray(new ArrayWritable[array.size()])); - return new ArrayWritable(Writable.class, new Writable[] {subArray}); - } else { - return null; - } - } - - private ArrayWritable createArray(final Object obj, final 
ListObjectInspector inspector) - throws SerDeException { - final List sourceArray = inspector.getList(obj); - final ObjectInspector subInspector = inspector.getListElementObjectInspector(); - final List array = new ArrayList(); - if (sourceArray != null) { - for (final Object curObj : sourceArray) { - array.add(createObject(curObj, subInspector)); - } - } - if (array.size() > 0) { - final ArrayWritable subArray = new ArrayWritable(Writable.class, - array.toArray(new Writable[array.size()])); - return new ArrayWritable(Writable.class, new Writable[] {subArray}); - } else { - return null; - } - } - - private Writable createPrimitive(final Object obj, final PrimitiveObjectInspector inspector) - throws SerDeException { - if (obj == null) { - return null; - } - switch (inspector.getPrimitiveCategory()) { - case VOID: - return null; - case BOOLEAN: - return new BooleanWritable(((BooleanObjectInspector) inspector).get(obj) ? Boolean.TRUE : Boolean.FALSE); - case BYTE: - return new ByteWritable(((ByteObjectInspector) inspector).get(obj)); - case DOUBLE: - return new DoubleWritable(((DoubleObjectInspector) inspector).get(obj)); - case FLOAT: - return new FloatWritable(((FloatObjectInspector) inspector).get(obj)); - case INT: - return new IntWritable(((IntObjectInspector) inspector).get(obj)); - case LONG: - return new LongWritable(((LongObjectInspector) inspector).get(obj)); - case SHORT: - return new ShortWritable(((ShortObjectInspector) inspector).get(obj)); - case STRING: - String v = ((StringObjectInspector) inspector).getPrimitiveJavaObject(obj); - try { - return new BytesWritable(v.getBytes("UTF-8")); - } catch (UnsupportedEncodingException e) { - throw new SerDeException("Failed to encode string in UTF-8", e); - } - case DECIMAL: - HiveDecimal hd = (HiveDecimal)inspector.getPrimitiveJavaObject(obj); - DecimalTypeInfo decTypeInfo = (DecimalTypeInfo) inspector.getTypeInfo(); - int prec = decTypeInfo.precision(); - int scale = decTypeInfo.scale(); - byte[] src = hd.setScale(scale).unscaledValue().toByteArray(); - // Estimated number of bytes needed. - int bytes = PRECISION_TO_BYTE_COUNT[prec - 1]; - if (bytes == src.length) { - // No padding needed. - return new BytesWritable(src); - } - byte[] tgt = new byte[bytes]; - if ( hd.signum() == -1) { - // For negative number, initializing bits to 1 - for (int i = 0; i < bytes; i++) { - tgt[i] |= 0xFF; - } - } - System.arraycopy(src, 0, tgt, bytes - src.length, src.length); // Padding leading zeroes/ones. 
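      // Worked example (illustrative values, not from the patch): the arraycopy above right-aligns
      // the minimal two's-complement bytes inside the fixed-length buffer Parquet uses for this
      // precision. For decimal(5,2), PRECISION_TO_BYTE_COUNT[4] == 3, so 3.00 (unscaled 300 ==
      // 0x01 0x2C) is stored as 0x00 0x01 0x2C, while -3.00 (0xFE 0xD4) is sign-extended to
      // 0xFF 0xFE 0xD4 by the 0xFF pre-fill above. The same logic reappears below in
      // DataWritableWriter.decimalToBinary().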
- return new BytesWritable(tgt); - case TIMESTAMP: - return new TimestampWritable(((TimestampObjectInspector) inspector).getPrimitiveJavaObject(obj)); - case CHAR: - String strippedValue = ((HiveCharObjectInspector) inspector).getPrimitiveJavaObject(obj).getStrippedValue(); - return new BytesWritable(Binary.fromString(strippedValue).getBytes()); - case VARCHAR: - String value = ((HiveVarcharObjectInspector) inspector).getPrimitiveJavaObject(obj).getValue(); - return new BytesWritable(Binary.fromString(value).getBytes()); - case BINARY: - return new BytesWritable(((BinaryObjectInspector) inspector).getPrimitiveJavaObject(obj)); - default: - throw new SerDeException("Unknown primitive : " + inspector.getPrimitiveCategory()); - } - } - - private Writable createObject(final Object obj, final ObjectInspector inspector) throws SerDeException { - switch (inspector.getCategory()) { - case STRUCT: - return createStruct(obj, (StructObjectInspector) inspector); - case LIST: - return createArray(obj, (ListObjectInspector) inspector); - case MAP: - return createMap(obj, (MapObjectInspector) inspector); - case PRIMITIVE: - return createPrimitive(obj, (PrimitiveObjectInspector) inspector); - default: - throw new SerDeException("Unknown data type" + inspector.getCategory()); - } + parquetRow.value = obj; + parquetRow.inspector= (StructObjectInspector)objInspector; + return parquetRow; } @Override Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java?rev=1660293&r1=1660292&r2=1660293&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java Tue Feb 17 06:49:27 2015 @@ -16,6 +16,7 @@ package org.apache.hadoop.hive.ql.io.par import java.sql.Timestamp; import java.util.Calendar; import java.util.TimeZone; +import java.util.concurrent.TimeUnit; import jodd.datetime.JDateTime; @@ -24,9 +25,9 @@ import jodd.datetime.JDateTime; * This utilizes the Jodd library. 
*/ public class NanoTimeUtils { - static final long NANOS_PER_SECOND = 1000000000; - static final long SECONDS_PER_MINUTE = 60; - static final long MINUTES_PER_HOUR = 60; + static final long NANOS_PER_HOUR = TimeUnit.HOURS.toNanos(1); + static final long NANOS_PER_MINUTE = TimeUnit.MINUTES.toNanos(1); + static final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1); private static final ThreadLocal parquetGMTCalendar = new ThreadLocal(); private static final ThreadLocal parquetLocalCalendar = new ThreadLocal(); @@ -63,8 +64,8 @@ public class NanoTimeUtils { long minute = calendar.get(Calendar.MINUTE); long second = calendar.get(Calendar.SECOND); long nanos = ts.getNanos(); - long nanosOfDay = nanos + NANOS_PER_SECOND * second + NANOS_PER_SECOND * SECONDS_PER_MINUTE * minute + - NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR * hour; + long nanosOfDay = nanos + NANOS_PER_SECOND * second + NANOS_PER_MINUTE * minute + + NANOS_PER_HOUR * hour; return new NanoTime(days, nanosOfDay); } @@ -80,10 +81,10 @@ public class NanoTimeUtils { calendar.set(Calendar.DAY_OF_MONTH, jDateTime.getDay()); long remainder = nanosOfDay; - int hour = (int) (remainder / (NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR)); - remainder = remainder % (NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR); - int minutes = (int) (remainder / (NANOS_PER_SECOND * SECONDS_PER_MINUTE)); - remainder = remainder % (NANOS_PER_SECOND * SECONDS_PER_MINUTE); + int hour = (int) (remainder / (NANOS_PER_HOUR)); + remainder = remainder % (NANOS_PER_HOUR); + int minutes = (int) (remainder / (NANOS_PER_MINUTE)); + remainder = remainder % (NANOS_PER_MINUTE); int seconds = (int) (remainder / (NANOS_PER_SECOND)); long nanos = remainder % NANOS_PER_SECOND; Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java?rev=1660293&r1=1660292&r2=1660293&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java Tue Feb 17 06:49:27 2015 @@ -16,7 +16,7 @@ package org.apache.hadoop.hive.ql.io.par import java.util.HashMap; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord; import parquet.hadoop.api.WriteSupport; import parquet.io.api.RecordConsumer; @@ -28,7 +28,7 @@ import parquet.schema.MessageTypeParser; * DataWritableWriteSupport is a WriteSupport for the DataWritableWriter * */ -public class DataWritableWriteSupport extends WriteSupport { +public class DataWritableWriteSupport extends WriteSupport { public static final String PARQUET_HIVE_SCHEMA = "parquet.hive.schema"; @@ -55,7 +55,7 @@ public class DataWritableWriteSupport ex } @Override - public void write(final ArrayWritable record) { + public void write(final ParquetHiveRecord record) { writer.write(record); } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java?rev=1660293&r1=1660292&r2=1660293&view=diff 
============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java Tue Feb 17 06:49:27 2015 @@ -13,37 +13,29 @@ */ package org.apache.hadoop.hive.ql.io.parquet.write; -import java.sql.Timestamp; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils; -import org.apache.hadoop.hive.serde2.io.ByteWritable; -import org.apache.hadoop.hive.serde2.io.DoubleWritable; -import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; -import org.apache.hadoop.hive.serde2.io.ShortWritable; -import org.apache.hadoop.hive.serde2.io.TimestampWritable; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.BooleanWritable; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.FloatWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Writable; - +import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord; +import org.apache.hadoop.hive.serde2.objectinspector.*; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.*; +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import parquet.io.api.Binary; import parquet.io.api.RecordConsumer; import parquet.schema.GroupType; import parquet.schema.OriginalType; import parquet.schema.Type; +import java.sql.Timestamp; +import java.util.List; +import java.util.Map; + /** * - * DataWritableWriter is a writer, - * that will read an ArrayWritable and give the data to parquet - * with the expected schema - * This is a helper class used by DataWritableWriteSupport class. + * DataWritableWriter is a writer that reads a ParquetWritable object and send the data to the Parquet + * API with the expected schema. This class is only used through DataWritableWriteSupport class. */ public class DataWritableWriter { private static final Log LOG = LogFactory.getLog(DataWritableWriter.class); @@ -57,13 +49,13 @@ public class DataWritableWriter { /** * It writes all record values to the Parquet RecordConsumer. - * @param record Contains the record of values that are going to be written + * @param record Contains the record that are going to be written. */ - public void write(final ArrayWritable record) { + public void write(final ParquetHiveRecord record) { if (record != null) { recordConsumer.startMessage(); try { - writeGroupFields(record, schema); + writeGroupFields(record.getObject(), record.getObjectInspector(), schema); } catch (RuntimeException e) { String errorMessage = "Parquet record is malformed: " + e.getMessage(); LOG.error(errorMessage, e); @@ -76,19 +68,23 @@ public class DataWritableWriter { /** * It writes all the fields contained inside a group to the RecordConsumer. * @param value The list of values contained in the group. + * @param inspector The object inspector used to get the correct value type. * @param type Type that contains information about the group schema. 
*/ - public void writeGroupFields(final ArrayWritable value, final GroupType type) { + private void writeGroupFields(final Object value, final StructObjectInspector inspector, final GroupType type) { if (value != null) { + List fields = inspector.getAllStructFieldRefs(); + List fieldValuesList = inspector.getStructFieldsDataAsList(value); + for (int i = 0; i < type.getFieldCount(); i++) { Type fieldType = type.getType(i); String fieldName = fieldType.getName(); - Writable fieldValue = value.get()[i]; + Object fieldValue = fieldValuesList.get(i); - // Parquet does not write null elements if (fieldValue != null) { + ObjectInspector fieldInspector = fields.get(i).getFieldObjectInspector(); recordConsumer.startField(fieldName, i); - writeValue(fieldValue, fieldType); + writeValue(fieldValue, fieldInspector, fieldType); recordConsumer.endField(fieldName, i); } } @@ -96,68 +92,93 @@ public class DataWritableWriter { } /** - * It writes the field value to the Parquet RecordConsumer. It detects the field type, and writes + * It writes the field value to the Parquet RecordConsumer. It detects the field type, and calls * the correct write function. * @param value The writable object that contains the value. + * @param inspector The object inspector used to get the correct value type. * @param type Type that contains information about the type schema. */ - private void writeValue(final Writable value, final Type type) { + private void writeValue(final Object value, final ObjectInspector inspector, final Type type) { if (type.isPrimitive()) { - writePrimitive(value); - } else if (value instanceof ArrayWritable) { + checkInspectorCategory(inspector, ObjectInspector.Category.PRIMITIVE); + writePrimitive(value, (PrimitiveObjectInspector)inspector); + } else { GroupType groupType = type.asGroupType(); OriginalType originalType = type.getOriginalType(); if (originalType != null && originalType.equals(OriginalType.LIST)) { - writeArray((ArrayWritable)value, groupType); + checkInspectorCategory(inspector, ObjectInspector.Category.LIST); + writeArray(value, (ListObjectInspector)inspector, groupType); } else if (originalType != null && originalType.equals(OriginalType.MAP)) { - writeMap((ArrayWritable)value, groupType); + checkInspectorCategory(inspector, ObjectInspector.Category.MAP); + writeMap(value, (MapObjectInspector)inspector, groupType); } else { - writeGroup((ArrayWritable) value, groupType); + checkInspectorCategory(inspector, ObjectInspector.Category.STRUCT); + writeGroup(value, (StructObjectInspector)inspector, groupType); } - } else { - throw new RuntimeException("Field value is not an ArrayWritable object: " + type); + } + } + + /** + * Checks that an inspector matches the category indicated as a parameter. + * @param inspector The object inspector to check + * @param category The category to match + * @throws IllegalArgumentException if inspector does not match the category + */ + private void checkInspectorCategory(ObjectInspector inspector, ObjectInspector.Category category) { + if (!inspector.getCategory().equals(category)) { + throw new IllegalArgumentException("Invalid data type: expected " + category + + " type, but found: " + inspector.getCategory()); } } /** * It writes a group type and all its values to the Parquet RecordConsumer. * This is used only for optional and required groups. - * @param value ArrayWritable object that contains the group values - * @param type Type that contains information about the group schema + * @param value Object that contains the group values. 
+ * @param inspector The object inspector used to get the correct value type. + * @param type Type that contains information about the group schema. */ - private void writeGroup(final ArrayWritable value, final GroupType type) { + private void writeGroup(final Object value, final StructObjectInspector inspector, final GroupType type) { recordConsumer.startGroup(); - writeGroupFields(value, type); + writeGroupFields(value, inspector, type); recordConsumer.endGroup(); } /** - * It writes a map type and its key-pair values to the Parquet RecordConsumer. - * This is called when the original type (MAP) is detected by writeValue() - * @param value The list of map values that contains the repeated KEY_PAIR_VALUE group type - * @param type Type that contains information about the group schema + * It writes a list type and its array elements to the Parquet RecordConsumer. + * This is called when the original type (LIST) is detected by writeValue()/ + * This function assumes the following schema: + * optional group arrayCol (LIST) { + * repeated group array { + * optional TYPE array_element; + * } + * } + * @param value The object that contains the array values. + * @param inspector The object inspector used to get the correct value type. + * @param type Type that contains information about the group (LIST) schema. */ - private void writeMap(final ArrayWritable value, final GroupType type) { + private void writeArray(final Object value, final ListObjectInspector inspector, final GroupType type) { + // Get the internal array structure GroupType repeatedType = type.getType(0).asGroupType(); - ArrayWritable repeatedValue = (ArrayWritable)value.get()[0]; recordConsumer.startGroup(); recordConsumer.startField(repeatedType.getName(), 0); - Writable[] map_values = repeatedValue.get(); - for (int record = 0; record < map_values.length; record++) { - Writable key_value_pair = map_values[record]; - if (key_value_pair != null) { - // Hive wraps a map key-pair into an ArrayWritable - if (key_value_pair instanceof ArrayWritable) { - writeGroup((ArrayWritable)key_value_pair, repeatedType); - } else { - throw new RuntimeException("Map key-value pair is not an ArrayWritable object on record " + record); - } - } else { - throw new RuntimeException("Map key-value pair is null on record " + record); + List arrayValues = inspector.getList(value); + ObjectInspector elementInspector = inspector.getListElementObjectInspector(); + + Type elementType = repeatedType.getType(0); + String elementName = elementType.getName(); + + for (Object element : arrayValues) { + recordConsumer.startGroup(); + if (element != null) { + recordConsumer.startField(elementName, 0); + writeValue(element, elementInspector, elementType); + recordConsumer.endField(elementName, 0); } + recordConsumer.endGroup(); } recordConsumer.endField(repeatedType.getName(), 0); @@ -165,35 +186,53 @@ public class DataWritableWriter { } /** - * It writes a list type and its array elements to the Parquet RecordConsumer. - * This is called when the original type (LIST) is detected by writeValue() - * @param array The list of array values that contains the repeated array group type - * @param type Type that contains information about the group schema + * It writes a map type and its key-pair values to the Parquet RecordConsumer. + * This is called when the original type (MAP) is detected by writeValue(). 
+ * This function assumes the following schema: + * optional group mapCol (MAP) { + * repeated group map (MAP_KEY_VALUE) { + * required TYPE key; + * optional TYPE value; + * } + * } + * @param value The object that contains the map key-values. + * @param inspector The object inspector used to get the correct value type. + * @param type Type that contains information about the group (MAP) schema. */ - private void writeArray(final ArrayWritable array, final GroupType type) { + private void writeMap(final Object value, final MapObjectInspector inspector, final GroupType type) { + // Get the internal map structure (MAP_KEY_VALUE) GroupType repeatedType = type.getType(0).asGroupType(); - ArrayWritable repeatedValue = (ArrayWritable)array.get()[0]; recordConsumer.startGroup(); recordConsumer.startField(repeatedType.getName(), 0); - Writable[] array_values = repeatedValue.get(); - for (int record = 0; record < array_values.length; record++) { - recordConsumer.startGroup(); + Map mapValues = inspector.getMap(value); - // Null values must be wrapped into startGroup/endGroup - Writable element = array_values[record]; - if (element != null) { - for (int i = 0; i < type.getFieldCount(); i++) { - Type fieldType = repeatedType.getType(i); - String fieldName = fieldType.getName(); + Type keyType = repeatedType.getType(0); + String keyName = keyType.getName(); + ObjectInspector keyInspector = inspector.getMapKeyObjectInspector(); - recordConsumer.startField(fieldName, i); - writeValue(element, fieldType); - recordConsumer.endField(fieldName, i); + Type valuetype = repeatedType.getType(1); + String valueName = valuetype.getName(); + ObjectInspector valueInspector = inspector.getMapValueObjectInspector(); + + for (Map.Entry keyValue : mapValues.entrySet()) { + recordConsumer.startGroup(); + if (keyValue != null) { + // write key element + Object keyElement = keyValue.getKey(); + recordConsumer.startField(keyName, 0); + writeValue(keyElement, keyInspector, keyType); + recordConsumer.endField(keyName, 0); + + // write value element + Object valueElement = keyValue.getValue(); + if (valueElement != null) { + recordConsumer.startField(valueName, 1); + writeValue(valueElement, valueInspector, valuetype); + recordConsumer.endField(valueName, 1); } } - recordConsumer.endGroup(); } @@ -203,36 +242,89 @@ public class DataWritableWriter { /** * It writes the primitive value to the Parquet RecordConsumer. - * @param value The writable object that contains the primitive value. + * @param value The object that contains the primitive value. + * @param inspector The object inspector used to get the correct value type. 
*/ - private void writePrimitive(final Writable value) { + private void writePrimitive(final Object value, final PrimitiveObjectInspector inspector) { if (value == null) { return; } - if (value instanceof DoubleWritable) { - recordConsumer.addDouble(((DoubleWritable) value).get()); - } else if (value instanceof BooleanWritable) { - recordConsumer.addBoolean(((BooleanWritable) value).get()); - } else if (value instanceof FloatWritable) { - recordConsumer.addFloat(((FloatWritable) value).get()); - } else if (value instanceof IntWritable) { - recordConsumer.addInteger(((IntWritable) value).get()); - } else if (value instanceof LongWritable) { - recordConsumer.addLong(((LongWritable) value).get()); - } else if (value instanceof ShortWritable) { - recordConsumer.addInteger(((ShortWritable) value).get()); - } else if (value instanceof ByteWritable) { - recordConsumer.addInteger(((ByteWritable) value).get()); - } else if (value instanceof HiveDecimalWritable) { - throw new UnsupportedOperationException("HiveDecimalWritable writing not implemented"); - } else if (value instanceof BytesWritable) { - recordConsumer.addBinary((Binary.fromByteArray(((BytesWritable) value).getBytes()))); - } else if (value instanceof TimestampWritable) { - Timestamp ts = ((TimestampWritable) value).getTimestamp(); - NanoTime nt = NanoTimeUtils.getNanoTime(ts, false); - nt.writeValue(recordConsumer); - } else { - throw new IllegalArgumentException("Unknown value type: " + value + " " + value.getClass()); + + switch (inspector.getPrimitiveCategory()) { + case VOID: + return; + case DOUBLE: + recordConsumer.addDouble(((DoubleObjectInspector) inspector).get(value)); + break; + case BOOLEAN: + recordConsumer.addBoolean(((BooleanObjectInspector) inspector).get(value)); + break; + case FLOAT: + recordConsumer.addFloat(((FloatObjectInspector) inspector).get(value)); + break; + case BYTE: + recordConsumer.addInteger(((ByteObjectInspector) inspector).get(value)); + break; + case INT: + recordConsumer.addInteger(((IntObjectInspector) inspector).get(value)); + break; + case LONG: + recordConsumer.addLong(((LongObjectInspector) inspector).get(value)); + break; + case SHORT: + recordConsumer.addInteger(((ShortObjectInspector) inspector).get(value)); + break; + case STRING: + String v = ((StringObjectInspector) inspector).getPrimitiveJavaObject(value); + recordConsumer.addBinary(Binary.fromString(v)); + break; + case CHAR: + String vChar = ((HiveCharObjectInspector) inspector).getPrimitiveJavaObject(value).getStrippedValue(); + recordConsumer.addBinary(Binary.fromString(vChar)); + break; + case VARCHAR: + String vVarchar = ((HiveVarcharObjectInspector) inspector).getPrimitiveJavaObject(value).getValue(); + recordConsumer.addBinary(Binary.fromString(vVarchar)); + break; + case BINARY: + byte[] vBinary = ((BinaryObjectInspector) inspector).getPrimitiveJavaObject(value); + recordConsumer.addBinary(Binary.fromByteArray(vBinary)); + break; + case TIMESTAMP: + Timestamp ts = ((TimestampObjectInspector) inspector).getPrimitiveJavaObject(value); + recordConsumer.addBinary(NanoTimeUtils.getNanoTime(ts, false).toBinary()); + break; + case DECIMAL: + HiveDecimal vDecimal = ((HiveDecimal)inspector.getPrimitiveJavaObject(value)); + DecimalTypeInfo decTypeInfo = (DecimalTypeInfo)inspector.getTypeInfo(); + recordConsumer.addBinary(decimalToBinary(vDecimal, decTypeInfo)); + break; + default: + throw new IllegalArgumentException("Unsupported primitive data type: " + inspector.getPrimitiveCategory()); + } + } + + private Binary decimalToBinary(final 
HiveDecimal hiveDecimal, final DecimalTypeInfo decimalTypeInfo) { + int prec = decimalTypeInfo.precision(); + int scale = decimalTypeInfo.scale(); + byte[] decimalBytes = hiveDecimal.setScale(scale).unscaledValue().toByteArray(); + + // Estimated number of bytes needed. + int precToBytes = ParquetHiveSerDe.PRECISION_TO_BYTE_COUNT[prec - 1]; + if (precToBytes == decimalBytes.length) { + // No padding needed. + return Binary.fromByteArray(decimalBytes); } + + byte[] tgt = new byte[precToBytes]; + if (hiveDecimal.signum() == -1) { + // For negative number, initializing bits to 1 + for (int i = 0; i < precToBytes; i++) { + tgt[i] |= 0xFF; + } + } + + System.arraycopy(decimalBytes, 0, tgt, precToBytes - decimalBytes.length, decimalBytes.length); // Padding leading zeroes/ones. + return Binary.fromByteArray(tgt); } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java?rev=1660293&r1=1660292&r2=1660293&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java Tue Feb 17 06:49:27 2015 @@ -20,7 +20,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.JobContext; @@ -29,22 +28,23 @@ import org.apache.hadoop.mapred.Reporter import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord; import org.apache.hadoop.util.Progressable; import parquet.hadoop.ParquetOutputFormat; import parquet.hadoop.metadata.CompressionCodecName; import parquet.hadoop.util.ContextUtil; -public class ParquetRecordWriterWrapper implements RecordWriter, +public class ParquetRecordWriterWrapper implements RecordWriter, org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter { public static final Log LOG = LogFactory.getLog(ParquetRecordWriterWrapper.class); - private final org.apache.hadoop.mapreduce.RecordWriter realWriter; + private final org.apache.hadoop.mapreduce.RecordWriter realWriter; private final TaskAttemptContext taskContext; public ParquetRecordWriterWrapper( - final OutputFormat realOutputFormat, + final OutputFormat realOutputFormat, final JobConf jobConf, final String name, final Progressable progress, Properties tableProperties) throws @@ -106,7 +106,7 @@ public class ParquetRecordWriterWrapper } @Override - public void write(final Void key, final ArrayWritable value) throws IOException { + public void write(final Void key, final ParquetHiveRecord value) throws IOException { try { realWriter.write(key, value); } catch (final InterruptedException e) { @@ -121,7 +121,7 @@ public class ParquetRecordWriterWrapper @Override public void write(final Writable w) throws IOException { - write(null, (ArrayWritable) w); + write(null, (ParquetHiveRecord) w); } } Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1660293&r1=1660292&r2=1660293&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Tue Feb 17 06:49:27 2015 @@ -29,7 +29,6 @@ import static org.apache.hadoop.hive.ser import java.io.FileNotFoundException; import java.io.IOException; -import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -49,8 +48,6 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.common.ObjectPair; @@ -74,6 +71,8 @@ import org.apache.hadoop.hive.metastore. import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.FireEventRequest; +import org.apache.hadoop.hive.metastore.api.FireEventRequestData; import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalRequest; @@ -82,6 +81,7 @@ import org.apache.hadoop.hive.metastore. import org.apache.hadoop.hive.metastore.api.HiveObjectRef; import org.apache.hadoop.hive.metastore.api.HiveObjectType; import org.apache.hadoop.hive.metastore.api.Index; +import org.apache.hadoop.hive.metastore.api.InsertEventRequestData; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; @@ -98,6 +98,8 @@ import org.apache.hadoop.hive.metastore. import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.FunctionRegistry; +import org.apache.hadoop.hive.ql.exec.FunctionTask; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.index.HiveIndexHandler; import org.apache.hadoop.hive.ql.io.AcidUtils; @@ -152,6 +154,31 @@ public class Hive { } }; + // register all permanent functions. need improvement + static { + try { + reloadFunctions(); + } catch (Exception e) { + LOG.warn("Failed to access metastore. This class should not accessed in runtime.",e); + } + } + + public static void reloadFunctions() throws HiveException { + Hive db = Hive.get(); + for (String dbName : db.getAllDatabases()) { + for (String functionName : db.getFunctions(dbName, "*")) { + Function function = db.getFunction(dbName, functionName); + try { + FunctionRegistry.registerPermanentFunction(functionName, function.getClassName(), false, + FunctionTask.toFunctionResource(function.getResourceUris())); + } catch (Exception e) { + LOG.warn("Failed to register persistent function " + + functionName + ":" + function.getClassName() + ". 
Ignore and continue."); + } + } + } + } + public static Hive get(Configuration c, Class clazz) throws HiveException { return get(c instanceof HiveConf ? (HiveConf)c : new HiveConf(c, clazz)); } @@ -1307,6 +1334,7 @@ public class Hive { * location/inputformat/outputformat/serde details from table spec * @param isSrcLocal * If the source directory is LOCAL + * @param isAcid true if this is an ACID operation */ public Partition loadPartition(Path loadPath, Table tbl, Map partSpec, boolean replace, boolean holdDDLTime, @@ -1354,16 +1382,19 @@ public class Hive { newPartPath = oldPartPath; } + List newFiles = null; if (replace) { Hive.replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(), isSrcLocal); } else { + newFiles = new ArrayList(); FileSystem fs = tbl.getDataLocation().getFileSystem(conf); - Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid); + Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid, newFiles); } boolean forceCreate = (!holdDDLTime) ? true : false; - newTPart = getPartition(tbl, partSpec, forceCreate, newPartPath.toString(), inheritTableSpecs); + newTPart = getPartition(tbl, partSpec, forceCreate, newPartPath.toString(), + inheritTableSpecs, newFiles); // recreate the partition if it existed before if (!holdDDLTime) { if (isSkewedStoreAsSubdir) { @@ -1376,7 +1407,8 @@ public class Hive { skewedInfo.setSkewedColValueLocationMaps(skewedColValueLocationMaps); newCreatedTpart.getSd().setSkewedInfo(skewedInfo); alterPartition(tbl.getDbName(), tbl.getTableName(), new Partition(tbl, newCreatedTpart)); - newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(), inheritTableSpecs); + newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(), inheritTableSpecs, + newFiles); return new Partition(tbl, newCreatedTpart); } } @@ -1486,6 +1518,8 @@ private void constructOneLBLocationMap(F * @param replace * @param numDP number of dynamic partitions * @param holdDDLTime + * @param listBucketingEnabled + * @param isAcid true if this is an ACID operation * @return partition map details (PartitionSpec and Partition) * @throws HiveException */ @@ -1577,11 +1611,20 @@ private void constructOneLBLocationMap(F public void loadTable(Path loadPath, String tableName, boolean replace, boolean holdDDLTime, boolean isSrcLocal, boolean isSkewedStoreAsSubdir, boolean isAcid) throws HiveException { + List newFiles = new ArrayList(); Table tbl = getTable(tableName); + HiveConf sessionConf = SessionState.getSessionConf(); if (replace) { - tbl.replaceFiles(loadPath, isSrcLocal); + Path tableDest = tbl.getPath(); + replaceFiles(tableDest, loadPath, tableDest, tableDest, sessionConf, isSrcLocal); } else { - tbl.copyFiles(loadPath, isSrcLocal, isAcid); + FileSystem fs; + try { + fs = tbl.getDataLocation().getFileSystem(sessionConf); + copyFiles(sessionConf, loadPath, tbl.getPath(), fs, isSrcLocal, isAcid, newFiles); + } catch (IOException e) { + throw new HiveException("addFiles: filesystem error in check phase", e); + } tbl.getParameters().put(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK, "true"); } @@ -1606,6 +1649,7 @@ private void constructOneLBLocationMap(F throw new HiveException(e); } } + fireInsertEvent(tbl, null, newFiles); } /** @@ -1693,7 +1737,7 @@ private void constructOneLBLocationMap(F public Partition getPartition(Table tbl, Map partSpec, boolean forceCreate) throws HiveException { - return getPartition(tbl, partSpec, forceCreate, null, true); + return getPartition(tbl, partSpec, forceCreate, null, true, null); } /** @@ 
-1711,8 +1755,32 @@ private void constructOneLBLocationMap(F * @return result partition object or null if there is no partition * @throws HiveException */ + public Partition getPartition(Table tbl, Map partSpec, boolean forceCreate, + String partPath, boolean inheritTableSpecs) + throws HiveException { + return getPartition(tbl, partSpec, forceCreate, partPath, inheritTableSpecs, null); + } + + /** + * Returns partition metadata + * + * @param tbl + * the partition's table + * @param partSpec + * partition keys and values + * @param forceCreate + * if this is true and partition doesn't exist then a partition is + * created + * @param partPath the path where the partition data is located + * @param inheritTableSpecs whether to copy over the table specs for if/of/serde + * @param newFiles An optional list of new files that were moved into this partition. If + * non-null these will be included in the DML event sent to the metastore. + * @return result partition object or null if there is no partition + * @throws HiveException + */ public Partition getPartition(Table tbl, Map partSpec, - boolean forceCreate, String partPath, boolean inheritTableSpecs) throws HiveException { + boolean forceCreate, String partPath, boolean inheritTableSpecs, List newFiles) + throws HiveException { tbl.validatePartColumnNames(partSpec, true); List pvals = new ArrayList(); for (FieldSchema field : tbl.getPartCols()) { @@ -1772,6 +1840,7 @@ private void constructOneLBLocationMap(F } else { alterPartitionSpec(tbl, partSpec, tpart, inheritTableSpecs, partPath); + fireInsertEvent(tbl, partSpec, newFiles); } } if (tpart == null) { @@ -1813,6 +1882,36 @@ private void constructOneLBLocationMap(F alterPartition(fullName, new Partition(tbl, tpart)); } + private void fireInsertEvent(Table tbl, Map partitionSpec, List newFiles) + throws HiveException { + if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML)) { + LOG.debug("Firing dml insert event"); + FireEventRequestData data = new FireEventRequestData(); + InsertEventRequestData insertData = new InsertEventRequestData(); + data.setInsertData(insertData); + if (newFiles != null && newFiles.size() > 0) { + for (Path p : newFiles) { + insertData.addToFilesAdded(p.toString()); + } + } + FireEventRequest rqst = new FireEventRequest(true, data); + rqst.setDbName(tbl.getDbName()); + rqst.setTableName(tbl.getTableName()); + if (partitionSpec != null && partitionSpec.size() > 0) { + List partVals = new ArrayList(partitionSpec.size()); + for (FieldSchema fs : tbl.getPartitionKeys()) { + partVals.add(partitionSpec.get(fs.getName())); + } + rqst.setPartitionVals(partVals); + } + try { + getMSC().fireListenerEvent(rqst); + } catch (TException e) { + throw new HiveException(e); + } + } + } + public boolean dropPartition(String tblName, List part_vals, boolean deleteData) throws HiveException { String[] names = Utilities.getDbTableName(tblName); @@ -2502,10 +2601,12 @@ private void constructOneLBLocationMap(F * @param fs Filesystem * @param isSrcLocal true if source is on local file system * @param isAcid true if this is an ACID based write + * @param newFiles if this is non-null, a list of files that were created as a result of this + * move will be returned. 
* @throws HiveException */ static protected void copyFiles(HiveConf conf, Path srcf, Path destf, - FileSystem fs, boolean isSrcLocal, boolean isAcid) throws HiveException { + FileSystem fs, boolean isSrcLocal, boolean isAcid, List newFiles) throws HiveException { boolean inheritPerms = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS); try { @@ -2537,7 +2638,7 @@ private void constructOneLBLocationMap(F // If we're moving files around for an ACID write then the rules and paths are all different. // You can blame this on Owen. if (isAcid) { - moveAcidFiles(srcFs, srcs, destf); + moveAcidFiles(srcFs, srcs, destf, newFiles); } else { // check that source and target paths exist List> result = checkPaths(conf, fs, srcs, srcFs, destf, false); @@ -2549,6 +2650,7 @@ private void constructOneLBLocationMap(F throw new IOException("Cannot move " + sdpair[0] + " to " + sdpair[1]); } + if (newFiles != null) newFiles.add(sdpair[1]); } } } catch (IOException e) { @@ -2557,8 +2659,8 @@ private void constructOneLBLocationMap(F } } - private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst) - throws HiveException { + private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst, + List newFiles) throws HiveException { // The layout for ACID files is table|partname/base|delta/bucket // We will always only be writing delta files. In the buckets created by FileSinkOperator // it will look like bucket/delta/bucket. So we need to move that into the above structure. @@ -2622,6 +2724,7 @@ private void constructOneLBLocationMap(F LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to " + bucketDest.toUri().toString()); fs.rename(bucketSrc, bucketDest); + if (newFiles != null) newFiles.add(bucketDest); } } catch (IOException e) { throw new HiveException("Error moving acid files " + e.getMessage(), e); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java?rev=1660293&r1=1660292&r2=1660293&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java Tue Feb 17 06:49:27 2015 @@ -43,6 +43,7 @@ import org.apache.hadoop.hive.metastore. import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.OutputFormat; @@ -246,10 +247,8 @@ public class Partition implements Serial final public Deserializer getDeserializer() { if (deserializer == null) { try { - deserializer = MetaStoreUtils.getDeserializer(Hive.get().getConf(), + deserializer = MetaStoreUtils.getDeserializer(SessionState.getSessionConf(), tPartition, table.getTTable()); - } catch (HiveException e) { - throw new RuntimeException(e); } catch (MetaException e) { throw new RuntimeException(e); } @@ -367,7 +366,7 @@ public class Partition implements Serial try { // Previously, this got the filesystem of the Table, which could be // different from the filesystem of the partition. 
- FileSystem fs = getDataLocation().getFileSystem(Hive.get().getConf()); + FileSystem fs = getDataLocation().getFileSystem(SessionState.getSessionConf()); String pathPattern = getDataLocation().toString(); if (getBucketCount() > 0) { pathPattern = pathPattern + "/*"; @@ -495,11 +494,11 @@ public class Partition implements Serial public List getCols() { try { - if (Table.hasMetastoreBasedSchema(Hive.get().getConf(), tPartition.getSd())) { + if (Table.hasMetastoreBasedSchema(SessionState.getSessionConf(), tPartition.getSd())) { return tPartition.getSd().getCols(); } - return Hive.getFieldsFromDeserializer(table.getTableName(), getDeserializer()); - } catch (HiveException e) { + return MetaStoreUtils.getFieldsFromDeserializer(table.getTableName(), getDeserializer()); + } catch (Exception e) { LOG.error("Unable to get cols from serde: " + tPartition.getSd().getSerdeInfo().getSerializationLib(), e); } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1660293&r1=1660292&r2=1660293&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Tue Feb 17 06:49:27 2015 @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.metadata; -import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; @@ -41,7 +40,6 @@ import org.apache.hadoop.hive.metastore. import org.apache.hadoop.hive.metastore.ProtectMode; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.Index; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.SerDeInfo; @@ -259,7 +257,7 @@ public class Table implements Serializab } final public Class getDeserializerClass() throws Exception { - return MetaStoreUtils.getDeserializerClass(Hive.get().getConf(), tTable); + return MetaStoreUtils.getDeserializerClass(SessionState.getSessionConf(), tTable); } final public Deserializer getDeserializer(boolean skipConfError) { @@ -271,11 +269,9 @@ public class Table implements Serializab final public Deserializer getDeserializerFromMetaStore(boolean skipConfError) { try { - return MetaStoreUtils.getDeserializer(Hive.get().getConf(), tTable, skipConfError); + return MetaStoreUtils.getDeserializer(SessionState.getSessionConf(), tTable, skipConfError); } catch (MetaException e) { throw new RuntimeException(e); - } catch (HiveException e) { - throw new RuntimeException(e); } } @@ -285,7 +281,7 @@ public class Table implements Serializab } try { storageHandler = HiveUtils.getStorageHandler( - Hive.get().getConf(), + SessionState.getSessionConf(), getProperty( org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE)); } catch (Exception e) { @@ -589,12 +585,12 @@ public class Table implements Serializab String serializationLib = getSerializationLib(); try { - if (hasMetastoreBasedSchema(Hive.get().getConf(), serializationLib)) { + if (hasMetastoreBasedSchema(SessionState.getSessionConf(), serializationLib)) { return tTable.getSd().getCols(); } else { - return Hive.getFieldsFromDeserializer(getTableName(), getDeserializer()); + return 
MetaStoreUtils.getFieldsFromDeserializer(getTableName(), getDeserializer()); } - } catch (HiveException e) { + } catch (Exception e) { LOG.error("Unable to get field from serde: " + serializationLib, e); } return new ArrayList(); @@ -625,42 +621,6 @@ public class Table implements Serializab return tTable.getSd().getNumBuckets(); } - /** - * Replaces the directory corresponding to the table by srcf. Works by - * deleting the table directory and renaming the source directory. - * - * @param srcf - * Source directory - * @param isSrcLocal - * If the source directory is LOCAL - */ - protected void replaceFiles(Path srcf, boolean isSrcLocal) - throws HiveException { - Path tableDest = getPath(); - Hive.replaceFiles(tableDest, srcf, tableDest, tableDest, Hive.get().getConf(), - isSrcLocal); - } - - /** - * Inserts files specified into the partition. Works by moving files - * - * @param srcf - * Files to be moved. Leaf directories or globbed file paths - * @param isSrcLocal - * If the source directory is LOCAL - * @param isAcid - * True if this is an ACID based insert, update, or delete - */ - protected void copyFiles(Path srcf, boolean isSrcLocal, boolean isAcid) throws HiveException { - FileSystem fs; - try { - fs = getDataLocation().getFileSystem(Hive.get().getConf()); - Hive.copyFiles(Hive.get().getConf(), srcf, getPath(), fs, isSrcLocal, isAcid); - } catch (IOException e) { - throw new HiveException("addFiles: filesystem error in check phase", e); - } - } - public void setInputFormatClass(String name) throws HiveException { if (name == null) { inputFormatClass = null; @@ -931,22 +891,12 @@ public class Table implements Serializab return dbName + "@" + tabName; } - /** - * @return List containing Indexes names if there are indexes on this table - * @throws HiveException - **/ - public List getAllIndexes(short max) throws HiveException { - Hive hive = Hive.get(); - return hive.getIndexes(getTTable().getDbName(), getTTable().getTableName(), max); - } - @SuppressWarnings("nls") public FileStatus[] getSortedPaths() { try { // Previously, this got the filesystem of the Table, which could be // different from the filesystem of the partition. - FileSystem fs = FileSystem.get(getPath().toUri(), Hive.get() - .getConf()); + FileSystem fs = FileSystem.get(getPath().toUri(), SessionState.getSessionConf()); String pathPattern = getPath().toString(); if (getNumBuckets() > 0) { pathPattern = pathPattern + "/*";
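Stepping back from the Table.java cleanup (file-move logic relocated into Hive.loadTable, configuration taken from SessionState.getSessionConf()), the main behavioural addition running through these files is the DML insert notification: loadTable/loadPartition/copyFiles now collect the moved files into a newFiles list, and the new Hive.fireInsertEvent passes them to the metastore when ConfVars.FIRE_EVENTS_FOR_DML is enabled. A condensed sketch of that notification path follows, using the same thrift request types as the diff; extracting it into a free-standing helper and the class/parameter names are illustrative only, and it needs a live metastore client to actually run:

import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FireEventRequest;
import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
import org.apache.thrift.TException;

public class InsertEventSketch {
  // Tell metastore listeners that newFiles were inserted into dbName.tableName
  // (optionally into the partition identified by partVals, ordered like the
  // table's partition keys).
  static void fireInsertEvent(IMetaStoreClient msc, String dbName, String tableName,
      List<String> partVals, List<Path> newFiles) throws TException {
    InsertEventRequestData insertData = new InsertEventRequestData();
    for (Path p : newFiles) {
      insertData.addToFilesAdded(p.toString());        // the files the load moved in
    }
    FireEventRequestData data = new FireEventRequestData();
    data.setInsertData(insertData);
    FireEventRequest rqst = new FireEventRequest(true, data);  // constructor use mirrors the diff
    rqst.setDbName(dbName);
    rqst.setTableName(tableName);
    if (partVals != null && !partVals.isEmpty()) {
      rqst.setPartitionVals(partVals);
    }
    msc.fireListenerEvent(rqst);                       // same call Hive.fireInsertEvent makes
  }
}

In the diff itself this logic lives as a private method on Hive, wraps the TException in a HiveException, and is skipped entirely when the DML-events flag is off.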