hive-commits mailing list archives

From: xu...@apache.org
Subject: svn commit: r1660293 [14/48] - in /hive/branches/spark: ./ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/ accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/ accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/ accumul...
Date: Tue, 17 Feb 2015 06:49:34 GMT
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1660293&r1=1660292&r2=1660293&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Tue Feb 17 06:49:27 2015
@@ -20,10 +20,17 @@ package org.apache.hadoop.hive.ql.io.orc
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.CodedOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,6 +41,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.filters.BloomFilter;
 import org.apache.hadoop.hive.ql.io.orc.CompressionCodec.Modifier;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.CompressionStrategy;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.EncodingStrategy;
@@ -70,16 +78,12 @@ import org.apache.hadoop.hive.serde2.typ
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.management.ManagementFactory;
-import java.nio.ByteBuffer;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
 
 /**
  * An ORC file writer. The file is divided into stripes, which is the natural
@@ -145,23 +149,27 @@ class WriterImpl implements Writer, Memo
   private final OrcFile.WriterContext callbackContext;
   private final OrcFile.EncodingStrategy encodingStrategy;
   private final OrcFile.CompressionStrategy compressionStrategy;
+  private final boolean[] bloomFilterColumns;
+  private final double bloomFilterFpp;
 
   WriterImpl(FileSystem fs,
-             Path path,
-             Configuration conf,
-             ObjectInspector inspector,
-             long stripeSize,
-             CompressionKind compress,
-             int bufferSize,
-             int rowIndexStride,
-             MemoryManager memoryManager,
-             boolean addBlockPadding,
-             OrcFile.Version version,
-             OrcFile.WriterCallback callback,
-             OrcFile.EncodingStrategy encodingStrategy,
-             CompressionStrategy compressionStrategy,
-             float paddingTolerance,
-             long blockSizeValue) throws IOException {
+      Path path,
+      Configuration conf,
+      ObjectInspector inspector,
+      long stripeSize,
+      CompressionKind compress,
+      int bufferSize,
+      int rowIndexStride,
+      MemoryManager memoryManager,
+      boolean addBlockPadding,
+      OrcFile.Version version,
+      OrcFile.WriterCallback callback,
+      EncodingStrategy encodingStrategy,
+      CompressionStrategy compressionStrategy,
+      float paddingTolerance,
+      long blockSizeValue,
+      String bloomFilterColumnNames,
+      double bloomFilterFpp) throws IOException {
     this.fs = fs;
     this.path = path;
     this.conf = conf;
@@ -190,7 +198,20 @@ class WriterImpl implements Writer, Memo
     this.memoryManager = memoryManager;
     buildIndex = rowIndexStride > 0;
     codec = createCodec(compress);
-    this.bufferSize = getEstimatedBufferSize(bufferSize);
+    String allColumns = conf.get(IOConstants.COLUMNS);
+    if (allColumns == null) {
+      allColumns = getColumnNamesFromInspector(inspector);
+    }
+    this.bufferSize = getEstimatedBufferSize(allColumns, bufferSize);
+    if (version == OrcFile.Version.V_0_11) {
+      /* do not write bloom filters for ORC v11 */
+      this.bloomFilterColumns =
+          OrcUtils.includeColumns(null, allColumns, inspector);
+    } else {
+      this.bloomFilterColumns =
+          OrcUtils.includeColumns(bloomFilterColumnNames, allColumns, inspector);
+    }
+    this.bloomFilterFpp = bloomFilterFpp;
     treeWriter = createTreeWriter(inspector, streamFactory, false);
     if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
       throw new IllegalArgumentException("Row stride must be at least " +
@@ -201,8 +222,25 @@ class WriterImpl implements Writer, Memo
     memoryManager.addWriter(path, stripeSize, this);
   }
 
+  private String getColumnNamesFromInspector(ObjectInspector inspector) {
+    List<String> fieldNames = Lists.newArrayList();
+    Joiner joiner = Joiner.on(",");
+    if (inspector instanceof StructObjectInspector) {
+      StructObjectInspector soi = (StructObjectInspector) inspector;
+      List<? extends StructField> fields = soi.getAllStructFieldRefs();
+      for(StructField sf : fields) {
+        fieldNames.add(sf.getFieldName());
+      }
+    }
+    return joiner.join(fieldNames);
+  }
+
+  @VisibleForTesting
   int getEstimatedBufferSize(int bs) {
-    String colNames = conf.get(IOConstants.COLUMNS);
+      return getEstimatedBufferSize(conf.get(IOConstants.COLUMNS), bs);
+  }
+
+  int getEstimatedBufferSize(String colNames, int bs) {
     long availableMem = getMemoryAvailableForORC();
     if (colNames != null) {
       final int numCols = colNames.split(",").length;
@@ -459,26 +497,27 @@ class WriterImpl implements Writer, Memo
       final EnumSet<CompressionCodec.Modifier> modifiers;
 
       switch (kind) {
-      case DATA:
-      case DICTIONARY_DATA:
-        if (getCompressionStrategy() == CompressionStrategy.SPEED) {
-          modifiers = EnumSet.of(Modifier.FAST, Modifier.TEXT);
-        } else {
-          modifiers = EnumSet.of(Modifier.DEFAULT, Modifier.TEXT);
-        }
-        break;
-      case LENGTH:
-      case DICTIONARY_COUNT:
-      case PRESENT:
-      case ROW_INDEX:
-      case SECONDARY:
-        // easily compressed using the fastest modes
-        modifiers = EnumSet.of(Modifier.FASTEST, Modifier.BINARY);
-        break;
-      default:
-        LOG.warn("Missing ORC compression modifiers for " + kind);
-        modifiers = null;
-        break;
+        case BLOOM_FILTER:
+        case DATA:
+        case DICTIONARY_DATA:
+          if (getCompressionStrategy() == CompressionStrategy.SPEED) {
+            modifiers = EnumSet.of(Modifier.FAST, Modifier.TEXT);
+          } else {
+            modifiers = EnumSet.of(Modifier.DEFAULT, Modifier.TEXT);
+          }
+          break;
+        case LENGTH:
+        case DICTIONARY_COUNT:
+        case PRESENT:
+        case ROW_INDEX:
+        case SECONDARY:
+          // easily compressed using the fastest modes
+          modifiers = EnumSet.of(Modifier.FASTEST, Modifier.BINARY);
+          break;
+        default:
+          LOG.warn("Missing ORC compression modifiers for " + kind);
+          modifiers = null;
+          break;
       }
 
       BufferedStream result = streams.get(name);
@@ -499,6 +538,15 @@ class WriterImpl implements Writer, Memo
     }
 
     /**
+     * Get the current column id. After creating all tree writers this count should tell how many
+     * columns (including columns within nested complex objects) are created in total.
+     * @return current column id
+     */
+    public int getCurrentColumnId() {
+      return columnCount;
+    }
+
+    /**
      * Get the stride rate of the row index.
      */
     public int getRowIndexStride() {
@@ -538,6 +586,22 @@ class WriterImpl implements Writer, Memo
     }
 
     /**
+     * Get the bloom filter columns
+     * @return bloom filter columns
+     */
+    public boolean[] getBloomFilterColumns() {
+      return bloomFilterColumns;
+    }
+
+    /**
+     * Get bloom filter false positive percentage.
+     * @return fpp
+     */
+    public double getBloomFilterFPP() {
+      return bloomFilterFpp;
+    }
+
+    /**
      * Get the writer's configuration.
      * @return configuration
      */
@@ -572,6 +636,11 @@ class WriterImpl implements Writer, Memo
     private final OrcProto.RowIndex.Builder rowIndex;
     private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
     private final PositionedOutputStream rowIndexStream;
+    private final PositionedOutputStream bloomFilterStream;
+    protected final BloomFilter bloomFilter;
+    protected final boolean createBloomFilter;
+    private final OrcProto.BloomFilterIndex.Builder bloomFilterIndex;
+    private final OrcProto.BloomFilter.Builder bloomFilterEntry;
     private boolean foundNulls;
     private OutStream isPresentOutStream;
     private final List<StripeStatistics.Builder> stripeStatsBuilders;
@@ -598,6 +667,7 @@ class WriterImpl implements Writer, Memo
         isPresent = null;
       }
       this.foundNulls = false;
+      createBloomFilter = streamFactory.getBloomFilterColumns()[columnId];
       indexStatistics = ColumnStatisticsImpl.create(inspector);
       stripeColStatistics = ColumnStatisticsImpl.create(inspector);
       fileStatistics = ColumnStatisticsImpl.create(inspector);
@@ -607,11 +677,22 @@ class WriterImpl implements Writer, Memo
       rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry);
       stripeStatsBuilders = Lists.newArrayList();
       if (streamFactory.buildIndex()) {
-        rowIndexStream = streamFactory.createStream(id,
-            OrcProto.Stream.Kind.ROW_INDEX);
+        rowIndexStream = streamFactory.createStream(id, OrcProto.Stream.Kind.ROW_INDEX);
       } else {
         rowIndexStream = null;
       }
+      if (createBloomFilter) {
+        bloomFilterEntry = OrcProto.BloomFilter.newBuilder();
+        bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder();
+        bloomFilterStream = streamFactory.createStream(id, OrcProto.Stream.Kind.BLOOM_FILTER);
+        bloomFilter = new BloomFilter(streamFactory.getRowIndexStride(),
+            streamFactory.getBloomFilterFPP());
+      } else {
+        bloomFilterEntry = null;
+        bloomFilterIndex = null;
+        bloomFilterStream = null;
+        bloomFilter = null;
+      }
     }
 
     protected OrcProto.RowIndex.Builder getRowIndex() {
@@ -630,7 +711,7 @@ class WriterImpl implements Writer, Memo
       return rowIndexEntry;
     }
 
-    IntegerWriter createIntegerWriter(PositionedOutputStream output,
+    IntegerWriter createIntegerWriter(OutStream output,
                                       boolean signed, boolean isDirectV2,
                                       StreamFactory writer) {
       if (isDirectV2) {
@@ -725,6 +806,14 @@ class WriterImpl implements Writer, Memo
       }
       rowIndex.clear();
       rowIndexEntry.clear();
+
+      // write the bloom filter to out stream
+      if (bloomFilterStream != null) {
+        bloomFilterIndex.build().writeTo(bloomFilterStream);
+        bloomFilterStream.flush();
+        bloomFilterIndex.clear();
+        bloomFilterEntry.clear();
+      }
     }
 
     private void writeStripeStatistics(OrcProto.StripeStatistics.Builder builder,
@@ -763,12 +852,23 @@ class WriterImpl implements Writer, Memo
       indexStatistics.reset();
       rowIndex.addEntry(rowIndexEntry);
       rowIndexEntry.clear();
+      addBloomFilterEntry();
       recordPosition(rowIndexPosition);
       for(TreeWriter child: childrenWriters) {
         child.createRowIndexEntry();
       }
     }
 
+    void addBloomFilterEntry() {
+      if (createBloomFilter) {
+        bloomFilterEntry.setNumHashFunctions(bloomFilter.getNumHashFunctions());
+        bloomFilterEntry.addAllBitset(Longs.asList(bloomFilter.getBitSet()));
+        bloomFilterIndex.addBloomFilter(bloomFilterEntry.build());
+        bloomFilter.reset();
+        bloomFilterEntry.clear();
+      }
+    }
+
     /**
      * Record the current position in each of this column's streams.
      * @param recorder where should the locations be recorded
@@ -851,6 +951,9 @@ class WriterImpl implements Writer, Memo
       if (obj != null) {
         byte val = ((ByteObjectInspector) inspector).get(obj);
         indexStatistics.updateInteger(val);
+        if (createBloomFilter) {
+          bloomFilter.addLong(val);
+        }
         writer.write(val);
       }
     }
@@ -882,7 +985,7 @@ class WriterImpl implements Writer, Memo
                       StreamFactory writer,
                       boolean nullable) throws IOException {
       super(columnId, inspector, writer, nullable);
-      PositionedOutputStream out = writer.createStream(id,
+      OutStream out = writer.createStream(id,
           OrcProto.Stream.Kind.DATA);
       this.isDirectV2 = isNewWriteFormat(writer);
       this.writer = createIntegerWriter(out, true, isDirectV2, writer);
@@ -926,6 +1029,10 @@ class WriterImpl implements Writer, Memo
           val = shortInspector.get(obj);
         }
         indexStatistics.updateInteger(val);
+        if (createBloomFilter) {
+          // integers are converted to longs in column statistics and during SARG evaluation
+          bloomFilter.addLong(val);
+        }
         writer.write(val);
       }
     }
@@ -966,6 +1073,10 @@ class WriterImpl implements Writer, Memo
       if (obj != null) {
         float val = ((FloatObjectInspector) inspector).get(obj);
         indexStatistics.updateDouble(val);
+        if (createBloomFilter) {
+          // floats are converted to doubles in column statistics and during SARG evaluation
+          bloomFilter.addDouble(val);
+        }
         utils.writeFloat(stream, val);
       }
     }
@@ -1006,6 +1117,9 @@ class WriterImpl implements Writer, Memo
       if (obj != null) {
         double val = ((DoubleObjectInspector) inspector).get(obj);
         indexStatistics.updateDouble(val);
+        if (createBloomFilter) {
+          bloomFilter.addDouble(val);
+        }
         utils.writeDouble(stream, val);
       }
     }
@@ -1099,6 +1213,9 @@ class WriterImpl implements Writer, Memo
           directLengthOutput.write(val.getLength());
         }
         indexStatistics.updateString(val);
+        if (createBloomFilter) {
+          bloomFilter.addBytes(val.getBytes(), val.getLength());
+        }
       }
     }
 
@@ -1162,6 +1279,14 @@ class WriterImpl implements Writer, Memo
         // Write the dictionary by traversing the red-black tree writing out
         // the bytes and lengths; and creating the map from the original order
         // to the final sorted order.
+        if (dictionary.size() == 0) {
+          if (LOG.isWarnEnabled()) {
+            LOG.warn("Empty dictionary. Suppressing dictionary stream.");
+          }
+          stringOutput.suppress();
+          lengthOutput.suppress();
+        }
+
         dictionary.visit(new StringRedBlackTree.Visitor() {
           private int currentId = 0;
           @Override
@@ -1250,6 +1375,7 @@ class WriterImpl implements Writer, Memo
       OrcProto.RowIndexEntry base = rowIndexEntry.build();
       savedRowIndex.add(base);
       rowIndexEntry.clear();
+      addBloomFilterEntry();
       recordPosition(rowIndexPosition);
       rowIndexValueCount.add(Long.valueOf(rows.size()));
       if (strideDictionaryCheck) {
@@ -1360,6 +1486,9 @@ class WriterImpl implements Writer, Memo
         stream.write(val.getBytes(), 0, val.getLength());
         length.write(val.getLength());
         indexStatistics.updateBinary(val);
+        if (createBloomFilter) {
+          bloomFilter.addBytes(val.getBytes(), val.getLength());
+        }
       }
     }
 
@@ -1422,6 +1551,9 @@ class WriterImpl implements Writer, Memo
         indexStatistics.updateTimestamp(val);
         seconds.write((val.getTime() / MILLIS_PER_SECOND) - BASE_TIMESTAMP);
         nanos.write(formatNanos(val.getNanos()));
+        if (createBloomFilter) {
+          bloomFilter.addLong(val.getTime());
+        }
       }
     }
 
@@ -1467,7 +1599,7 @@ class WriterImpl implements Writer, Memo
                    StreamFactory writer,
                    boolean nullable) throws IOException {
       super(columnId, inspector, writer, nullable);
-      PositionedOutputStream out = writer.createStream(id,
+      OutStream out = writer.createStream(id,
           OrcProto.Stream.Kind.DATA);
       this.isDirectV2 = isNewWriteFormat(writer);
       this.writer = createIntegerWriter(out, true, isDirectV2, writer);
@@ -1482,6 +1614,9 @@ class WriterImpl implements Writer, Memo
         DateWritable val = ((DateObjectInspector) inspector).getPrimitiveWritableObject(obj);
         indexStatistics.updateDate(val);
         writer.write(val.getDays());
+        if (createBloomFilter) {
+          bloomFilter.addLong(val.getDays());
+        }
       }
     }
 
@@ -1550,6 +1685,9 @@ class WriterImpl implements Writer, Memo
             decimal.unscaledValue());
         scaleStream.write(decimal.scale());
         indexStatistics.updateDecimal(decimal);
+        if (createBloomFilter) {
+          bloomFilter.addString(decimal.toString());
+        }
       }
     }
 
@@ -1649,6 +1787,9 @@ class WriterImpl implements Writer, Memo
         ListObjectInspector insp = (ListObjectInspector) inspector;
         int len = insp.getListLength(obj);
         lengths.write(len);
+        if (createBloomFilter) {
+          bloomFilter.addLong(len);
+        }
         for(int i=0; i < len; ++i) {
           childrenWriters[0].write(insp.getListElement(obj, i));
         }
@@ -1713,6 +1854,9 @@ class WriterImpl implements Writer, Memo
         // accessor in the MapObjectInspector.
         Map<?, ?> valueMap = insp.getMap(obj);
         lengths.write(valueMap.size());
+        if (createBloomFilter) {
+          bloomFilter.addLong(valueMap.size());
+        }
         for(Map.Entry<?, ?> entry: valueMap.entrySet()) {
           childrenWriters[0].write(entry.getKey());
           childrenWriters[1].write(entry.getValue());
@@ -1765,6 +1909,9 @@ class WriterImpl implements Writer, Memo
         UnionObjectInspector insp = (UnionObjectInspector) inspector;
         byte tag = insp.getTag(obj);
         tags.write(tag);
+        if (createBloomFilter) {
+          bloomFilter.addLong(tag);
+        }
         childrenWriters[tag].write(insp.getField(obj));
       }
     }

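The WriterImpl diff above wires per-column Bloom filters into the ORC row index: each TreeWriter whose column is flagged in the new bloomFilterColumns array hashes every non-null value into a BloomFilter sized by the row-index stride and the configured false-positive probability, and addBloomFilterEntry() flushes one OrcProto.BloomFilter entry per stride. The snippet below is only a standalone sketch of the per-type hashing convention visible in the hunks (integers widened to long, floats widened to double, strings and binary fed as bytes, decimals as their string form); the BloomFilter interface here is an assumed stand-in for org.apache.hadoop.hive.ql.io.filters.BloomFilter, not the real class.

import java.nio.charset.StandardCharsets;

class BloomFilterSketch {
  // Assumed minimal surface with the same add* methods the patched
  // TreeWriters call; stands in for the real Hive BloomFilter.
  interface BloomFilter {
    void addLong(long v);
    void addDouble(double v);
    void addBytes(byte[] b, int len);
    void addString(String s);
  }

  // Mirrors the dispatch in the diff: integers are widened to long (matching
  // column statistics and SARG evaluation), floats to double, strings/binary
  // go in as bytes, anything else (e.g. decimals) as its string form.
  static void addValue(BloomFilter bf, Object val) {
    if (val == null) {
      return;                                  // nulls are tracked by the PRESENT stream, not hashed
    } else if (val instanceof Byte || val instanceof Short
        || val instanceof Integer || val instanceof Long) {
      bf.addLong(((Number) val).longValue());
    } else if (val instanceof Float || val instanceof Double) {
      bf.addDouble(((Number) val).doubleValue());
    } else if (val instanceof String) {
      byte[] utf8 = ((String) val).getBytes(StandardCharsets.UTF_8);
      bf.addBytes(utf8, utf8.length);
    } else {
      bf.addString(val.toString());
    }
  }
}
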
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java?rev=1660293&r1=1660292&r2=1660293&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java Tue Feb 17 06:49:27 2015
@@ -21,7 +21,6 @@ import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -29,10 +28,10 @@ import org.apache.hadoop.hive.ql.io.IOCo
 import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter;
 import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
 import org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
@@ -41,27 +40,25 @@ import org.apache.hadoop.mapreduce.Outpu
 import org.apache.hadoop.util.Progressable;
 
 import parquet.hadoop.ParquetOutputFormat;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.hadoop.util.ContextUtil;
 
 /**
  *
  * A Parquet OutputFormat for Hive (with the deprecated package mapred)
  *
  */
-public class MapredParquetOutputFormat extends FileOutputFormat<Void, ArrayWritable> implements
-  HiveOutputFormat<Void, ArrayWritable> {
+public class MapredParquetOutputFormat extends FileOutputFormat<Void, ParquetHiveRecord> implements
+  HiveOutputFormat<Void, ParquetHiveRecord> {
 
   private static final Log LOG = LogFactory.getLog(MapredParquetOutputFormat.class);
 
-  protected ParquetOutputFormat<ArrayWritable> realOutputFormat;
+  protected ParquetOutputFormat<ParquetHiveRecord> realOutputFormat;
 
   public MapredParquetOutputFormat() {
-    realOutputFormat = new ParquetOutputFormat<ArrayWritable>(new DataWritableWriteSupport());
+    realOutputFormat = new ParquetOutputFormat<ParquetHiveRecord>(new DataWritableWriteSupport());
   }
 
-  public MapredParquetOutputFormat(final OutputFormat<Void, ArrayWritable> mapreduceOutputFormat) {
-    realOutputFormat = (ParquetOutputFormat<ArrayWritable>) mapreduceOutputFormat;
+  public MapredParquetOutputFormat(final OutputFormat<Void, ParquetHiveRecord> mapreduceOutputFormat) {
+    realOutputFormat = (ParquetOutputFormat<ParquetHiveRecord>) mapreduceOutputFormat;
   }
 
   @Override
@@ -70,7 +67,7 @@ public class MapredParquetOutputFormat e
   }
 
   @Override
-  public RecordWriter<Void, ArrayWritable> getRecordWriter(
+  public RecordWriter<Void, ParquetHiveRecord> getRecordWriter(
       final FileSystem ignored,
       final JobConf job,
       final String name,
@@ -119,7 +116,7 @@ public class MapredParquetOutputFormat e
   }
 
   protected ParquetRecordWriterWrapper getParquerRecordWriterWrapper(
-      ParquetOutputFormat<ArrayWritable> realOutputFormat,
+      ParquetOutputFormat<ParquetHiveRecord> realOutputFormat,
       JobConf jobConf,
       String finalOutPath,
       Progressable progress,

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java?rev=1660293&r1=1660292&r2=1660293&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java Tue Feb 17 06:49:27 2015
@@ -13,61 +13,31 @@
  */
 package org.apache.hadoop.hive.ql.io.parquet.serde;
 
-import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeSpec;
 import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.hive.serde2.io.ByteWritable;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import parquet.hadoop.ParquetOutputFormat;
 import parquet.hadoop.ParquetWriter;
-import parquet.io.api.Binary;
 
 /**
  *
@@ -110,6 +80,13 @@ public class ParquetHiveSerDe extends Ab
   private long deserializedSize;
   private String compressionType;
 
+  private ParquetHiveRecord parquetRow;
+
+  public ParquetHiveSerDe() {
+    parquetRow = new ParquetHiveRecord();
+    stats = new SerDeStats();
+  }
+
   @Override
   public final void initialize(final Configuration conf, final Properties tbl) throws SerDeException {
 
@@ -144,7 +121,6 @@ public class ParquetHiveSerDe extends Ab
     this.objInspector = new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo);
 
     // Stats part
-    stats = new SerDeStats();
     serializedSize = 0;
     deserializedSize = 0;
     status = LAST_OPERATION.UNKNOWN;
@@ -169,7 +145,7 @@ public class ParquetHiveSerDe extends Ab
 
   @Override
   public Class<? extends Writable> getSerializedClass() {
-    return ArrayWritable.class;
+    return ParquetHiveRecord.class;
   }
 
   @Override
@@ -178,150 +154,11 @@ public class ParquetHiveSerDe extends Ab
     if (!objInspector.getCategory().equals(Category.STRUCT)) {
       throw new SerDeException("Cannot serialize " + objInspector.getCategory() + ". Can only serialize a struct");
     }
-    final ArrayWritable serializeData = createStruct(obj, (StructObjectInspector) objInspector);
-    serializedSize = serializeData.get().length;
+    serializedSize = ((StructObjectInspector)objInspector).getAllStructFieldRefs().size();
     status = LAST_OPERATION.SERIALIZE;
-    return serializeData;
-  }
-
-  private ArrayWritable createStruct(final Object obj, final StructObjectInspector inspector)
-      throws SerDeException {
-    final List<? extends StructField> fields = inspector.getAllStructFieldRefs();
-    final Writable[] arr = new Writable[fields.size()];
-    for (int i = 0; i < fields.size(); i++) {
-      final StructField field = fields.get(i);
-      final Object subObj = inspector.getStructFieldData(obj, field);
-      final ObjectInspector subInspector = field.getFieldObjectInspector();
-      arr[i] = createObject(subObj, subInspector);
-    }
-    return new ArrayWritable(Writable.class, arr);
-  }
-
-  private Writable createMap(final Object obj, final MapObjectInspector inspector)
-      throws SerDeException {
-    final Map<?, ?> sourceMap = inspector.getMap(obj);
-    final ObjectInspector keyInspector = inspector.getMapKeyObjectInspector();
-    final ObjectInspector valueInspector = inspector.getMapValueObjectInspector();
-    final List<ArrayWritable> array = new ArrayList<ArrayWritable>();
-
-    if (sourceMap != null) {
-      for (final Entry<?, ?> keyValue : sourceMap.entrySet()) {
-        final Writable key = createObject(keyValue.getKey(), keyInspector);
-        final Writable value = createObject(keyValue.getValue(), valueInspector);
-        if (key != null) {
-          Writable[] arr = new Writable[2];
-          arr[0] = key;
-          arr[1] = value;
-          array.add(new ArrayWritable(Writable.class, arr));
-        }
-      }
-    }
-    if (array.size() > 0) {
-      final ArrayWritable subArray = new ArrayWritable(ArrayWritable.class,
-          array.toArray(new ArrayWritable[array.size()]));
-      return new ArrayWritable(Writable.class, new Writable[] {subArray});
-    } else {
-      return null;
-    }
-  }
-
-  private ArrayWritable createArray(final Object obj, final ListObjectInspector inspector)
-      throws SerDeException {
-    final List<?> sourceArray = inspector.getList(obj);
-    final ObjectInspector subInspector = inspector.getListElementObjectInspector();
-    final List<Writable> array = new ArrayList<Writable>();
-    if (sourceArray != null) {
-      for (final Object curObj : sourceArray) {
-        array.add(createObject(curObj, subInspector));
-      }
-    }
-    if (array.size() > 0) {
-      final ArrayWritable subArray = new ArrayWritable(Writable.class,
-          array.toArray(new Writable[array.size()]));
-      return new ArrayWritable(Writable.class, new Writable[] {subArray});
-    } else {
-      return null;
-    }
-  }
-
-  private Writable createPrimitive(final Object obj, final PrimitiveObjectInspector inspector)
-      throws SerDeException {
-    if (obj == null) {
-      return null;
-    }
-    switch (inspector.getPrimitiveCategory()) {
-    case VOID:
-      return null;
-    case BOOLEAN:
-      return new BooleanWritable(((BooleanObjectInspector) inspector).get(obj) ? Boolean.TRUE : Boolean.FALSE);
-    case BYTE:
-      return new ByteWritable(((ByteObjectInspector) inspector).get(obj));
-    case DOUBLE:
-      return new DoubleWritable(((DoubleObjectInspector) inspector).get(obj));
-    case FLOAT:
-      return new FloatWritable(((FloatObjectInspector) inspector).get(obj));
-    case INT:
-      return new IntWritable(((IntObjectInspector) inspector).get(obj));
-    case LONG:
-      return new LongWritable(((LongObjectInspector) inspector).get(obj));
-    case SHORT:
-      return new ShortWritable(((ShortObjectInspector) inspector).get(obj));
-    case STRING:
-      String v = ((StringObjectInspector) inspector).getPrimitiveJavaObject(obj);
-      try {
-        return new BytesWritable(v.getBytes("UTF-8"));
-      } catch (UnsupportedEncodingException e) {
-        throw new SerDeException("Failed to encode string in UTF-8", e);
-      }
-    case DECIMAL:
-      HiveDecimal hd = (HiveDecimal)inspector.getPrimitiveJavaObject(obj);
-      DecimalTypeInfo decTypeInfo = (DecimalTypeInfo) inspector.getTypeInfo();
-      int prec = decTypeInfo.precision();
-      int scale = decTypeInfo.scale();
-      byte[] src = hd.setScale(scale).unscaledValue().toByteArray();
-      // Estimated number of bytes needed.
-      int bytes =  PRECISION_TO_BYTE_COUNT[prec - 1];
-      if (bytes == src.length) {
-        // No padding needed.
-        return new BytesWritable(src);
-      }
-      byte[] tgt = new byte[bytes];
-      if ( hd.signum() == -1) {
-        // For negative number, initializing bits to 1
-        for (int i = 0; i < bytes; i++) {
-          tgt[i] |= 0xFF;
-        }
-      }
-      System.arraycopy(src, 0, tgt, bytes - src.length, src.length); // Padding leading zeroes/ones.
-      return new BytesWritable(tgt);
-    case TIMESTAMP:
-      return new TimestampWritable(((TimestampObjectInspector) inspector).getPrimitiveJavaObject(obj));
-    case CHAR:
-      String strippedValue = ((HiveCharObjectInspector) inspector).getPrimitiveJavaObject(obj).getStrippedValue();
-      return new BytesWritable(Binary.fromString(strippedValue).getBytes());
-    case VARCHAR:
-      String value = ((HiveVarcharObjectInspector) inspector).getPrimitiveJavaObject(obj).getValue();
-      return new BytesWritable(Binary.fromString(value).getBytes());
-    case BINARY:
-      return new BytesWritable(((BinaryObjectInspector) inspector).getPrimitiveJavaObject(obj));
-    default:
-      throw new SerDeException("Unknown primitive : " + inspector.getPrimitiveCategory());
-    }
-  }
-
-  private Writable createObject(final Object obj, final ObjectInspector inspector) throws SerDeException {
-    switch (inspector.getCategory()) {
-    case STRUCT:
-      return createStruct(obj, (StructObjectInspector) inspector);
-    case LIST:
-      return createArray(obj, (ListObjectInspector) inspector);
-    case MAP:
-      return createMap(obj, (MapObjectInspector) inspector);
-    case PRIMITIVE:
-      return createPrimitive(obj, (PrimitiveObjectInspector) inspector);
-    default:
-      throw new SerDeException("Unknown data type" + inspector.getCategory());
-    }
+    parquetRow.value = obj;
+    parquetRow.inspector= (StructObjectInspector)objInspector;
+    return parquetRow;
   }
 
   @Override

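After the ParquetHiveSerDe change above, serialize() no longer deep-copies each row into nested ArrayWritables; it reuses a single ParquetHiveRecord that carries the raw row object together with its StructObjectInspector, and DataWritableWriter (further down in this commit) walks the inspectors at write time. The sketch below is inferred from the fields assigned in serialize() and the getObject()/getObjectInspector() accessors used by DataWritableWriter; the real class lives in org.apache.hadoop.hive.serde2.io and also implements Writable (serialize() must return a Writable), which is omitted here.

import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

// Rough sketch of the carrier implied by this patch, not the actual class.
public class ParquetHiveRecordSketch {
  public Object value;                     // the raw Hive row, no deep copy
  public StructObjectInspector inspector;  // how DataWritableWriter reads its fields

  public Object getObject() { return value; }
  public StructObjectInspector getObjectInspector() { return inspector; }
}
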
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java?rev=1660293&r1=1660292&r2=1660293&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java Tue Feb 17 06:49:27 2015
@@ -16,6 +16,7 @@ package org.apache.hadoop.hive.ql.io.par
 import java.sql.Timestamp;
 import java.util.Calendar;
 import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
 
 import jodd.datetime.JDateTime;
 
@@ -24,9 +25,9 @@ import jodd.datetime.JDateTime;
  * This utilizes the Jodd library.
  */
 public class NanoTimeUtils {
-   static final long NANOS_PER_SECOND = 1000000000;
-   static final long SECONDS_PER_MINUTE = 60;
-   static final long MINUTES_PER_HOUR = 60;
+   static final long NANOS_PER_HOUR = TimeUnit.HOURS.toNanos(1);
+   static final long NANOS_PER_MINUTE = TimeUnit.MINUTES.toNanos(1);
+   static final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);
 
    private static final ThreadLocal<Calendar> parquetGMTCalendar = new ThreadLocal<Calendar>();
    private static final ThreadLocal<Calendar> parquetLocalCalendar = new ThreadLocal<Calendar>();
@@ -63,8 +64,8 @@ public class NanoTimeUtils {
      long minute = calendar.get(Calendar.MINUTE);
      long second = calendar.get(Calendar.SECOND);
      long nanos = ts.getNanos();
-     long nanosOfDay = nanos + NANOS_PER_SECOND * second + NANOS_PER_SECOND * SECONDS_PER_MINUTE * minute +
-         NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR * hour;
+     long nanosOfDay = nanos + NANOS_PER_SECOND * second + NANOS_PER_MINUTE * minute +
+         NANOS_PER_HOUR * hour;
 
      return new NanoTime(days, nanosOfDay);
    }
@@ -80,10 +81,10 @@ public class NanoTimeUtils {
      calendar.set(Calendar.DAY_OF_MONTH, jDateTime.getDay());
 
      long remainder = nanosOfDay;
-     int hour = (int) (remainder / (NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR));
-     remainder = remainder % (NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR);
-     int minutes = (int) (remainder / (NANOS_PER_SECOND * SECONDS_PER_MINUTE));
-     remainder = remainder % (NANOS_PER_SECOND * SECONDS_PER_MINUTE);
+     int hour = (int) (remainder / (NANOS_PER_HOUR));
+     remainder = remainder % (NANOS_PER_HOUR);
+     int minutes = (int) (remainder / (NANOS_PER_MINUTE));
+     remainder = remainder % (NANOS_PER_MINUTE);
      int seconds = (int) (remainder / (NANOS_PER_SECOND));
      long nanos = remainder % NANOS_PER_SECOND;
 

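The NanoTimeUtils change above only swaps the chained NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR products for TimeUnit-derived constants; the nanos-of-day arithmetic is unchanged. A small self-contained check of that round trip (plain Java, no Hive or Jodd dependencies):

import java.util.concurrent.TimeUnit;

public class NanosOfDayCheck {
  static final long NANOS_PER_HOUR = TimeUnit.HOURS.toNanos(1);
  static final long NANOS_PER_MINUTE = TimeUnit.MINUTES.toNanos(1);
  static final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);

  public static void main(String[] args) {
    // Compose nanos-of-day the way getNanoTime() does after the patch...
    long hour = 13, minute = 45, second = 30, nanos = 123456789L;
    long nanosOfDay = nanos + NANOS_PER_SECOND * second
        + NANOS_PER_MINUTE * minute + NANOS_PER_HOUR * hour;

    // ...then decompose it again, exactly as getTimestamp() does.
    long remainder = nanosOfDay;
    long h = remainder / NANOS_PER_HOUR;   remainder %= NANOS_PER_HOUR;
    long m = remainder / NANOS_PER_MINUTE; remainder %= NANOS_PER_MINUTE;
    long s = remainder / NANOS_PER_SECOND; long n = remainder % NANOS_PER_SECOND;

    System.out.printf("%d:%d:%d.%09d%n", h, m, s, n);  // prints 13:45:30.123456789
  }
}
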
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java?rev=1660293&r1=1660292&r2=1660293&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java Tue Feb 17 06:49:27 2015
@@ -16,7 +16,7 @@ package org.apache.hadoop.hive.ql.io.par
 import java.util.HashMap;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
 
 import parquet.hadoop.api.WriteSupport;
 import parquet.io.api.RecordConsumer;
@@ -28,7 +28,7 @@ import parquet.schema.MessageTypeParser;
  * DataWritableWriteSupport is a WriteSupport for the DataWritableWriter
  *
  */
-public class DataWritableWriteSupport extends WriteSupport<ArrayWritable> {
+public class DataWritableWriteSupport extends WriteSupport<ParquetHiveRecord> {
 
   public static final String PARQUET_HIVE_SCHEMA = "parquet.hive.schema";
 
@@ -55,7 +55,7 @@ public class DataWritableWriteSupport ex
   }
 
   @Override
-  public void write(final ArrayWritable record) {
+  public void write(final ParquetHiveRecord record) {
     writer.write(record);
   }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java?rev=1660293&r1=1660292&r2=1660293&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java Tue Feb 17 06:49:27 2015
@@ -13,37 +13,29 @@
  */
 package org.apache.hadoop.hive.ql.io.parquet.write;
 
-import java.sql.Timestamp;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
-import org.apache.hadoop.hive.serde2.io.ByteWritable;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
+import org.apache.hadoop.hive.serde2.objectinspector.*;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import parquet.io.api.Binary;
 import parquet.io.api.RecordConsumer;
 import parquet.schema.GroupType;
 import parquet.schema.OriginalType;
 import parquet.schema.Type;
 
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+
 /**
  *
- * DataWritableWriter is a writer,
- * that will read an ArrayWritable and give the data to parquet
- * with the expected schema
- * This is a helper class used by DataWritableWriteSupport class.
+ * DataWritableWriter is a writer that reads a ParquetWritable object and send the data to the Parquet
+ * API with the expected schema. This class is only used through DataWritableWriteSupport class.
  */
 public class DataWritableWriter {
   private static final Log LOG = LogFactory.getLog(DataWritableWriter.class);
@@ -57,13 +49,13 @@ public class DataWritableWriter {
 
   /**
    * It writes all record values to the Parquet RecordConsumer.
-   * @param record Contains the record of values that are going to be written
+   * @param record Contains the record that are going to be written.
    */
-  public void write(final ArrayWritable record) {
+  public void write(final ParquetHiveRecord record) {
     if (record != null) {
       recordConsumer.startMessage();
       try {
-        writeGroupFields(record, schema);
+        writeGroupFields(record.getObject(), record.getObjectInspector(), schema);
       } catch (RuntimeException e) {
         String errorMessage = "Parquet record is malformed: " + e.getMessage();
         LOG.error(errorMessage, e);
@@ -76,19 +68,23 @@ public class DataWritableWriter {
   /**
    * It writes all the fields contained inside a group to the RecordConsumer.
    * @param value The list of values contained in the group.
+   * @param inspector The object inspector used to get the correct value type.
    * @param type Type that contains information about the group schema.
    */
-  public void writeGroupFields(final ArrayWritable value, final GroupType type) {
+  private void writeGroupFields(final Object value, final StructObjectInspector inspector, final GroupType type) {
     if (value != null) {
+      List<? extends StructField> fields = inspector.getAllStructFieldRefs();
+      List<Object> fieldValuesList = inspector.getStructFieldsDataAsList(value);
+
       for (int i = 0; i < type.getFieldCount(); i++) {
         Type fieldType = type.getType(i);
         String fieldName = fieldType.getName();
-        Writable fieldValue = value.get()[i];
+        Object fieldValue = fieldValuesList.get(i);
 
-        // Parquet does not write null elements
         if (fieldValue != null) {
+          ObjectInspector fieldInspector = fields.get(i).getFieldObjectInspector();
           recordConsumer.startField(fieldName, i);
-          writeValue(fieldValue, fieldType);
+          writeValue(fieldValue, fieldInspector, fieldType);
           recordConsumer.endField(fieldName, i);
         }
       }
@@ -96,68 +92,93 @@ public class DataWritableWriter {
   }
 
   /**
-   * It writes the field value to the Parquet RecordConsumer. It detects the field type, and writes
+   * It writes the field value to the Parquet RecordConsumer. It detects the field type, and calls
    * the correct write function.
    * @param value The writable object that contains the value.
+   * @param inspector The object inspector used to get the correct value type.
    * @param type Type that contains information about the type schema.
    */
-  private void writeValue(final Writable value, final Type type) {
+  private void writeValue(final Object value, final ObjectInspector inspector, final Type type) {
     if (type.isPrimitive()) {
-      writePrimitive(value);
-    } else if (value instanceof ArrayWritable) {
+      checkInspectorCategory(inspector, ObjectInspector.Category.PRIMITIVE);
+      writePrimitive(value, (PrimitiveObjectInspector)inspector);
+    } else {
       GroupType groupType = type.asGroupType();
       OriginalType originalType = type.getOriginalType();
 
       if (originalType != null && originalType.equals(OriginalType.LIST)) {
-        writeArray((ArrayWritable)value, groupType);
+        checkInspectorCategory(inspector, ObjectInspector.Category.LIST);
+        writeArray(value, (ListObjectInspector)inspector, groupType);
       } else if (originalType != null && originalType.equals(OriginalType.MAP)) {
-        writeMap((ArrayWritable)value, groupType);
+        checkInspectorCategory(inspector, ObjectInspector.Category.MAP);
+        writeMap(value, (MapObjectInspector)inspector, groupType);
       } else {
-        writeGroup((ArrayWritable) value, groupType);
+        checkInspectorCategory(inspector, ObjectInspector.Category.STRUCT);
+        writeGroup(value, (StructObjectInspector)inspector, groupType);
       }
-    } else {
-      throw new RuntimeException("Field value is not an ArrayWritable object: " + type);
+    }
+  }
+
+  /**
+   * Checks that an inspector matches the category indicated as a parameter.
+   * @param inspector The object inspector to check
+   * @param category The category to match
+   * @throws IllegalArgumentException if inspector does not match the category
+   */
+  private void checkInspectorCategory(ObjectInspector inspector, ObjectInspector.Category category) {
+    if (!inspector.getCategory().equals(category)) {
+      throw new IllegalArgumentException("Invalid data type: expected " + category
+          + " type, but found: " + inspector.getCategory());
     }
   }
 
   /**
    * It writes a group type and all its values to the Parquet RecordConsumer.
    * This is used only for optional and required groups.
-   * @param value ArrayWritable object that contains the group values
-   * @param type Type that contains information about the group schema
+   * @param value Object that contains the group values.
+   * @param inspector The object inspector used to get the correct value type.
+   * @param type Type that contains information about the group schema.
    */
-  private void writeGroup(final ArrayWritable value, final GroupType type) {
+  private void writeGroup(final Object value, final StructObjectInspector inspector, final GroupType type) {
     recordConsumer.startGroup();
-    writeGroupFields(value, type);
+    writeGroupFields(value, inspector, type);
     recordConsumer.endGroup();
   }
 
   /**
-   * It writes a map type and its key-pair values to the Parquet RecordConsumer.
-   * This is called when the original type (MAP) is detected by writeValue()
-   * @param value The list of map values that contains the repeated KEY_PAIR_VALUE group type
-   * @param type Type that contains information about the group schema
+   * It writes a list type and its array elements to the Parquet RecordConsumer.
+   * This is called when the original type (LIST) is detected by writeValue()/
+   * This function assumes the following schema:
+   *    optional group arrayCol (LIST) {
+   *      repeated group array {
+   *        optional TYPE array_element;
+   *      }
+   *    }
+   * @param value The object that contains the array values.
+   * @param inspector The object inspector used to get the correct value type.
+   * @param type Type that contains information about the group (LIST) schema.
    */
-  private void writeMap(final ArrayWritable value, final GroupType type) {
+  private void writeArray(final Object value, final ListObjectInspector inspector, final GroupType type) {
+    // Get the internal array structure
     GroupType repeatedType = type.getType(0).asGroupType();
-    ArrayWritable repeatedValue = (ArrayWritable)value.get()[0];
 
     recordConsumer.startGroup();
     recordConsumer.startField(repeatedType.getName(), 0);
 
-    Writable[] map_values = repeatedValue.get();
-    for (int record = 0; record < map_values.length; record++) {
-      Writable key_value_pair = map_values[record];
-      if (key_value_pair != null) {
-        // Hive wraps a map key-pair into an ArrayWritable
-        if (key_value_pair instanceof ArrayWritable) {
-          writeGroup((ArrayWritable)key_value_pair, repeatedType);
-        } else {
-          throw new RuntimeException("Map key-value pair is not an ArrayWritable object on record " + record);
-        }
-      } else {
-        throw new RuntimeException("Map key-value pair is null on record " + record);
+    List<?> arrayValues = inspector.getList(value);
+    ObjectInspector elementInspector = inspector.getListElementObjectInspector();
+
+    Type elementType = repeatedType.getType(0);
+    String elementName = elementType.getName();
+
+    for (Object element : arrayValues) {
+      recordConsumer.startGroup();
+      if (element != null) {
+        recordConsumer.startField(elementName, 0);
+        writeValue(element, elementInspector, elementType);
+        recordConsumer.endField(elementName, 0);
       }
+      recordConsumer.endGroup();
     }
 
     recordConsumer.endField(repeatedType.getName(), 0);
@@ -165,35 +186,53 @@ public class DataWritableWriter {
   }
 
   /**
-   * It writes a list type and its array elements to the Parquet RecordConsumer.
-   * This is called when the original type (LIST) is detected by writeValue()
-   * @param array The list of array values that contains the repeated array group type
-   * @param type Type that contains information about the group schema
+   * It writes a map type and its key-pair values to the Parquet RecordConsumer.
+   * This is called when the original type (MAP) is detected by writeValue().
+   * This function assumes the following schema:
+   *    optional group mapCol (MAP) {
+   *      repeated group map (MAP_KEY_VALUE) {
+   *        required TYPE key;
+   *        optional TYPE value;
+   *      }
+   *    }
+   * @param value The object that contains the map key-values.
+   * @param inspector The object inspector used to get the correct value type.
+   * @param type Type that contains information about the group (MAP) schema.
    */
-  private void writeArray(final ArrayWritable array, final GroupType type) {
+  private void writeMap(final Object value, final MapObjectInspector inspector, final GroupType type) {
+    // Get the internal map structure (MAP_KEY_VALUE)
     GroupType repeatedType = type.getType(0).asGroupType();
-    ArrayWritable repeatedValue = (ArrayWritable)array.get()[0];
 
     recordConsumer.startGroup();
     recordConsumer.startField(repeatedType.getName(), 0);
 
-    Writable[] array_values = repeatedValue.get();
-    for (int record = 0; record < array_values.length; record++) {
-      recordConsumer.startGroup();
+    Map<?, ?> mapValues = inspector.getMap(value);
 
-      // Null values must be wrapped into startGroup/endGroup
-      Writable element = array_values[record];
-      if (element != null) {
-        for (int i = 0; i < type.getFieldCount(); i++) {
-          Type fieldType = repeatedType.getType(i);
-          String fieldName = fieldType.getName();
+    Type keyType = repeatedType.getType(0);
+    String keyName = keyType.getName();
+    ObjectInspector keyInspector = inspector.getMapKeyObjectInspector();
 
-          recordConsumer.startField(fieldName, i);
-          writeValue(element, fieldType);
-          recordConsumer.endField(fieldName, i);
+    Type valuetype = repeatedType.getType(1);
+    String valueName = valuetype.getName();
+    ObjectInspector valueInspector = inspector.getMapValueObjectInspector();
+
+    for (Map.Entry<?, ?> keyValue : mapValues.entrySet()) {
+      recordConsumer.startGroup();
+      if (keyValue != null) {
+        // write key element
+        Object keyElement = keyValue.getKey();
+        recordConsumer.startField(keyName, 0);
+        writeValue(keyElement, keyInspector, keyType);
+        recordConsumer.endField(keyName, 0);
+
+        // write value element
+        Object valueElement = keyValue.getValue();
+        if (valueElement != null) {
+          recordConsumer.startField(valueName, 1);
+          writeValue(valueElement, valueInspector, valuetype);
+          recordConsumer.endField(valueName, 1);
         }
       }
-
       recordConsumer.endGroup();
     }
 
@@ -203,36 +242,89 @@ public class DataWritableWriter {
 
   /**
    * It writes the primitive value to the Parquet RecordConsumer.
-   * @param value The writable object that contains the primitive value.
+   * @param value The object that contains the primitive value.
+   * @param inspector The object inspector used to get the correct value type.
    */
-  private void writePrimitive(final Writable value) {
+  private void writePrimitive(final Object value, final PrimitiveObjectInspector inspector) {
     if (value == null) {
       return;
     }
-    if (value instanceof DoubleWritable) {
-      recordConsumer.addDouble(((DoubleWritable) value).get());
-    } else if (value instanceof BooleanWritable) {
-      recordConsumer.addBoolean(((BooleanWritable) value).get());
-    } else if (value instanceof FloatWritable) {
-      recordConsumer.addFloat(((FloatWritable) value).get());
-    } else if (value instanceof IntWritable) {
-      recordConsumer.addInteger(((IntWritable) value).get());
-    } else if (value instanceof LongWritable) {
-      recordConsumer.addLong(((LongWritable) value).get());
-    } else if (value instanceof ShortWritable) {
-      recordConsumer.addInteger(((ShortWritable) value).get());
-    } else if (value instanceof ByteWritable) {
-      recordConsumer.addInteger(((ByteWritable) value).get());
-    } else if (value instanceof HiveDecimalWritable) {
-      throw new UnsupportedOperationException("HiveDecimalWritable writing not implemented");
-    } else if (value instanceof BytesWritable) {
-      recordConsumer.addBinary((Binary.fromByteArray(((BytesWritable) value).getBytes())));
-    } else if (value instanceof TimestampWritable) {
-      Timestamp ts = ((TimestampWritable) value).getTimestamp();
-      NanoTime nt = NanoTimeUtils.getNanoTime(ts, false);
-      nt.writeValue(recordConsumer);
-    } else {
-      throw new IllegalArgumentException("Unknown value type: " + value + " " + value.getClass());
+
+    switch (inspector.getPrimitiveCategory()) {
+      case VOID:
+        return;
+      case DOUBLE:
+        recordConsumer.addDouble(((DoubleObjectInspector) inspector).get(value));
+        break;
+      case BOOLEAN:
+        recordConsumer.addBoolean(((BooleanObjectInspector) inspector).get(value));
+        break;
+      case FLOAT:
+        recordConsumer.addFloat(((FloatObjectInspector) inspector).get(value));
+        break;
+      case BYTE:
+        recordConsumer.addInteger(((ByteObjectInspector) inspector).get(value));
+        break;
+      case INT:
+        recordConsumer.addInteger(((IntObjectInspector) inspector).get(value));
+        break;
+      case LONG:
+        recordConsumer.addLong(((LongObjectInspector) inspector).get(value));
+        break;
+      case SHORT:
+        recordConsumer.addInteger(((ShortObjectInspector) inspector).get(value));
+        break;
+      case STRING:
+        String v = ((StringObjectInspector) inspector).getPrimitiveJavaObject(value);
+        recordConsumer.addBinary(Binary.fromString(v));
+        break;
+      case CHAR:
+        String vChar = ((HiveCharObjectInspector) inspector).getPrimitiveJavaObject(value).getStrippedValue();
+        recordConsumer.addBinary(Binary.fromString(vChar));
+        break;
+      case VARCHAR:
+        String vVarchar = ((HiveVarcharObjectInspector) inspector).getPrimitiveJavaObject(value).getValue();
+        recordConsumer.addBinary(Binary.fromString(vVarchar));
+        break;
+      case BINARY:
+        byte[] vBinary = ((BinaryObjectInspector) inspector).getPrimitiveJavaObject(value);
+        recordConsumer.addBinary(Binary.fromByteArray(vBinary));
+        break;
+      case TIMESTAMP:
+        Timestamp ts = ((TimestampObjectInspector) inspector).getPrimitiveJavaObject(value);
+        recordConsumer.addBinary(NanoTimeUtils.getNanoTime(ts, false).toBinary());
+        break;
+      case DECIMAL:
+        HiveDecimal vDecimal = (HiveDecimal) inspector.getPrimitiveJavaObject(value);
+        DecimalTypeInfo decTypeInfo = (DecimalTypeInfo) inspector.getTypeInfo();
+        recordConsumer.addBinary(decimalToBinary(vDecimal, decTypeInfo));
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported primitive data type: " + inspector.getPrimitiveCategory());
+    }
+  }
+
+  private Binary decimalToBinary(final HiveDecimal hiveDecimal, final DecimalTypeInfo decimalTypeInfo) {
+    int prec = decimalTypeInfo.precision();
+    int scale = decimalTypeInfo.scale();
+    byte[] decimalBytes = hiveDecimal.setScale(scale).unscaledValue().toByteArray();
+
+    // Number of bytes needed to hold any unscaled value of this precision.
+    int precToBytes = ParquetHiveSerDe.PRECISION_TO_BYTE_COUNT[prec - 1];
+    if (precToBytes == decimalBytes.length) {
+      // No padding needed.
+      return Binary.fromByteArray(decimalBytes);
     }
+
+    byte[] tgt = new byte[precToBytes];
+    if (hiveDecimal.signum() == -1) {
+      // For negative values, fill with 0xFF so the leading bytes sign-extend the two's-complement value.
+      for (int i = 0; i < precToBytes; i++) {
+        tgt[i] |= 0xFF;
+      }
+    }
+
+    // Copy the unscaled bytes into the tail; the leading bytes keep the zero/0xFF padding.
+    System.arraycopy(decimalBytes, 0, tgt, precToBytes - decimalBytes.length, decimalBytes.length);
+    return Binary.fromByteArray(tgt);
   }
 }
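
As a reading aid, here is a minimal standalone sketch of the sign-extension padding that decimalToBinary performs above. It uses only java.math and hard-codes the byte width that ParquetHiveSerDe.PRECISION_TO_BYTE_COUNT would supply; DecimalPadDemo and toFixedLenBytes are illustrative names, not Hive code.

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Arrays;

class DecimalPadDemo {
  // Pads the two's-complement unscaled bytes of a decimal to a fixed width,
  // pre-filling with 0xFF for negative values so the result stays sign-extended.
  static byte[] toFixedLenBytes(BigDecimal dec, int scale, int byteWidth) {
    byte[] unscaled = dec.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
    if (unscaled.length == byteWidth) {
      return unscaled;                              // already at the target width
    }
    byte[] padded = new byte[byteWidth];
    if (dec.signum() == -1) {
      Arrays.fill(padded, (byte) 0xFF);             // sign extension for negative values
    }
    System.arraycopy(unscaled, 0, padded, byteWidth - unscaled.length, unscaled.length);
    return padded;
  }

  public static void main(String[] args) {
    // decimal(10,2): ten digits of precision fit in five bytes.
    System.out.println(Arrays.toString(toFixedLenBytes(new BigDecimal("-1.25"), 2, 5)));
    // prints [-1, -1, -1, -1, -125], i.e. 0xFFFFFFFF83, the 5-byte two's complement of -125.
  }
}

Whether to round or to fail on scale loss is a property of HiveDecimal.setScale; the sketch does not try to reproduce that behavior.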

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java?rev=1660293&r1=1660292&r2=1660293&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java Tue Feb 17 06:49:27 2015
@@ -20,7 +20,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -29,22 +28,23 @@ import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
 import org.apache.hadoop.util.Progressable;
 
 import parquet.hadoop.ParquetOutputFormat;
 import parquet.hadoop.metadata.CompressionCodecName;
 import parquet.hadoop.util.ContextUtil;
 
-public class ParquetRecordWriterWrapper implements RecordWriter<Void, ArrayWritable>,
+public class ParquetRecordWriterWrapper implements RecordWriter<Void, ParquetHiveRecord>,
   org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter {
 
   public static final Log LOG = LogFactory.getLog(ParquetRecordWriterWrapper.class);
 
-  private final org.apache.hadoop.mapreduce.RecordWriter<Void, ArrayWritable> realWriter;
+  private final org.apache.hadoop.mapreduce.RecordWriter<Void, ParquetHiveRecord> realWriter;
   private final TaskAttemptContext taskContext;
 
   public ParquetRecordWriterWrapper(
-      final OutputFormat<Void, ArrayWritable> realOutputFormat,
+      final OutputFormat<Void, ParquetHiveRecord> realOutputFormat,
       final JobConf jobConf,
       final String name,
       final Progressable progress, Properties tableProperties) throws
@@ -106,7 +106,7 @@ public class ParquetRecordWriterWrapper
   }
 
   @Override
-  public void write(final Void key, final ArrayWritable value) throws IOException {
+  public void write(final Void key, final ParquetHiveRecord value) throws IOException {
     try {
       realWriter.write(key, value);
     } catch (final InterruptedException e) {
@@ -121,7 +121,7 @@ public class ParquetRecordWriterWrapper
 
   @Override
   public void write(final Writable w) throws IOException {
-    write(null, (ArrayWritable) w);
+    write(null, (ParquetHiveRecord) w);
   }
 
 }
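
The wrapper above keeps the legacy untyped FileSinkOperator write path while changing the typed record class from ArrayWritable to ParquetHiveRecord. Below is a dependency-free sketch of that bridge pattern; TypedWriter, UntypedWriter and RecordWriterBridge are invented names for illustration only and do not exist in Hive.

import java.io.IOException;

// Typed and untyped writer views, analogous to the two interfaces the wrapper implements.
interface TypedWriter<K, V> {
  void write(K key, V value) throws IOException;
}

interface UntypedWriter {
  void write(Object row) throws IOException;
}

// Exposes a strongly typed write path for the new record class while keeping the
// legacy untyped entry point, which simply narrows the row and delegates.
class RecordWriterBridge<V> implements TypedWriter<Void, V>, UntypedWriter {
  private final TypedWriter<Void, V> realWriter;
  private final Class<V> recordClass;

  RecordWriterBridge(TypedWriter<Void, V> realWriter, Class<V> recordClass) {
    this.realWriter = realWriter;
    this.recordClass = recordClass;
  }

  @Override
  public void write(Void key, V value) throws IOException {
    realWriter.write(key, value);                  // typed path used by the new code
  }

  @Override
  public void write(Object row) throws IOException {
    write(null, recordClass.cast(row));            // untyped path: cast, then delegate
  }
}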

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1660293&r1=1660292&r2=1660293&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Tue Feb 17 06:49:27 2015
@@ -29,7 +29,6 @@ import static org.apache.hadoop.hive.ser
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -49,8 +48,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.common.ObjectPair;
@@ -74,6 +71,8 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.FireEventRequest;
+import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
 import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalRequest;
@@ -82,6 +81,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
 import org.apache.hadoop.hive.metastore.api.HiveObjectType;
 import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -98,6 +98,8 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.FunctionTask;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -152,6 +154,31 @@ public class Hive {
     }
   };
 
+  // Register all permanent functions at class load time. TODO: this needs improvement.
+  static {
+    try {
+      reloadFunctions();
+    } catch (Exception e) {
+      LOG.warn("Failed to access metastore. This class should not accessed in runtime.",e);
+    }
+  }
+
+  public static void reloadFunctions() throws HiveException {
+    Hive db = Hive.get();
+    for (String dbName : db.getAllDatabases()) {
+      for (String functionName : db.getFunctions(dbName, "*")) {
+        Function function = db.getFunction(dbName, functionName);
+        try {
+          FunctionRegistry.registerPermanentFunction(functionName, function.getClassName(), false,
+              FunctionTask.toFunctionResource(function.getResourceUris()));
+        } catch (Exception e) {
+          LOG.warn("Failed to register persistent function " +
+              functionName + ":" + function.getClassName() + ". Ignore and continue.");
+        }
+      }
+    }
+  }
+
   public static Hive get(Configuration c, Class<?> clazz) throws HiveException {
     return get(c instanceof HiveConf ? (HiveConf)c : new HiveConf(c, clazz));
   }
@@ -1307,6 +1334,7 @@ public class Hive {
    *          location/inputformat/outputformat/serde details from table spec
    * @param isSrcLocal
    *          If the source directory is LOCAL
+   * @param isAcid true if this is an ACID operation
    */
   public Partition loadPartition(Path loadPath, Table tbl,
       Map<String, String> partSpec, boolean replace, boolean holdDDLTime,
@@ -1354,16 +1382,19 @@ public class Hive {
         newPartPath = oldPartPath;
       }
 
+      List<Path> newFiles = null;
       if (replace) {
         Hive.replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(),
             isSrcLocal);
       } else {
+        newFiles = new ArrayList<Path>();
         FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
-        Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid);
+        Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid, newFiles);
       }
 
       boolean forceCreate = (!holdDDLTime) ? true : false;
-      newTPart = getPartition(tbl, partSpec, forceCreate, newPartPath.toString(), inheritTableSpecs);
+      newTPart = getPartition(tbl, partSpec, forceCreate, newPartPath.toString(),
+          inheritTableSpecs, newFiles);
       // recreate the partition if it existed before
       if (!holdDDLTime) {
         if (isSkewedStoreAsSubdir) {
@@ -1376,7 +1407,8 @@ public class Hive {
           skewedInfo.setSkewedColValueLocationMaps(skewedColValueLocationMaps);
           newCreatedTpart.getSd().setSkewedInfo(skewedInfo);
           alterPartition(tbl.getDbName(), tbl.getTableName(), new Partition(tbl, newCreatedTpart));
-          newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(), inheritTableSpecs);
+          newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(), inheritTableSpecs,
+              newFiles);
           return new Partition(tbl, newCreatedTpart);
         }
       }
@@ -1486,6 +1518,8 @@ private void constructOneLBLocationMap(F
    * @param replace
    * @param numDP number of dynamic partitions
    * @param holdDDLTime
+   * @param listBucketingEnabled
+   * @param isAcid true if this is an ACID operation
    * @return partition map details (PartitionSpec and Partition)
    * @throws HiveException
    */
@@ -1577,11 +1611,20 @@ private void constructOneLBLocationMap(F
   public void loadTable(Path loadPath, String tableName, boolean replace,
       boolean holdDDLTime, boolean isSrcLocal, boolean isSkewedStoreAsSubdir, boolean isAcid)
       throws HiveException {
+    List<Path> newFiles = new ArrayList<Path>();
     Table tbl = getTable(tableName);
+    HiveConf sessionConf = SessionState.getSessionConf();
     if (replace) {
-      tbl.replaceFiles(loadPath, isSrcLocal);
+      Path tableDest = tbl.getPath();
+      replaceFiles(tableDest, loadPath, tableDest, tableDest, sessionConf, isSrcLocal);
     } else {
-      tbl.copyFiles(loadPath, isSrcLocal, isAcid);
+      FileSystem fs;
+      try {
+        fs = tbl.getDataLocation().getFileSystem(sessionConf);
+        copyFiles(sessionConf, loadPath, tbl.getPath(), fs, isSrcLocal, isAcid, newFiles);
+      } catch (IOException e) {
+        throw new HiveException("addFiles: filesystem error in check phase", e);
+      }
       tbl.getParameters().put(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK, "true");
     }
 
@@ -1606,6 +1649,7 @@ private void constructOneLBLocationMap(F
         throw new HiveException(e);
       }
     }
+    fireInsertEvent(tbl, null, newFiles);
   }
 
   /**
@@ -1693,7 +1737,7 @@ private void constructOneLBLocationMap(F
 
   public Partition getPartition(Table tbl, Map<String, String> partSpec,
       boolean forceCreate) throws HiveException {
-    return getPartition(tbl, partSpec, forceCreate, null, true);
+    return getPartition(tbl, partSpec, forceCreate, null, true, null);
   }
 
   /**
@@ -1711,8 +1755,32 @@ private void constructOneLBLocationMap(F
    * @return result partition object or null if there is no partition
    * @throws HiveException
    */
+  public Partition getPartition(Table tbl, Map<String, String> partSpec, boolean forceCreate,
+                                String partPath, boolean inheritTableSpecs)
+      throws HiveException {
+    return getPartition(tbl, partSpec, forceCreate, partPath, inheritTableSpecs, null);
+  }
+
+  /**
+   * Returns partition metadata
+   *
+   * @param tbl
+   *          the partition's table
+   * @param partSpec
+   *          partition keys and values
+   * @param forceCreate
+   *          if this is true and partition doesn't exist then a partition is
+   *          created
+   * @param partPath the path where the partition data is located
+   * @param inheritTableSpecs whether to copy the table's input format, output format, and serde to the partition
+   * @param newFiles An optional list of new files that were moved into this partition.  If
+   *                 non-null these will be included in the DML event sent to the metastore.
+   * @return result partition object or null if there is no partition
+   * @throws HiveException
+   */
   public Partition getPartition(Table tbl, Map<String, String> partSpec,
-      boolean forceCreate, String partPath, boolean inheritTableSpecs) throws HiveException {
+      boolean forceCreate, String partPath, boolean inheritTableSpecs, List<Path> newFiles)
+      throws HiveException {
     tbl.validatePartColumnNames(partSpec, true);
     List<String> pvals = new ArrayList<String>();
     for (FieldSchema field : tbl.getPartCols()) {
@@ -1772,6 +1840,7 @@ private void constructOneLBLocationMap(F
         }
         else {
           alterPartitionSpec(tbl, partSpec, tpart, inheritTableSpecs, partPath);
+          fireInsertEvent(tbl, partSpec, newFiles);
         }
       }
       if (tpart == null) {
@@ -1813,6 +1882,36 @@ private void constructOneLBLocationMap(F
     alterPartition(fullName, new Partition(tbl, tpart));
   }
 
+  private void fireInsertEvent(Table tbl, Map<String, String> partitionSpec, List<Path> newFiles)
+      throws HiveException {
+    if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML)) {
+      LOG.debug("Firing dml insert event");
+      FireEventRequestData data = new FireEventRequestData();
+      InsertEventRequestData insertData = new InsertEventRequestData();
+      data.setInsertData(insertData);
+      if (newFiles != null && newFiles.size() > 0) {
+        for (Path p : newFiles) {
+          insertData.addToFilesAdded(p.toString());
+        }
+      }
+      FireEventRequest rqst = new FireEventRequest(true, data);
+      rqst.setDbName(tbl.getDbName());
+      rqst.setTableName(tbl.getTableName());
+      if (partitionSpec != null && partitionSpec.size() > 0) {
+        List<String> partVals = new ArrayList<String>(partitionSpec.size());
+        for (FieldSchema fs : tbl.getPartitionKeys()) {
+          partVals.add(partitionSpec.get(fs.getName()));
+        }
+        rqst.setPartitionVals(partVals);
+      }
+      try {
+        getMSC().fireListenerEvent(rqst);
+      } catch (TException e) {
+        throw new HiveException(e);
+      }
+    }
+  }
+
   public boolean dropPartition(String tblName, List<String> part_vals, boolean deleteData)
       throws HiveException {
     String[] names = Utilities.getDbTableName(tblName);
@@ -2502,10 +2601,12 @@ private void constructOneLBLocationMap(F
    * @param fs Filesystem
    * @param isSrcLocal true if source is on local file system
    * @param isAcid true if this is an ACID based write
+   * @param newFiles if non-null, the destination paths of the files moved by this call are
+   *                 appended to this list (e.g. for reporting in a DML insert event).
    * @throws HiveException
    */
   static protected void copyFiles(HiveConf conf, Path srcf, Path destf,
-      FileSystem fs, boolean isSrcLocal, boolean isAcid) throws HiveException {
+      FileSystem fs, boolean isSrcLocal, boolean isAcid, List<Path> newFiles) throws HiveException {
     boolean inheritPerms = HiveConf.getBoolVar(conf,
         HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
     try {
@@ -2537,7 +2638,7 @@ private void constructOneLBLocationMap(F
     // If we're moving files around for an ACID write then the rules and paths are all different.
     // You can blame this on Owen.
     if (isAcid) {
-      moveAcidFiles(srcFs, srcs, destf);
+      moveAcidFiles(srcFs, srcs, destf, newFiles);
     } else {
     // check that source and target paths exist
       List<List<Path[]>> result = checkPaths(conf, fs, srcs, srcFs, destf, false);
@@ -2549,6 +2650,7 @@ private void constructOneLBLocationMap(F
               throw new IOException("Cannot move " + sdpair[0] + " to "
                   + sdpair[1]);
             }
+            if (newFiles != null) newFiles.add(sdpair[1]);
           }
         }
       } catch (IOException e) {
@@ -2557,8 +2659,8 @@ private void constructOneLBLocationMap(F
     }
   }
 
-  private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst)
-      throws HiveException {
+  private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst,
+                                    List<Path> newFiles) throws HiveException {
     // The layout for ACID files is table|partname/base|delta/bucket
     // We will always only be writing delta files.  In the buckets created by FileSinkOperator
     // it will look like bucket/delta/bucket.  So we need to move that into the above structure.
@@ -2622,6 +2724,7 @@ private void constructOneLBLocationMap(F
               LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to " +
                   bucketDest.toUri().toString());
               fs.rename(bucketSrc, bucketDest);
+              if (newFiles != null) newFiles.add(bucketDest);
             }
           } catch (IOException e) {
             throw new HiveException("Error moving acid files " + e.getMessage(), e);
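
Putting the pieces above together: the copy/move helpers append each destination path to an optional newFiles list, and fireInsertEvent turns that list into a metastore notification. The sketch below mirrors the calls shown in the hunks above; InsertEventSketch and its parameter names are illustrative, it assumes a connected IMetaStoreClient, and it leaves the FIRE_EVENTS_FOR_DML check to the caller.

import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FireEventRequest;
import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
import org.apache.thrift.TException;

class InsertEventSketch {
  // Fires a DML insert event listing the files that a load added to the table or partition.
  static void fireInsertEvent(IMetaStoreClient msc, String dbName, String tableName,
                              List<String> partVals, List<Path> newFiles) throws TException {
    FireEventRequestData data = new FireEventRequestData();
    InsertEventRequestData insertData = new InsertEventRequestData();
    data.setInsertData(insertData);
    if (newFiles != null) {
      for (Path p : newFiles) {
        insertData.addToFilesAdded(p.toString());   // one entry per moved file
      }
    }
    FireEventRequest rqst = new FireEventRequest(true, data);  // same leading flag as in the patch
    rqst.setDbName(dbName);
    rqst.setTableName(tableName);
    if (partVals != null && !partVals.isEmpty()) {
      rqst.setPartitionVals(partVals);              // ordered by the table's partition keys
    }
    msc.fireListenerEvent(rqst);
  }
}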

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java?rev=1660293&r1=1660292&r2=1660293&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java Tue Feb 17 06:49:27 2015
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.OutputFormat;
@@ -246,10 +247,8 @@ public class Partition implements Serial
   final public Deserializer getDeserializer() {
     if (deserializer == null) {
       try {
-        deserializer = MetaStoreUtils.getDeserializer(Hive.get().getConf(),
+        deserializer = MetaStoreUtils.getDeserializer(SessionState.getSessionConf(),
             tPartition, table.getTTable());
-      } catch (HiveException e) {
-        throw new RuntimeException(e);
       } catch (MetaException e) {
         throw new RuntimeException(e);
       }
@@ -367,7 +366,7 @@ public class Partition implements Serial
     try {
       // Previously, this got the filesystem of the Table, which could be
       // different from the filesystem of the partition.
-      FileSystem fs = getDataLocation().getFileSystem(Hive.get().getConf());
+      FileSystem fs = getDataLocation().getFileSystem(SessionState.getSessionConf());
       String pathPattern = getDataLocation().toString();
       if (getBucketCount() > 0) {
         pathPattern = pathPattern + "/*";
@@ -495,11 +494,11 @@ public class Partition implements Serial
   public List<FieldSchema> getCols() {
 
     try {
-      if (Table.hasMetastoreBasedSchema(Hive.get().getConf(), tPartition.getSd())) {
+      if (Table.hasMetastoreBasedSchema(SessionState.getSessionConf(), tPartition.getSd())) {
         return tPartition.getSd().getCols();
       }
-      return Hive.getFieldsFromDeserializer(table.getTableName(), getDeserializer());
-    } catch (HiveException e) {
+      return MetaStoreUtils.getFieldsFromDeserializer(table.getTableName(), getDeserializer());
+    } catch (Exception e) {
       LOG.error("Unable to get cols from serde: " +
           tPartition.getSd().getSerdeInfo().getSerializationLib(), e);
     }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1660293&r1=1660292&r2=1660293&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Tue Feb 17 06:49:27 2015
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.metadata;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -41,7 +40,6 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.ProtectMode;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Index;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
@@ -259,7 +257,7 @@ public class Table implements Serializab
   }
 
   final public Class<? extends Deserializer> getDeserializerClass() throws Exception {
-    return MetaStoreUtils.getDeserializerClass(Hive.get().getConf(), tTable);
+    return MetaStoreUtils.getDeserializerClass(SessionState.getSessionConf(), tTable);
   }
 
   final public Deserializer getDeserializer(boolean skipConfError) {
@@ -271,11 +269,9 @@ public class Table implements Serializab
 
   final public Deserializer getDeserializerFromMetaStore(boolean skipConfError) {
     try {
-      return MetaStoreUtils.getDeserializer(Hive.get().getConf(), tTable, skipConfError);
+      return MetaStoreUtils.getDeserializer(SessionState.getSessionConf(), tTable, skipConfError);
     } catch (MetaException e) {
       throw new RuntimeException(e);
-    } catch (HiveException e) {
-      throw new RuntimeException(e);
     }
   }
 
@@ -285,7 +281,7 @@ public class Table implements Serializab
     }
     try {
       storageHandler = HiveUtils.getStorageHandler(
-        Hive.get().getConf(),
+          SessionState.getSessionConf(),
         getProperty(
           org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE));
     } catch (Exception e) {
@@ -589,12 +585,12 @@ public class Table implements Serializab
 
     String serializationLib = getSerializationLib();
     try {
-      if (hasMetastoreBasedSchema(Hive.get().getConf(), serializationLib)) {
+      if (hasMetastoreBasedSchema(SessionState.getSessionConf(), serializationLib)) {
         return tTable.getSd().getCols();
       } else {
-        return Hive.getFieldsFromDeserializer(getTableName(), getDeserializer());
+        return MetaStoreUtils.getFieldsFromDeserializer(getTableName(), getDeserializer());
       }
-    } catch (HiveException e) {
+    } catch (Exception e) {
       LOG.error("Unable to get field from serde: " + serializationLib, e);
     }
     return new ArrayList<FieldSchema>();
@@ -625,42 +621,6 @@ public class Table implements Serializab
     return tTable.getSd().getNumBuckets();
   }
 
-  /**
-   * Replaces the directory corresponding to the table by srcf. Works by
-   * deleting the table directory and renaming the source directory.
-   *
-   * @param srcf
-   *          Source directory
-   * @param isSrcLocal
-   *          If the source directory is LOCAL
-   */
-  protected void replaceFiles(Path srcf, boolean isSrcLocal)
-      throws HiveException {
-    Path tableDest = getPath();
-    Hive.replaceFiles(tableDest, srcf, tableDest, tableDest, Hive.get().getConf(),
-        isSrcLocal);
-  }
-
-  /**
-   * Inserts files specified into the partition. Works by moving files
-   *
-   * @param srcf
-   *          Files to be moved. Leaf directories or globbed file paths
-   * @param isSrcLocal
-   *          If the source directory is LOCAL
-   * @param isAcid
-   *          True if this is an ACID based insert, update, or delete
-   */
-  protected void copyFiles(Path srcf, boolean isSrcLocal, boolean isAcid) throws HiveException {
-    FileSystem fs;
-    try {
-      fs = getDataLocation().getFileSystem(Hive.get().getConf());
-      Hive.copyFiles(Hive.get().getConf(), srcf, getPath(), fs, isSrcLocal, isAcid);
-    } catch (IOException e) {
-      throw new HiveException("addFiles: filesystem error in check phase", e);
-    }
-  }
-
   public void setInputFormatClass(String name) throws HiveException {
     if (name == null) {
       inputFormatClass = null;
@@ -931,22 +891,12 @@ public class Table implements Serializab
     return dbName + "@" + tabName;
   }
 
-  /**
-   * @return List containing Indexes names if there are indexes on this table
-   * @throws HiveException
-   **/
-  public List<Index> getAllIndexes(short max) throws HiveException {
-    Hive hive = Hive.get();
-    return hive.getIndexes(getTTable().getDbName(), getTTable().getTableName(), max);
-  }
-
   @SuppressWarnings("nls")
   public FileStatus[] getSortedPaths() {
     try {
       // Previously, this got the filesystem of the Table, which could be
       // different from the filesystem of the partition.
-      FileSystem fs = FileSystem.get(getPath().toUri(), Hive.get()
-          .getConf());
+      FileSystem fs = FileSystem.get(getPath().toUri(), SessionState.getSessionConf());
       String pathPattern = getPath().toString();
       if (getNumBuckets() > 0) {
         pathPattern = pathPattern + "/*";


