hive-commits mailing list archives

From: br...@apache.org
Subject: svn commit: r1659147 - in /hive/trunk: ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/ s...
Date: Thu, 12 Feb 2015 04:53:51 GMT
Author: brock
Date: Thu Feb 12 04:53:51 2015
New Revision: 1659147

URL: http://svn.apache.org/r1659147
Log:
HIVE-9333 - Move parquet serialize implementation to DataWritableWriter to improve write speeds (Sergio via Brock)

Added:
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/ParquetHiveRecord.java
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetSerDe.java

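Note: the source of the added ParquetHiveRecord.java is not quoted in this message. As a rough guide while reading the hunks below, here is a hedged reconstruction of its likely shape, inferred purely from how the class is used in this patch (public value/inspector fields assigned in ParquetHiveSerDe.serialize(), getObject()/getObjectInspector() called by DataWritableWriter, a no-arg and a two-arg constructor in the tests, and Writable implied by getSerializedClass()). Treat every detail as an assumption, not the committed source.

// Hypothetical reconstruction of the added ParquetHiveRecord holder class,
// inferred only from its usage in the hunks below.
package org.apache.hadoop.hive.serde2.io;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.Writable;

public class ParquetHiveRecord implements Writable {
  // ParquetHiveSerDe assigns these directly (parquetRow.value = obj, ...),
  // so they are assumed to be public fields.
  public Object value;
  public StructObjectInspector inspector;

  public ParquetHiveRecord() {
    this(null, null);
  }

  public ParquetHiveRecord(final Object value, final StructObjectInspector inspector) {
    this.value = value;
    this.inspector = inspector;
  }

  public Object getObject() {
    return value;
  }

  public StructObjectInspector getObjectInspector() {
    return inspector;
  }

  // The record travels in memory from the SerDe to DataWritableWriter, so the
  // Writable methods are assumed to be unused; throwing here is a guess.
  @Override
  public void write(final DataOutput out) throws IOException {
    throw new UnsupportedOperationException("ParquetHiveRecord is not meant to be serialized");
  }

  @Override
  public void readFields(final DataInput in) throws IOException {
    throw new UnsupportedOperationException("ParquetHiveRecord is not meant to be deserialized");
  }
}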
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java?rev=1659147&r1=1659146&r2=1659147&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java Thu Feb 12 04:53:51 2015
@@ -21,7 +21,6 @@ import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -29,10 +28,10 @@ import org.apache.hadoop.hive.ql.io.IOCo
 import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter;
 import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
 import org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
@@ -41,27 +40,25 @@ import org.apache.hadoop.mapreduce.Outpu
 import org.apache.hadoop.util.Progressable;
 
 import parquet.hadoop.ParquetOutputFormat;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.hadoop.util.ContextUtil;
 
 /**
  *
  * A Parquet OutputFormat for Hive (with the deprecated package mapred)
  *
  */
-public class MapredParquetOutputFormat extends FileOutputFormat<Void, ArrayWritable> implements
-  HiveOutputFormat<Void, ArrayWritable> {
+public class MapredParquetOutputFormat extends FileOutputFormat<Void, ParquetHiveRecord> implements
+  HiveOutputFormat<Void, ParquetHiveRecord> {
 
   private static final Log LOG = LogFactory.getLog(MapredParquetOutputFormat.class);
 
-  protected ParquetOutputFormat<ArrayWritable> realOutputFormat;
+  protected ParquetOutputFormat<ParquetHiveRecord> realOutputFormat;
 
   public MapredParquetOutputFormat() {
-    realOutputFormat = new ParquetOutputFormat<ArrayWritable>(new DataWritableWriteSupport());
+    realOutputFormat = new ParquetOutputFormat<ParquetHiveRecord>(new DataWritableWriteSupport());
   }
 
-  public MapredParquetOutputFormat(final OutputFormat<Void, ArrayWritable> mapreduceOutputFormat) {
-    realOutputFormat = (ParquetOutputFormat<ArrayWritable>) mapreduceOutputFormat;
+  public MapredParquetOutputFormat(final OutputFormat<Void, ParquetHiveRecord> mapreduceOutputFormat) {
+    realOutputFormat = (ParquetOutputFormat<ParquetHiveRecord>) mapreduceOutputFormat;
   }
 
   @Override
@@ -70,7 +67,7 @@ public class MapredParquetOutputFormat e
   }
 
   @Override
-  public RecordWriter<Void, ArrayWritable> getRecordWriter(
+  public RecordWriter<Void, ParquetHiveRecord> getRecordWriter(
       final FileSystem ignored,
       final JobConf job,
       final String name,
@@ -119,7 +116,7 @@ public class MapredParquetOutputFormat e
   }
 
   protected ParquetRecordWriterWrapper getParquerRecordWriterWrapper(
-      ParquetOutputFormat<ArrayWritable> realOutputFormat,
+      ParquetOutputFormat<ParquetHiveRecord> realOutputFormat,
       JobConf jobConf,
       String finalOutPath,
       Progressable progress,

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java?rev=1659147&r1=1659146&r2=1659147&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java Thu Feb 12 04:53:51 2015
@@ -13,61 +13,31 @@
  */
 package org.apache.hadoop.hive.ql.io.parquet.serde;
 
-import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeSpec;
 import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.hive.serde2.io.ByteWritable;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import parquet.hadoop.ParquetOutputFormat;
 import parquet.hadoop.ParquetWriter;
-import parquet.io.api.Binary;
 
 /**
  *
@@ -110,6 +80,13 @@ public class ParquetHiveSerDe extends Ab
   private long deserializedSize;
   private String compressionType;
 
+  private ParquetHiveRecord parquetRow;
+
+  public ParquetHiveSerDe() {
+    parquetRow = new ParquetHiveRecord();
+    stats = new SerDeStats();
+  }
+
   @Override
   public final void initialize(final Configuration conf, final Properties tbl) throws SerDeException {
 
@@ -144,7 +121,6 @@ public class ParquetHiveSerDe extends Ab
     this.objInspector = new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo);
 
     // Stats part
-    stats = new SerDeStats();
     serializedSize = 0;
     deserializedSize = 0;
     status = LAST_OPERATION.UNKNOWN;
@@ -169,7 +145,7 @@ public class ParquetHiveSerDe extends Ab
 
   @Override
   public Class<? extends Writable> getSerializedClass() {
-    return ArrayWritable.class;
+    return ParquetHiveRecord.class;
   }
 
   @Override
@@ -178,154 +154,11 @@ public class ParquetHiveSerDe extends Ab
     if (!objInspector.getCategory().equals(Category.STRUCT)) {
       throw new SerDeException("Cannot serialize " + objInspector.getCategory() + ". Can only serialize a struct");
     }
-    final ArrayWritable serializeData = createStruct(obj, (StructObjectInspector) objInspector);
-    serializedSize = serializeData.get().length;
+    serializedSize = ((StructObjectInspector)objInspector).getAllStructFieldRefs().size();
     status = LAST_OPERATION.SERIALIZE;
-    return serializeData;
-  }
-
-  private ArrayWritable createStruct(final Object obj, final StructObjectInspector inspector)
-      throws SerDeException {
-    final List<? extends StructField> fields = inspector.getAllStructFieldRefs();
-    final Writable[] arr = new Writable[fields.size()];
-    for (int i = 0; i < fields.size(); i++) {
-      final StructField field = fields.get(i);
-      final Object subObj = inspector.getStructFieldData(obj, field);
-      final ObjectInspector subInspector = field.getFieldObjectInspector();
-      arr[i] = createObject(subObj, subInspector);
-    }
-    return new ArrayWritable(Writable.class, arr);
-  }
-
-  private Writable createMap(final Object obj, final MapObjectInspector inspector)
-      throws SerDeException {
-    final Map<?, ?> sourceMap = inspector.getMap(obj);
-    final ObjectInspector keyInspector = inspector.getMapKeyObjectInspector();
-    final ObjectInspector valueInspector = inspector.getMapValueObjectInspector();
-    final List<ArrayWritable> array = new ArrayList<ArrayWritable>();
-
-    if (sourceMap != null) {
-      for (final Entry<?, ?> keyValue : sourceMap.entrySet()) {
-        final Writable key = createObject(keyValue.getKey(), keyInspector);
-        final Writable value = createObject(keyValue.getValue(), valueInspector);
-        if (key != null) {
-          Writable[] arr = new Writable[2];
-          arr[0] = key;
-          arr[1] = value;
-          array.add(new ArrayWritable(Writable.class, arr));
-        }
-      }
-    }
-    if (array.size() > 0) {
-      final ArrayWritable subArray = new ArrayWritable(ArrayWritable.class,
-          array.toArray(new ArrayWritable[array.size()]));
-      return new ArrayWritable(Writable.class, new Writable[] {subArray});
-    } else {
-      return null;
-    }
-  }
-
-  private ArrayWritable createArray(final Object obj, final ListObjectInspector inspector)
-      throws SerDeException {
-    final List<?> sourceArray = inspector.getList(obj);
-    final ObjectInspector subInspector = inspector.getListElementObjectInspector();
-    final List<Writable> array = new ArrayList<Writable>();
-    if (sourceArray != null) {
-      for (final Object curObj : sourceArray) {
-        array.add(createObject(curObj, subInspector));
-      }
-    }
-    if (array.size() > 0) {
-      final ArrayWritable subArray = new ArrayWritable(Writable.class,
-          array.toArray(new Writable[array.size()]));
-      return new ArrayWritable(Writable.class, new Writable[] {subArray});
-    } else {
-      return null;
-    }
-  }
-
-  private Writable createPrimitive(final Object obj, final PrimitiveObjectInspector inspector)
-      throws SerDeException {
-    if (obj == null) {
-      return null;
-    }
-    switch (inspector.getPrimitiveCategory()) {
-    case VOID:
-      return null;
-    case BOOLEAN:
-      return new BooleanWritable(((BooleanObjectInspector) inspector).get(obj) ? Boolean.TRUE : Boolean.FALSE);
-    case BYTE:
-      return new ByteWritable(((ByteObjectInspector) inspector).get(obj));
-    case DOUBLE:
-      return new DoubleWritable(((DoubleObjectInspector) inspector).get(obj));
-    case FLOAT:
-      return new FloatWritable(((FloatObjectInspector) inspector).get(obj));
-    case INT:
-      return new IntWritable(((IntObjectInspector) inspector).get(obj));
-    case LONG:
-      return new LongWritable(((LongObjectInspector) inspector).get(obj));
-    case SHORT:
-      return new ShortWritable(((ShortObjectInspector) inspector).get(obj));
-    case STRING:
-      String v = ((StringObjectInspector) inspector).getPrimitiveJavaObject(obj);
-      try {
-        return new BytesWritable(v.getBytes("UTF-8"));
-      } catch (UnsupportedEncodingException e) {
-        throw new SerDeException("Failed to encode string in UTF-8", e);
-      }
-    case DECIMAL:
-      HiveDecimal hd = (HiveDecimal)inspector.getPrimitiveJavaObject(obj);
-      DecimalTypeInfo decTypeInfo = (DecimalTypeInfo) inspector.getTypeInfo();
-      int prec = decTypeInfo.precision();
-      int scale = decTypeInfo.scale();
-      byte[] src = hd.setScale(scale).unscaledValue().toByteArray();
-      // Estimated number of bytes needed.
-      int bytes =  PRECISION_TO_BYTE_COUNT[prec - 1];
-      if (bytes == src.length) {
-        // No padding needed.
-        return new BytesWritable(src);
-      }
-      byte[] tgt = new byte[bytes];
-      if ( hd.signum() == -1) {
-        // For negative number, initializing bits to 1
-        for (int i = 0; i < bytes; i++) {
-          tgt[i] |= 0xFF;
-        }
-      }
-      System.arraycopy(src, 0, tgt, bytes - src.length, src.length); // Padding leading zeroes/ones.
-      return new BytesWritable(tgt);
-    case TIMESTAMP:
-      return new TimestampWritable(((TimestampObjectInspector) inspector).getPrimitiveJavaObject(obj));
-    case CHAR:
-      String strippedValue = ((HiveCharObjectInspector) inspector).getPrimitiveJavaObject(obj).getStrippedValue();
-      return new BytesWritable(Binary.fromString(strippedValue).getBytes());
-    case VARCHAR:
-      String value = ((HiveVarcharObjectInspector) inspector).getPrimitiveJavaObject(obj).getValue();
-      return new BytesWritable(Binary.fromString(value).getBytes());
-    case BINARY:
-      return new BytesWritable(((BinaryObjectInspector) inspector).getPrimitiveJavaObject(obj));
-    default:
-      throw new SerDeException("Unknown primitive : " + inspector.getPrimitiveCategory());
-    }
-  }
-
-  private Writable createObject(final Object obj, final ObjectInspector inspector) throws SerDeException {
-    if (obj == null) {
-      return null;
-    }
-
-    switch (inspector.getCategory()) {
-    case STRUCT:
-      return createStruct(obj, (StructObjectInspector) inspector);
-    case LIST:
-      return createArray(obj, (ListObjectInspector) inspector);
-    case MAP:
-      return createMap(obj, (MapObjectInspector) inspector);
-    case PRIMITIVE:
-      return createPrimitive(obj, (PrimitiveObjectInspector) inspector);
-    default:
-      throw new SerDeException("Unknown data type" + inspector.getCategory());
-    }
+    parquetRow.value = obj;
+    parquetRow.inspector= (StructObjectInspector)objInspector;
+    return parquetRow;
   }
 
   @Override

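The net effect of this hunk is that serialize() no longer copies the row into an ArrayWritable tree; it only records the row object and its StructObjectInspector in the single reusable ParquetHiveRecord, deferring the traversal to DataWritableWriter at write time. A hedged sketch of the new call path follows; the configuration, table properties, row object, and row inspector are hypothetical stand-ins for whatever the caller provides.

// Illustrative only: a hypothetical helper showing how the reworked serialize()
// is exercised; all inputs are assumed.
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.Writable;

class SerializePathSketch {
  static Writable serializeRow(Configuration conf, Properties tableProperties,
                               Object row, StructObjectInspector rowInspector) throws SerDeException {
    ParquetHiveSerDe serDe = new ParquetHiveSerDe();
    SerDeUtils.initializeSerDe(serDe, conf, tableProperties, null);

    // serialize() now just stamps the object and inspector into the SerDe's single
    // reusable ParquetHiveRecord; no per-row ArrayWritable allocation happens here.
    // The same instance then flows through ParquetRecordWriterWrapper.write(Writable)
    // -> DataWritableWriteSupport.write() -> DataWritableWriter.write(), which walks
    // the object with the inspector and emits Parquet RecordConsumer events directly.
    return serDe.serialize(row, rowInspector);
  }
}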
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java?rev=1659147&r1=1659146&r2=1659147&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java Thu Feb 12 04:53:51 2015
@@ -16,7 +16,7 @@ package org.apache.hadoop.hive.ql.io.par
 import java.util.HashMap;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
 
 import parquet.hadoop.api.WriteSupport;
 import parquet.io.api.RecordConsumer;
@@ -28,7 +28,7 @@ import parquet.schema.MessageTypeParser;
  * DataWritableWriteSupport is a WriteSupport for the DataWritableWriter
  *
  */
-public class DataWritableWriteSupport extends WriteSupport<ArrayWritable> {
+public class DataWritableWriteSupport extends WriteSupport<ParquetHiveRecord> {
 
   public static final String PARQUET_HIVE_SCHEMA = "parquet.hive.schema";
 
@@ -55,7 +55,7 @@ public class DataWritableWriteSupport ex
   }
 
   @Override
-  public void write(final ArrayWritable record) {
+  public void write(final ParquetHiveRecord record) {
     writer.write(record);
   }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java?rev=1659147&r1=1659146&r2=1659147&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java Thu Feb 12 04:53:51 2015
@@ -13,37 +13,29 @@
  */
 package org.apache.hadoop.hive.ql.io.parquet.write;
 
-import java.sql.Timestamp;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
-import org.apache.hadoop.hive.serde2.io.ByteWritable;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
+import org.apache.hadoop.hive.serde2.objectinspector.*;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import parquet.io.api.Binary;
 import parquet.io.api.RecordConsumer;
 import parquet.schema.GroupType;
 import parquet.schema.OriginalType;
 import parquet.schema.Type;
 
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+
 /**
  *
- * DataWritableWriter is a writer,
- * that will read an ArrayWritable and give the data to parquet
- * with the expected schema
- * This is a helper class used by DataWritableWriteSupport class.
+ * DataWritableWriter is a writer that reads a ParquetWritable object and send the data to the Parquet
+ * API with the expected schema. This class is only used through DataWritableWriteSupport class.
  */
 public class DataWritableWriter {
   private static final Log LOG = LogFactory.getLog(DataWritableWriter.class);
@@ -57,13 +49,13 @@ public class DataWritableWriter {
 
   /**
    * It writes all record values to the Parquet RecordConsumer.
-   * @param record Contains the record of values that are going to be written
+   * @param record Contains the record that are going to be written.
    */
-  public void write(final ArrayWritable record) {
+  public void write(final ParquetHiveRecord record) {
     if (record != null) {
       recordConsumer.startMessage();
       try {
-        writeGroupFields(record, schema);
+        writeGroupFields(record.getObject(), record.getObjectInspector(), schema);
       } catch (RuntimeException e) {
         String errorMessage = "Parquet record is malformed: " + e.getMessage();
         LOG.error(errorMessage, e);
@@ -76,19 +68,23 @@ public class DataWritableWriter {
   /**
    * It writes all the fields contained inside a group to the RecordConsumer.
    * @param value The list of values contained in the group.
+   * @param inspector The object inspector used to get the correct value type.
    * @param type Type that contains information about the group schema.
    */
-  public void writeGroupFields(final ArrayWritable value, final GroupType type) {
+  private void writeGroupFields(final Object value, final StructObjectInspector inspector, final GroupType type) {
     if (value != null) {
+      List<? extends StructField> fields = inspector.getAllStructFieldRefs();
+      List<Object> fieldValuesList = inspector.getStructFieldsDataAsList(value);
+
       for (int i = 0; i < type.getFieldCount(); i++) {
         Type fieldType = type.getType(i);
         String fieldName = fieldType.getName();
-        Writable fieldValue = value.get()[i];
+        Object fieldValue = fieldValuesList.get(i);
 
-        // Parquet does not write null elements
         if (fieldValue != null) {
+          ObjectInspector fieldInspector = fields.get(i).getFieldObjectInspector();
           recordConsumer.startField(fieldName, i);
-          writeValue(fieldValue, fieldType);
+          writeValue(fieldValue, fieldInspector, fieldType);
           recordConsumer.endField(fieldName, i);
         }
       }
@@ -96,68 +92,93 @@ public class DataWritableWriter {
   }
 
   /**
-   * It writes the field value to the Parquet RecordConsumer. It detects the field type, and writes
+   * It writes the field value to the Parquet RecordConsumer. It detects the field type, and calls
    * the correct write function.
    * @param value The writable object that contains the value.
+   * @param inspector The object inspector used to get the correct value type.
    * @param type Type that contains information about the type schema.
    */
-  private void writeValue(final Writable value, final Type type) {
+  private void writeValue(final Object value, final ObjectInspector inspector, final Type type) {
     if (type.isPrimitive()) {
-      writePrimitive(value);
-    } else if (value instanceof ArrayWritable) {
+      checkInspectorCategory(inspector, ObjectInspector.Category.PRIMITIVE);
+      writePrimitive(value, (PrimitiveObjectInspector)inspector);
+    } else {
       GroupType groupType = type.asGroupType();
       OriginalType originalType = type.getOriginalType();
 
       if (originalType != null && originalType.equals(OriginalType.LIST)) {
-        writeArray((ArrayWritable)value, groupType);
+        checkInspectorCategory(inspector, ObjectInspector.Category.LIST);
+        writeArray(value, (ListObjectInspector)inspector, groupType);
       } else if (originalType != null && originalType.equals(OriginalType.MAP)) {
-        writeMap((ArrayWritable)value, groupType);
+        checkInspectorCategory(inspector, ObjectInspector.Category.MAP);
+        writeMap(value, (MapObjectInspector)inspector, groupType);
       } else {
-        writeGroup((ArrayWritable) value, groupType);
+        checkInspectorCategory(inspector, ObjectInspector.Category.STRUCT);
+        writeGroup(value, (StructObjectInspector)inspector, groupType);
       }
-    } else {
-      throw new RuntimeException("Field value is not an ArrayWritable object: " + type);
+    }
+  }
+
+  /**
+   * Checks that an inspector matches the category indicated as a parameter.
+   * @param inspector The object inspector to check
+   * @param category The category to match
+   * @throws IllegalArgumentException if inspector does not match the category
+   */
+  private void checkInspectorCategory(ObjectInspector inspector, ObjectInspector.Category category) {
+    if (!inspector.getCategory().equals(category)) {
+      throw new IllegalArgumentException("Invalid data type: expected " + category
+          + " type, but found: " + inspector.getCategory());
     }
   }
 
   /**
    * It writes a group type and all its values to the Parquet RecordConsumer.
    * This is used only for optional and required groups.
-   * @param value ArrayWritable object that contains the group values
-   * @param type Type that contains information about the group schema
+   * @param value Object that contains the group values.
+   * @param inspector The object inspector used to get the correct value type.
+   * @param type Type that contains information about the group schema.
    */
-  private void writeGroup(final ArrayWritable value, final GroupType type) {
+  private void writeGroup(final Object value, final StructObjectInspector inspector, final GroupType type) {
     recordConsumer.startGroup();
-    writeGroupFields(value, type);
+    writeGroupFields(value, inspector, type);
     recordConsumer.endGroup();
   }
 
   /**
-   * It writes a map type and its key-pair values to the Parquet RecordConsumer.
-   * This is called when the original type (MAP) is detected by writeValue()
-   * @param value The list of map values that contains the repeated KEY_PAIR_VALUE group type
-   * @param type Type that contains information about the group schema
+   * It writes a list type and its array elements to the Parquet RecordConsumer.
+   * This is called when the original type (LIST) is detected by writeValue()/
+   * This function assumes the following schema:
+   *    optional group arrayCol (LIST) {
+   *      repeated group array {
+   *        optional TYPE array_element;
+   *      }
+   *    }
+   * @param value The object that contains the array values.
+   * @param inspector The object inspector used to get the correct value type.
+   * @param type Type that contains information about the group (LIST) schema.
    */
-  private void writeMap(final ArrayWritable value, final GroupType type) {
+  private void writeArray(final Object value, final ListObjectInspector inspector, final GroupType type) {
+    // Get the internal array structure
     GroupType repeatedType = type.getType(0).asGroupType();
-    ArrayWritable repeatedValue = (ArrayWritable)value.get()[0];
 
     recordConsumer.startGroup();
     recordConsumer.startField(repeatedType.getName(), 0);
 
-    Writable[] map_values = repeatedValue.get();
-    for (int record = 0; record < map_values.length; record++) {
-      Writable key_value_pair = map_values[record];
-      if (key_value_pair != null) {
-        // Hive wraps a map key-pair into an ArrayWritable
-        if (key_value_pair instanceof ArrayWritable) {
-          writeGroup((ArrayWritable)key_value_pair, repeatedType);
-        } else {
-          throw new RuntimeException("Map key-value pair is not an ArrayWritable object on record " + record);
-        }
-      } else {
-        throw new RuntimeException("Map key-value pair is null on record " + record);
+    List<?> arrayValues = inspector.getList(value);
+    ObjectInspector elementInspector = inspector.getListElementObjectInspector();
+
+    Type elementType = repeatedType.getType(0);
+    String elementName = elementType.getName();
+
+    for (Object element : arrayValues) {
+      recordConsumer.startGroup();
+      if (element != null) {
+        recordConsumer.startField(elementName, 0);
+        writeValue(element, elementInspector, elementType);
+        recordConsumer.endField(elementName, 0);
       }
+      recordConsumer.endGroup();
     }
 
     recordConsumer.endField(repeatedType.getName(), 0);
@@ -165,35 +186,53 @@ public class DataWritableWriter {
   }
 
   /**
-   * It writes a list type and its array elements to the Parquet RecordConsumer.
-   * This is called when the original type (LIST) is detected by writeValue()
-   * @param array The list of array values that contains the repeated array group type
-   * @param type Type that contains information about the group schema
+   * It writes a map type and its key-pair values to the Parquet RecordConsumer.
+   * This is called when the original type (MAP) is detected by writeValue().
+   * This function assumes the following schema:
+   *    optional group mapCol (MAP) {
+   *      repeated group map (MAP_KEY_VALUE) {
+   *        required TYPE key;
+   *        optional TYPE value;
+   *      }
+   *    }
+   * @param value The object that contains the map key-values.
+   * @param inspector The object inspector used to get the correct value type.
+   * @param type Type that contains information about the group (MAP) schema.
    */
-  private void writeArray(final ArrayWritable array, final GroupType type) {
+  private void writeMap(final Object value, final MapObjectInspector inspector, final GroupType type) {
+    // Get the internal map structure (MAP_KEY_VALUE)
     GroupType repeatedType = type.getType(0).asGroupType();
-    ArrayWritable repeatedValue = (ArrayWritable)array.get()[0];
 
     recordConsumer.startGroup();
     recordConsumer.startField(repeatedType.getName(), 0);
 
-    Writable[] array_values = repeatedValue.get();
-    for (int record = 0; record < array_values.length; record++) {
-      recordConsumer.startGroup();
+    Map<?, ?> mapValues = inspector.getMap(value);
 
-      // Null values must be wrapped into startGroup/endGroup
-      Writable element = array_values[record];
-      if (element != null) {
-        for (int i = 0; i < type.getFieldCount(); i++) {
-          Type fieldType = repeatedType.getType(i);
-          String fieldName = fieldType.getName();
+    Type keyType = repeatedType.getType(0);
+    String keyName = keyType.getName();
+    ObjectInspector keyInspector = inspector.getMapKeyObjectInspector();
 
-          recordConsumer.startField(fieldName, i);
-          writeValue(element, fieldType);
-          recordConsumer.endField(fieldName, i);
+    Type valuetype = repeatedType.getType(1);
+    String valueName = valuetype.getName();
+    ObjectInspector valueInspector = inspector.getMapValueObjectInspector();
+
+    for (Map.Entry<?, ?> keyValue : mapValues.entrySet()) {
+      recordConsumer.startGroup();
+      if (keyValue != null) {
+        // write key element
+        Object keyElement = keyValue.getKey();
+        recordConsumer.startField(keyName, 0);
+        writeValue(keyElement, keyInspector, keyType);
+        recordConsumer.endField(keyName, 0);
+
+        // write value element
+        Object valueElement = keyValue.getValue();
+        if (valueElement != null) {
+          recordConsumer.startField(valueName, 1);
+          writeValue(valueElement, valueInspector, valuetype);
+          recordConsumer.endField(valueName, 1);
         }
       }
-
       recordConsumer.endGroup();
     }
 
@@ -203,36 +242,89 @@ public class DataWritableWriter {
 
   /**
    * It writes the primitive value to the Parquet RecordConsumer.
-   * @param value The writable object that contains the primitive value.
+   * @param value The object that contains the primitive value.
+   * @param inspector The object inspector used to get the correct value type.
    */
-  private void writePrimitive(final Writable value) {
+  private void writePrimitive(final Object value, final PrimitiveObjectInspector inspector) {
     if (value == null) {
       return;
     }
-    if (value instanceof DoubleWritable) {
-      recordConsumer.addDouble(((DoubleWritable) value).get());
-    } else if (value instanceof BooleanWritable) {
-      recordConsumer.addBoolean(((BooleanWritable) value).get());
-    } else if (value instanceof FloatWritable) {
-      recordConsumer.addFloat(((FloatWritable) value).get());
-    } else if (value instanceof IntWritable) {
-      recordConsumer.addInteger(((IntWritable) value).get());
-    } else if (value instanceof LongWritable) {
-      recordConsumer.addLong(((LongWritable) value).get());
-    } else if (value instanceof ShortWritable) {
-      recordConsumer.addInteger(((ShortWritable) value).get());
-    } else if (value instanceof ByteWritable) {
-      recordConsumer.addInteger(((ByteWritable) value).get());
-    } else if (value instanceof HiveDecimalWritable) {
-      throw new UnsupportedOperationException("HiveDecimalWritable writing not implemented");
-    } else if (value instanceof BytesWritable) {
-      recordConsumer.addBinary((Binary.fromByteArray(((BytesWritable) value).getBytes())));
-    } else if (value instanceof TimestampWritable) {
-      Timestamp ts = ((TimestampWritable) value).getTimestamp();
-      NanoTime nt = NanoTimeUtils.getNanoTime(ts, false);
-      nt.writeValue(recordConsumer);
-    } else {
-      throw new IllegalArgumentException("Unknown value type: " + value + " " + value.getClass());
+
+    switch (inspector.getPrimitiveCategory()) {
+      case VOID:
+        return;
+      case DOUBLE:
+        recordConsumer.addDouble(((DoubleObjectInspector) inspector).get(value));
+        break;
+      case BOOLEAN:
+        recordConsumer.addBoolean(((BooleanObjectInspector) inspector).get(value));
+        break;
+      case FLOAT:
+        recordConsumer.addFloat(((FloatObjectInspector) inspector).get(value));
+        break;
+      case BYTE:
+        recordConsumer.addInteger(((ByteObjectInspector) inspector).get(value));
+        break;
+      case INT:
+        recordConsumer.addInteger(((IntObjectInspector) inspector).get(value));
+        break;
+      case LONG:
+        recordConsumer.addLong(((LongObjectInspector) inspector).get(value));
+        break;
+      case SHORT:
+        recordConsumer.addInteger(((ShortObjectInspector) inspector).get(value));
+        break;
+      case STRING:
+        String v = ((StringObjectInspector) inspector).getPrimitiveJavaObject(value);
+        recordConsumer.addBinary(Binary.fromString(v));
+        break;
+      case CHAR:
+        String vChar = ((HiveCharObjectInspector) inspector).getPrimitiveJavaObject(value).getStrippedValue();
+        recordConsumer.addBinary(Binary.fromString(vChar));
+        break;
+      case VARCHAR:
+        String vVarchar = ((HiveVarcharObjectInspector) inspector).getPrimitiveJavaObject(value).getValue();
+        recordConsumer.addBinary(Binary.fromString(vVarchar));
+        break;
+      case BINARY:
+        byte[] vBinary = ((BinaryObjectInspector) inspector).getPrimitiveJavaObject(value);
+        recordConsumer.addBinary(Binary.fromByteArray(vBinary));
+        break;
+      case TIMESTAMP:
+        Timestamp ts = ((TimestampObjectInspector) inspector).getPrimitiveJavaObject(value);
+        recordConsumer.addBinary(NanoTimeUtils.getNanoTime(ts, false).toBinary());
+        break;
+      case DECIMAL:
+        HiveDecimal vDecimal = ((HiveDecimal)inspector.getPrimitiveJavaObject(value));
+        DecimalTypeInfo decTypeInfo = (DecimalTypeInfo)inspector.getTypeInfo();
+        recordConsumer.addBinary(decimalToBinary(vDecimal, decTypeInfo));
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported primitive data type: " + inspector.getPrimitiveCategory());
+    }
+  }
+
+  private Binary decimalToBinary(final HiveDecimal hiveDecimal, final DecimalTypeInfo decimalTypeInfo) {
+    int prec = decimalTypeInfo.precision();
+    int scale = decimalTypeInfo.scale();
+    byte[] decimalBytes = hiveDecimal.setScale(scale).unscaledValue().toByteArray();
+
+    // Estimated number of bytes needed.
+    int precToBytes = ParquetHiveSerDe.PRECISION_TO_BYTE_COUNT[prec - 1];
+    if (precToBytes == decimalBytes.length) {
+      // No padding needed.
+      return Binary.fromByteArray(decimalBytes);
     }
+
+    byte[] tgt = new byte[precToBytes];
+      if (hiveDecimal.signum() == -1) {
+      // For negative number, initializing bits to 1
+      for (int i = 0; i < precToBytes; i++) {
+        tgt[i] |= 0xFF;
+      }
+    }
+
+    System.arraycopy(decimalBytes, 0, tgt, precToBytes - decimalBytes.length, decimalBytes.length); // Padding leading zeroes/ones.
+    return Binary.fromByteArray(tgt);
   }
 }

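The decimalToBinary() helper above pads the unscaled two's-complement bytes of a HiveDecimal up to the fixed length Parquet expects for the column's precision, sign-extending with 0xFF for negative values. A small standalone sketch of that padding for decimal(5,2) = -1.50; the byte count of 3 for precision 5 is an assumption about the ParquetHiveSerDe.PRECISION_TO_BYTE_COUNT table, which is unchanged by this patch and not shown here.

// Standalone sketch of the padding performed by decimalToBinary(), using
// decimal(5,2) = -1.50 as a worked example.
import java.math.BigInteger;
import java.util.Arrays;

public class DecimalPaddingExample {
  public static void main(String[] args) {
    int precToBytes = 3;                           // assumed PRECISION_TO_BYTE_COUNT value for precision 5
    BigInteger unscaled = new BigInteger("-150");  // -1.50 at scale 2
    byte[] src = unscaled.toByteArray();           // minimal two's complement: [0xFF, 0x6A]

    byte[] tgt = new byte[precToBytes];
    if (unscaled.signum() == -1) {
      Arrays.fill(tgt, (byte) 0xFF);               // sign-extend with ones before copying
    }
    System.arraycopy(src, 0, tgt, precToBytes - src.length, src.length);

    // Prints: 0xFF 0xFF 0x6A -- the fixed-length big-endian two's-complement
    // value that the writer wraps with Binary.fromByteArray().
    for (byte b : tgt) {
      System.out.printf("0x%02X ", b);
    }
    System.out.println();
  }
}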
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java?rev=1659147&r1=1659146&r2=1659147&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java Thu Feb 12 04:53:51 2015
@@ -20,7 +20,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -29,22 +28,23 @@ import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
 import org.apache.hadoop.util.Progressable;
 
 import parquet.hadoop.ParquetOutputFormat;
 import parquet.hadoop.metadata.CompressionCodecName;
 import parquet.hadoop.util.ContextUtil;
 
-public class ParquetRecordWriterWrapper implements RecordWriter<Void, ArrayWritable>,
+public class ParquetRecordWriterWrapper implements RecordWriter<Void, ParquetHiveRecord>,
   org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter {
 
   public static final Log LOG = LogFactory.getLog(ParquetRecordWriterWrapper.class);
 
-  private final org.apache.hadoop.mapreduce.RecordWriter<Void, ArrayWritable> realWriter;
+  private final org.apache.hadoop.mapreduce.RecordWriter<Void, ParquetHiveRecord> realWriter;
   private final TaskAttemptContext taskContext;
 
   public ParquetRecordWriterWrapper(
-      final OutputFormat<Void, ArrayWritable> realOutputFormat,
+      final OutputFormat<Void, ParquetHiveRecord> realOutputFormat,
       final JobConf jobConf,
       final String name,
       final Progressable progress, Properties tableProperties) throws
@@ -106,7 +106,7 @@ public class ParquetRecordWriterWrapper
   }
 
   @Override
-  public void write(final Void key, final ArrayWritable value) throws IOException {
+  public void write(final Void key, final ParquetHiveRecord value) throws IOException {
     try {
       realWriter.write(key, value);
     } catch (final InterruptedException e) {
@@ -121,7 +121,7 @@ public class ParquetRecordWriterWrapper
 
   @Override
   public void write(final Writable w) throws IOException {
-    write(null, (ArrayWritable) w);
+    write(null, (ParquetHiveRecord) w);
   }
 
 }

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java?rev=1659147&r1=1659146&r2=1659147&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestDataWritableWriter.java Thu Feb 12 04:53:51 2015
@@ -13,9 +13,27 @@
  */
 package org.apache.hadoop.hive.ql.io.parquet;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
 import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.io.*;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.InOrder;
@@ -27,6 +45,10 @@ import parquet.schema.MessageType;
 import parquet.schema.MessageTypeParser;
 
 import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
 
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
@@ -62,6 +84,10 @@ public class TestDataWritableWriter {
     inOrder.verify(mockRecordConsumer).addInteger(value);
   }
 
+  private void addLong(int value) {
+    inOrder.verify(mockRecordConsumer).addLong(value);
+  }
+
   private void addFloat(float value) {
     inOrder.verify(mockRecordConsumer).addFloat(value);
   }
@@ -88,6 +114,12 @@ public class TestDataWritableWriter {
 
   private Writable createNull() { return null; }
 
+  private ByteWritable createTinyInt(byte value) { return new ByteWritable(value); }
+
+  private ShortWritable createSmallInt(short value) { return new ShortWritable(value); }
+
+  private LongWritable createBigInt(long value) { return new LongWritable(value); }
+
   private IntWritable createInt(int value) {
     return new IntWritable(value);
   }
@@ -116,20 +148,68 @@ public class TestDataWritableWriter {
     return new ArrayWritable(Writable.class, createGroup(values).get());
   }
 
-  private void writeParquetRecord(String schemaStr, ArrayWritable record) {
-    MessageType schema = MessageTypeParser.parseMessageType(schemaStr);
-    DataWritableWriter hiveParquetWriter = new DataWritableWriter(mockRecordConsumer, schema);
+  private List<String> createHiveColumnsFrom(final String columnNamesStr) {
+    List<String> columnNames;
+    if (columnNamesStr.length() == 0) {
+      columnNames = new ArrayList<String>();
+    } else {
+      columnNames = Arrays.asList(columnNamesStr.split(","));
+    }
+
+    return columnNames;
+  }
+
+  private List<TypeInfo> createHiveTypeInfoFrom(final String columnsTypeStr) {
+    List<TypeInfo> columnTypes;
+
+    if (columnsTypeStr.length() == 0) {
+      columnTypes = new ArrayList<TypeInfo>();
+    } else {
+      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnsTypeStr);
+    }
+
+    return columnTypes;
+  }
+
+  private ArrayWritableObjectInspector getObjectInspector(final String columnNames, final String columnTypes) {
+    List<TypeInfo> columnTypeList = createHiveTypeInfoFrom(columnTypes);
+    List<String> columnNameList = createHiveColumnsFrom(columnNames);
+    StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
+
+    return new ArrayWritableObjectInspector(rowTypeInfo);
+  }
+
+  private ParquetHiveRecord getParquetWritable(String columnNames, String columnTypes, ArrayWritable record) throws SerDeException {
+    Properties recordProperties = new Properties();
+    recordProperties.setProperty("columns", columnNames);
+    recordProperties.setProperty("columns.types", columnTypes);
+
+    ParquetHiveSerDe serDe = new ParquetHiveSerDe();
+    SerDeUtils.initializeSerDe(serDe, new Configuration(), recordProperties, null);
+
+    return new ParquetHiveRecord(serDe.deserialize(record), getObjectInspector(columnNames, columnTypes));
+  }
+
+  private void writeParquetRecord(String schema, ParquetHiveRecord record) throws SerDeException {
+    MessageType fileSchema = MessageTypeParser.parseMessageType(schema);
+    DataWritableWriter hiveParquetWriter = new DataWritableWriter(mockRecordConsumer, fileSchema);
     hiveParquetWriter.write(record);
   }
 
   @Test
   public void testSimpleType() throws Exception {
-    String schemaStr = "message hive_schema {\n"
+    String columnNames = "int,double,boolean,float,string,tinyint,smallint,bigint";
+    String columnTypes = "int,double,boolean,float,string,tinyint,smallint,bigint";
+
+    String fileSchema = "message hive_schema {\n"
         + "  optional int32 int;\n"
         + "  optional double double;\n"
         + "  optional boolean boolean;\n"
         + "  optional float float;\n"
-        + "  optional binary string;\n"
+        + "  optional binary string (UTF8);\n"
+        + "  optional int32 tinyint;\n"
+        + "  optional int32 smallint;\n"
+        + "  optional int64 bigint;\n"
         + "}\n";
 
     ArrayWritable hiveRecord = createGroup(
@@ -137,11 +217,14 @@ public class TestDataWritableWriter {
         createDouble(1.0),
         createBoolean(true),
         createFloat(1.0f),
-        createString("one")
+        createString("one"),
+        createTinyInt((byte)1),
+        createSmallInt((short)1),
+        createBigInt((long)1)
     );
 
     // Write record to Parquet format
-    writeParquetRecord(schemaStr, hiveRecord);
+    writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord));
 
     // Verify record was written correctly to Parquet
     startMessage();
@@ -160,12 +243,24 @@ public class TestDataWritableWriter {
       startField("string", 4);
         addString("one");
       endField("string", 4);
+      startField("tinyint", 5);
+        addInteger(1);
+      endField("tinyint", 5);
+      startField("smallint", 6);
+        addInteger(1);
+      endField("smallint", 6);
+      startField("bigint", 7);
+        addLong(1);
+      endField("bigint", 7);
     endMessage();
   }
 
   @Test
   public void testStructType() throws Exception {
-    String schemaStr = "message hive_schema {\n"
+    String columnNames = "structCol";
+    String columnTypes = "struct<a:int,b:double,c:boolean>";
+
+    String fileSchema = "message hive_schema {\n"
         + "  optional group structCol {\n"
         + "    optional int32 a;\n"
         + "    optional double b;\n"
@@ -182,7 +277,7 @@ public class TestDataWritableWriter {
     );
 
     // Write record to Parquet format
-    writeParquetRecord(schemaStr, hiveRecord);
+    writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord));
 
     // Verify record was written correctly to Parquet
     startMessage();
@@ -204,9 +299,12 @@ public class TestDataWritableWriter {
 
   @Test
   public void testArrayType() throws Exception {
-    String schemaStr = "message hive_schema {\n"
+    String columnNames = "arrayCol";
+    String columnTypes = "array<int>";
+
+    String fileSchema = "message hive_schema {\n"
         + "  optional group arrayCol (LIST) {\n"
-        + "    repeated group bag {\n"
+        + "    repeated group array {\n"
         + "      optional int32 array_element;\n"
         + "    }\n"
         + "  }\n"
@@ -223,13 +321,13 @@ public class TestDataWritableWriter {
     );
 
     // Write record to Parquet format
-    writeParquetRecord(schemaStr, hiveRecord);
+    writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord));
 
     // Verify record was written correctly to Parquet
     startMessage();
       startField("arrayCol", 0);
         startGroup();
-          startField("bag", 0);
+          startField("array", 0);
             startGroup();
               startField("array_element", 0);
                 addInteger(1);
@@ -242,7 +340,7 @@ public class TestDataWritableWriter {
               addInteger(2);
             endField("array_element", 0);
             endGroup();
-          endField("bag", 0);
+          endField("array", 0);
         endGroup();
       endField("arrayCol", 0);
     endMessage();
@@ -250,7 +348,10 @@ public class TestDataWritableWriter {
 
   @Test
   public void testMapType() throws Exception {
-    String schemaStr = "message hive_schema {\n"
+    String columnNames = "mapCol";
+    String columnTypes = "map<string,int>";
+
+    String fileSchema = "message hive_schema {\n"
         + "  optional group mapCol (MAP) {\n"
         + "    repeated group map (MAP_KEY_VALUE) {\n"
         + "      required binary key;\n"
@@ -279,7 +380,7 @@ public class TestDataWritableWriter {
     );
 
     // Write record to Parquet format
-    writeParquetRecord(schemaStr, hiveRecord);
+    writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord));
 
     // Verify record was written correctly to Parquet
     startMessage();
@@ -315,12 +416,15 @@ public class TestDataWritableWriter {
 
   @Test
   public void testArrayOfArrays() throws Exception {
-    String schemaStr = "message hive_schema {\n"
+    String columnNames = "array_of_arrays";
+    String columnTypes = "array<array<int>>";
+
+    String fileSchema = "message hive_schema {\n"
         + "  optional group array_of_arrays (LIST) {\n"
         + "    repeated group array {\n"
-        + "      required group element (LIST) {\n"
+        + "      optional group array_element (LIST) {\n"
         + "        repeated group array {\n"
-        + "          required int32 element;\n"
+        + "          optional int32 array_element;\n"
         + "        }\n"
         + "      }\n"
         + "    }\n"
@@ -341,7 +445,7 @@ public class TestDataWritableWriter {
     );
 
     // Write record to Parquet format
-    writeParquetRecord(schemaStr, hiveRecord);
+    writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord));
 
     // Verify record was written correctly to Parquet
     startMessage();
@@ -349,22 +453,22 @@ public class TestDataWritableWriter {
         startGroup();
           startField("array", 0);
             startGroup();
-              startField("element", 0);
+              startField("array_element", 0);
                 startGroup();
                   startField("array", 0);
                     startGroup();
-                      startField("element", 0);
+                      startField("array_element", 0);
                         addInteger(1);
-                      endField("element", 0);
+                      endField("array_element", 0);
                     endGroup();
                     startGroup();
-                      startField("element", 0);
+                      startField("array_element", 0);
                         addInteger(2);
-                      endField("element", 0);
+                      endField("array_element", 0);
                     endGroup();
                   endField("array", 0);
                 endGroup();
-              endField("element", 0);
+              endField("array_element", 0);
             endGroup();
           endField("array", 0);
         endGroup();
@@ -373,124 +477,63 @@ public class TestDataWritableWriter {
   }
 
   @Test
-  public void testGroupFieldIsNotArrayWritable() throws Exception {
-    String schemaStr = "message hive_schema {\n"
-        + "  optional group a {\n"
-        + "    optional int32 b;\n"
-        + "  }\n"
-        + "}\n";
+  public void testExpectedStructTypeOnRecord() throws Exception {
+    String columnNames = "structCol";
+    String columnTypes = "int";
 
     ArrayWritable hiveRecord = createGroup(
-          createInt(1)
+        createInt(1)
     );
 
+    String fileSchema = "message hive_schema {\n"
+        + "  optional group structCol {\n"
+      + "      optional int32 int;\n"
+      + "    }\n"
+        + "}\n";
+
     try {
-      // Write record to Parquet format
-      writeParquetRecord(schemaStr, hiveRecord);
+      writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord));
       fail();
     } catch (RuntimeException e) {
-      assertEquals("Parquet record is malformed: Field value is not an ArrayWritable object: " +
-          "optional group a {\n  optional int32 b;\n}", e.getMessage());
+      assertEquals("Parquet record is malformed: Invalid data type: expected STRUCT type, but found: PRIMITIVE", e.getMessage());
     }
   }
 
   @Test
-  public void testArrayGroupElementIsNotArrayWritable() throws Exception {
-    String schemaStr = "message hive_schema {\n"
-        + "  optional group array_of_arrays (LIST) {\n"
-        + "    repeated group array {\n"
-        + "      required group element (LIST) {\n"
-        + "        required int32 element;\n"
-        + "      }\n"
-        + "    }\n"
-        + "  }\n"
-        + "}\n";
+  public void testExpectedArrayTypeOnRecord() throws Exception {
+    String columnNames = "arrayCol";
+    String columnTypes = "int";
 
     ArrayWritable hiveRecord = createGroup(
-        createGroup(
-            createArray(
-                createInt(1)
-            )
-        )
+        createInt(1)
     );
 
-    try {
-      // Write record to Parquet format
-      writeParquetRecord(schemaStr, hiveRecord);
-      fail();
-    } catch (RuntimeException e) {
-      assertEquals("Parquet record is malformed: Field value is not an ArrayWritable object: " +
-          "required group element (LIST) {\n  required int32 element;\n}", e.getMessage());
-    }
-  }
-
-  @Test
-  public void testMapElementIsNotArrayWritable() throws Exception {
-    String schemaStr = "message hive_schema {\n"
-        + "  optional group mapCol (MAP) {\n"
-        + "    repeated group map (MAP_KEY_VALUE) {\n"
-        + "      required binary key;\n"
-        + "      optional group value {\n"
-        + "        required int32 value;"
-        + "      }\n"
+    String fileSchema = "message hive_schema {\n"
+        + "  optional group arrayCol (LIST) {\n"
+        + "    repeated group bag {\n"
+        + "      optional int32 array_element;\n"
         + "    }\n"
         + "  }\n"
         + "}\n";
 
-    ArrayWritable hiveRecord = createGroup(
-        createGroup(
-            createArray(
-                createGroup(
-                    createString("key1"),
-                    createInt(1)
-                )
-            )
-        )
-    );
-
     try {
-      // Write record to Parquet format
-      writeParquetRecord(schemaStr, hiveRecord);
+      writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord));
       fail();
     } catch (RuntimeException e) {
-      assertEquals(
-          "Parquet record is malformed: Field value is not an ArrayWritable object: " +
-              "optional group value {\n  required int32 value;\n}", e.getMessage());
+      assertEquals("Parquet record is malformed: Invalid data type: expected LIST type, but found: PRIMITIVE", e.getMessage());
     }
   }
 
   @Test
-  public void testMapKeyValueIsNotArrayWritable() throws Exception {
-    String schemaStr = "message hive_schema {\n"
-        + "  optional group mapCol (MAP) {\n"
-        + "    repeated group map (MAP_KEY_VALUE) {\n"
-        + "      required binary key;\n"
-        + "      optional int32 value;\n"
-        + "    }\n"
-        + "  }\n"
-        + "}\n";
+  public void testExpectedMapTypeOnRecord() throws Exception {
+    String columnNames = "mapCol";
+    String columnTypes = "int";
 
     ArrayWritable hiveRecord = createGroup(
-        createGroup(
-            createArray(
-                createString("key1"),
-                createInt(1)
-            )
-        )
+        createInt(1)
     );
 
-    try {
-      // Write record to Parquet format
-      writeParquetRecord(schemaStr, hiveRecord);
-      fail();
-    } catch (RuntimeException e) {
-      assertEquals("Parquet record is malformed: Map key-value pair is not an ArrayWritable object on record 0", e.getMessage());
-    }
-  }
-
-  @Test
-  public void testMapKeyValueIsNull() throws Exception {
-    String schemaStr = "message hive_schema {\n"
+    String fileSchema = "message hive_schema {\n"
         + "  optional group mapCol (MAP) {\n"
         + "    repeated group map (MAP_KEY_VALUE) {\n"
         + "      required binary key;\n"
@@ -499,20 +542,11 @@ public class TestDataWritableWriter {
         + "  }\n"
         + "}\n";
 
-    ArrayWritable hiveRecord = createGroup(
-        createGroup(
-            createArray(
-                createNull()
-            )
-        )
-    );
-
     try {
-      // Write record to Parquet format
-      writeParquetRecord(schemaStr, hiveRecord);
+      writeParquetRecord(fileSchema, getParquetWritable(columnNames, columnTypes, hiveRecord));
       fail();
     } catch (RuntimeException e) {
-      assertEquals("Parquet record is malformed: Map key-value pair is null on record 0", e.getMessage());
+      assertEquals("Parquet record is malformed: Invalid data type: expected MAP type, but found: PRIMITIVE", e.getMessage());
     }
   }
 }
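
The rewritten assertions above exercise the error text raised when the declared Hive column type does not match the shape of the value being written. The real check lives in DataWritableWriter itself, outside the hunks shown here; the following is only a rough, hypothetical sketch (class and method names invented for illustration) of how comparing an ObjectInspector category against the expected kind can produce messages such as "Invalid data type: expected STRUCT type, but found: PRIMITIVE".

// Hypothetical sketch only -- not the committed DataWritableWriter code.
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;

public final class CategoryCheckSketch {
  // Fails fast when the inspector's category (STRUCT, LIST, MAP, PRIMITIVE, ...)
  // does not match what the Parquet schema element calls for.
  static void expectCategory(ObjectInspector inspector, Category expected) {
    if (inspector.getCategory() != expected) {
      throw new IllegalArgumentException("Invalid data type: expected " + expected
          + " type, but found: " + inspector.getCategory());
    }
  }
}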

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java?rev=1659147&r1=1659146&r2=1659147&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java Thu Feb 12 04:53:51 2015
@@ -24,7 +24,7 @@ import java.util.Properties;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
 import org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper;
-import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.Progressable;
 import org.junit.Test;
@@ -41,7 +41,7 @@ public class TestMapredParquetOutputForm
   @SuppressWarnings("unchecked")
   @Test
   public void testConstructorWithFormat() {
-    new MapredParquetOutputFormat((ParquetOutputFormat<ArrayWritable>) mock(ParquetOutputFormat.class));
+    new MapredParquetOutputFormat((ParquetOutputFormat<ParquetHiveRecord>) mock(ParquetOutputFormat.class));
   }
 
   @Test
@@ -62,7 +62,7 @@ public class TestMapredParquetOutputForm
     tableProps.setProperty("columns.types", "int:int");
 
     final Progressable mockProgress = mock(Progressable.class);
-    final ParquetOutputFormat<ArrayWritable> outputFormat = (ParquetOutputFormat<ArrayWritable>) mock(ParquetOutputFormat.class);
+    final ParquetOutputFormat<ParquetHiveRecord> outputFormat = (ParquetOutputFormat<ParquetHiveRecord>) mock(ParquetOutputFormat.class);
 
     JobConf jobConf = new JobConf();
 
@@ -70,7 +70,7 @@ public class TestMapredParquetOutputForm
       new MapredParquetOutputFormat(outputFormat) {
         @Override
         protected ParquetRecordWriterWrapper getParquerRecordWriterWrapper(
-            ParquetOutputFormat<ArrayWritable> realOutputFormat,
+            ParquetOutputFormat<ParquetHiveRecord> realOutputFormat,
             JobConf jobConf,
             String finalOutPath,
             Progressable progress,

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetSerDe.java?rev=1659147&r1=1659146&r2=1659147&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetSerDe.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetSerDe.java Thu Feb 12 04:53:51 2015
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.serde2.Ser
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.ArrayWritable;
@@ -96,9 +97,9 @@ public class TestParquetSerDe extends Te
     assertEquals("deserialization gives the wrong object", t, row);
 
     // Serialize
-    final ArrayWritable serializedArr = (ArrayWritable) serDe.serialize(row, oi);
-    assertEquals("size correct after serialization", serDe.getSerDeStats().getRawDataSize(), serializedArr.get().length);
-    assertTrue("serialized object should be equal to starting object", arrayWritableEquals(t, serializedArr));
+    final ParquetHiveRecord serializedArr = (ParquetHiveRecord) serDe.serialize(row, oi);
+    assertEquals("size correct after serialization", serDe.getSerDeStats().getRawDataSize(), ((ArrayWritable)serializedArr.getObject()).get().length);
+    assertTrue("serialized object should be equal to starting object", arrayWritableEquals(t, (ArrayWritable)serializedArr.getObject()));
   }
 
   private Properties createProperties() {

Added: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/ParquetHiveRecord.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/ParquetHiveRecord.java?rev=1659147&view=auto
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/ParquetHiveRecord.java (added)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/ParquetHiveRecord.java Thu Feb 12 04:53:51 2015
@@ -0,0 +1,57 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.io;
+
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * This class wraps the object and object inspector that will be used later
+ * in DataWritableWriter class to get the object values.
+ */
+public class ParquetHiveRecord implements Writable {
+  public Object value;
+  public StructObjectInspector inspector;
+
+  public ParquetHiveRecord() {
+    this(null, null);
+  }
+
+  public ParquetHiveRecord(final Object o, final StructObjectInspector oi) {
+    value = o;
+    inspector = oi;
+  }
+
+  public StructObjectInspector getObjectInspector() {
+    return inspector;
+  }
+
+  public Object getObject() {
+    return value;
+  }
+
+  @Override
+  public void write(DataOutput dataOutput) throws IOException {
+    throw new UnsupportedOperationException("Unsupported method call.");
+  }
+
+  @Override
+  public void readFields(DataInput dataInput) throws IOException {
+    throw new UnsupportedOperationException("Unsupported method call.");
+  }
+}
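
A minimal usage sketch of the class added above, assuming a hypothetical two-column row; the inspector, column names, and values below are illustrative only and not part of this revision. It shows what the writer now receives: the raw row object plus the StructObjectInspector needed to read its fields, rather than a pre-built ArrayWritable.

// Minimal sketch (not part of this revision) of wrapping a Hive row in a ParquetHiveRecord.
import java.util.Arrays;

import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public final class ParquetHiveRecordSketch {
  public static void main(String[] args) {
    // Standard struct inspector for a row shaped like (id INT, name STRING).
    StructObjectInspector inspector = ObjectInspectorFactory.getStandardStructObjectInspector(
        Arrays.asList("id", "name"),
        Arrays.<ObjectInspector>asList(
            PrimitiveObjectInspectorFactory.javaIntObjectInspector,
            PrimitiveObjectInspectorFactory.javaStringObjectInspector));

    // The standard struct representation is a List of field values.
    Object row = Arrays.<Object>asList(1, "hive");

    // Field values are read through the inspector at write time.
    ParquetHiveRecord record = new ParquetHiveRecord(row, inspector);
    System.out.println(record.getObjectInspector().getAllStructFieldRefs().size()); // prints 2
  }
}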


