parquet-commits mailing list archives

From jul...@apache.org
Subject git commit: PARQUET-2: Adding Type Persuasion for Primitive Types
Date Tue, 24 Jun 2014 17:19:48 GMT
Repository: incubator-parquet-mr
Updated Branches:
  refs/heads/master 859b6b4b9 -> 9ad5485c3


PARQUET-2: Adding Type Persuasion for Primitive Types

Original from the old repo: https://github.com/Parquet/parquet-mr/pull/410
JIRA: https://issues.apache.org/jira/browse/PARQUET-2

These changes allow primitive types to be requested as types different from those stored in
the file, using a flag to turn off strict type checking (default is on). Values are cast to
the requested type where possible and may lose precision where necessary (e.g. requesting a
double as an int).

No performance penalty is imposed when the requested type matches the type stored in the
file. A flag ("parquet.strict.typing", strict by default) controls type checking for
conflicting schemas.

A 6x6 test case is provided to test conversion between the primitive types.
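
For illustration, a minimal sketch of turning the flag off from Pig, mirroring the new
TestParquetLoader test in this commit (STRICT_TYPE_CHECKING is the "parquet.strict.typing"
key added to ParquetInputFormat below):

    // Sketch: disable strict type checking so requested primitive types may
    // differ from the types stored in the file.
    Properties props = new Properties();
    props.setProperty(ParquetInputFormat.STRICT_TYPE_CHECKING, Boolean.FALSE.toString());
    PigServer pigServer = new PigServer(ExecType.LOCAL, props);
    // A column written as double can now be requested as int through the
    // ParquetLoader schema string (e.g. "d:int"), at the cost of precision.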

Author: Daniel Weeks <dweeks@netflix.com>

Closes #3 from dcw-netflix/type-persuasion and squashes the following commits:

97f4e9a [Daniel Weeks] Added documentation as suggested by code review
1c3c0c7 [Daniel Weeks] Fixed test with strict checking off
f3cb495 [Daniel Weeks] Added type persuasion for primitive types with a flag to control strict
type checking for conflicting schemas, which is strict by default.


Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/9ad5485c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/9ad5485c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/9ad5485c

Branch: refs/heads/master
Commit: 9ad5485c3310a8c51510ea50e24834b6cf98c45c
Parents: 859b6b4
Author: Daniel Weeks <dweeks@netflix.com>
Authored: Tue Jun 24 10:19:27 2014 -0700
Committer: julien <julien@twitter.com>
Committed: Tue Jun 24 10:19:27 2014 -0700

----------------------------------------------------------------------
 .../main/java/parquet/io/ColumnIOFactory.java   |  21 ++-
 .../src/main/java/parquet/schema/GroupType.java |  17 ++-
 .../main/java/parquet/schema/MessageType.java   |   6 +-
 .../main/java/parquet/schema/PrimitiveType.java |   9 +-
 .../src/main/java/parquet/schema/Type.java      |   7 +
 .../hadoop/InternalParquetRecordReader.java     |   6 +-
 .../java/parquet/hadoop/ParquetFileWriter.java  |  32 +++-
 .../java/parquet/hadoop/ParquetInputFormat.java |   7 +-
 .../parquet/pig/convert/TupleConverter.java     | 153 ++++++++++++++++++-
 .../java/parquet/pig/TestParquetLoader.java     |  70 ++++++++-
 10 files changed, 305 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/9ad5485c/parquet-column/src/main/java/parquet/io/ColumnIOFactory.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/io/ColumnIOFactory.java b/parquet-column/src/main/java/parquet/io/ColumnIOFactory.java
index 7244b77..0252735 100644
--- a/parquet-column/src/main/java/parquet/io/ColumnIOFactory.java
+++ b/parquet-column/src/main/java/parquet/io/ColumnIOFactory.java
@@ -43,10 +43,16 @@ public class ColumnIOFactory {
     private final MessageType requestedSchema;
     private int currentRequestedIndex;
     private Type currentRequestedType;
+    private boolean strictTypeChecking;
 
     public ColumnIOCreatorVisitor(boolean validating, MessageType requestedSchema) {
+      this(validating, requestedSchema, true);
+    }
+    
+    public ColumnIOCreatorVisitor(boolean validating, MessageType requestedSchema, boolean strictTypeChecking) {
       this.validating = validating;
       this.requestedSchema = requestedSchema;
+      this.strictTypeChecking = strictTypeChecking;
     }
 
     @Override
@@ -86,7 +92,8 @@ public class ColumnIOFactory {
 
     @Override
     public void visit(PrimitiveType primitiveType) {
-      if (!currentRequestedType.isPrimitive() || currentRequestedType.asPrimitiveType().getPrimitiveTypeName() != primitiveType.getPrimitiveTypeName()) {
+      if (!currentRequestedType.isPrimitive() || 
+              (this.strictTypeChecking && currentRequestedType.asPrimitiveType().getPrimitiveTypeName() != primitiveType.getPrimitiveTypeName())) {
         incompatibleSchema(primitiveType, currentRequestedType);
       }
       PrimitiveColumnIO newIO = new PrimitiveColumnIO(primitiveType, current, currentRequestedIndex, leaves.size());
@@ -127,7 +134,17 @@ public class ColumnIOFactory {
    * @return the corresponding serializing/deserializing structure
    */
   public MessageColumnIO getColumnIO(MessageType requestedSchema, MessageType fileSchema) {
-    ColumnIOCreatorVisitor visitor = new ColumnIOCreatorVisitor(validating, requestedSchema);
+    return getColumnIO(requestedSchema, fileSchema, true);
+  }
+  
+  /**
+   * @param requestedSchema the requested schema we want to read/write
+   * @param fileSchema the file schema (when reading it can be different from the requested schema)
+   * @param strict should file type and requested primitive types match
+   * @return the corresponding serializing/deserializing structure
+   */
+  public MessageColumnIO getColumnIO(MessageType requestedSchema, MessageType fileSchema, boolean strict) {
+    ColumnIOCreatorVisitor visitor = new ColumnIOCreatorVisitor(validating, requestedSchema, strict);
     fileSchema.accept(visitor);
     return visitor.getColumnIO();
   }
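
For reference, a minimal sketch of calling the new overload directly (this assumes the
no-argument ColumnIOFactory constructor; only the three-argument getColumnIO is part of
this diff):

    // Sketch: build the column IO with strict type checking disabled, so the
    // requested primitive types may differ from the file's primitive types.
    MessageColumnIO columnIO =
        new ColumnIOFactory().getColumnIO(requestedSchema, fileSchema, false);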

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/9ad5485c/parquet-column/src/main/java/parquet/schema/GroupType.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/schema/GroupType.java b/parquet-column/src/main/java/parquet/schema/GroupType.java
index c4e5ff6..fcc801b 100644
--- a/parquet-column/src/main/java/parquet/schema/GroupType.java
+++ b/parquet-column/src/main/java/parquet/schema/GroupType.java
@@ -293,6 +293,11 @@ public class GroupType extends Type {
 
   @Override
   protected Type union(Type toMerge) {
+    return union(toMerge, true);
+  }
+
+  @Override
+  protected Type union(Type toMerge, boolean strict) {
     if (toMerge.isPrimitive()) {
       throw new IncompatibleSchemaModificationException("can not merge primitive type " +
toMerge + " into group type " + this);
     }
@@ -305,6 +310,16 @@ public class GroupType extends Type {
    * @return the merged list
    */
   List<Type> mergeFields(GroupType toMerge) {
+    return mergeFields(toMerge, true);
+  }
+  
+  /**
+   * produces the list of fields resulting from merging toMerge into the fields of this
+   * @param toMerge the group containing the fields to merge
+   * @param strict should schema primitive types match
+   * @return the merged list
+   */
+  List<Type> mergeFields(GroupType toMerge, boolean strict) {
     List<Type> newFields = new ArrayList<Type>();
     // merge existing fields
     for (Type type : this.getFields()) {
@@ -314,7 +329,7 @@ public class GroupType extends Type {
         if (fieldToMerge.getRepetition().isMoreRestrictiveThan(type.getRepetition())) {
           throw new IncompatibleSchemaModificationException("repetition constraint is more
restrictive: can not merge type " + fieldToMerge + " into " + type);
         }
-        merged = type.union(fieldToMerge);
+        merged = type.union(fieldToMerge, strict);
       } else {
         merged = type;
       }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/9ad5485c/parquet-column/src/main/java/parquet/schema/MessageType.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/schema/MessageType.java b/parquet-column/src/main/java/parquet/schema/MessageType.java
index f71e3ce..9c05747 100644
--- a/parquet-column/src/main/java/parquet/schema/MessageType.java
+++ b/parquet-column/src/main/java/parquet/schema/MessageType.java
@@ -137,7 +137,11 @@ public final class MessageType extends GroupType {
   }
 
   public MessageType union(MessageType toMerge) {
-    return new MessageType(this.getName(), mergeFields(toMerge));
+    return union(toMerge, true);
+  }
+  
+  public MessageType union(MessageType toMerge, boolean strict) {
+    return new MessageType(this.getName(), mergeFields(toMerge, strict));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/9ad5485c/parquet-column/src/main/java/parquet/schema/PrimitiveType.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/schema/PrimitiveType.java b/parquet-column/src/main/java/parquet/schema/PrimitiveType.java
index 5e1d9b3..3a734a9 100644
--- a/parquet-column/src/main/java/parquet/schema/PrimitiveType.java
+++ b/parquet-column/src/main/java/parquet/schema/PrimitiveType.java
@@ -276,7 +276,7 @@ public final class PrimitiveType extends Type {
   private final PrimitiveTypeName primitive;
   private final int length;
   private final DecimalMetadata decimalMeta;
-
+  
   /**
    * @param repetition OPTIONAL, REPEATED, REQUIRED
    * @param primitive STRING, INT64, ...
@@ -486,7 +486,12 @@ public final class PrimitiveType extends Type {
 
   @Override
   protected Type union(Type toMerge) {
-    if (!toMerge.isPrimitive() || !primitive.equals(toMerge.asPrimitiveType().getPrimitiveTypeName())) {
+    return union(toMerge, true);
+  }
+
+  @Override
+  protected Type union(Type toMerge, boolean strict) {
+    if (!toMerge.isPrimitive() || (strict && !primitive.equals(toMerge.asPrimitiveType().getPrimitiveTypeName()))) {
       throw new IncompatibleSchemaModificationException("can not merge type " + toMerge +
" into " + this);
     }
     Types.PrimitiveBuilder<PrimitiveType> builder = Types.primitive(

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/9ad5485c/parquet-column/src/main/java/parquet/schema/Type.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/schema/Type.java b/parquet-column/src/main/java/parquet/schema/Type.java
index a3fe23c..a785394 100644
--- a/parquet-column/src/main/java/parquet/schema/Type.java
+++ b/parquet-column/src/main/java/parquet/schema/Type.java
@@ -195,6 +195,13 @@ abstract public class Type {
    * @return the union result of merging toMerge into this
    */
   protected abstract Type union(Type toMerge);
+  
+  /**
+   * @param toMerge the type to merge into this one
+   * @param strict should schema primitive types match
+   * @return the union result of merging toMerge into this
+   */
+  protected abstract Type union(Type toMerge, boolean strict);
 
   /**
    * {@inheritDoc}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/9ad5485c/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
index f1c952c..8d99a29 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
@@ -37,6 +37,7 @@ import parquet.schema.Type;
 
 import static java.lang.String.format;
 import static parquet.Log.DEBUG;
+import static parquet.hadoop.ParquetInputFormat.STRICT_TYPE_CHECKING;
 
 class InternalParquetRecordReader<T> {
   private static final Log LOG = Log.getLog(InternalParquetRecordReader.class);
@@ -57,6 +58,7 @@ class InternalParquetRecordReader<T> {
   private ParquetFileReader reader;
   private parquet.io.RecordReader<T> recordReader;
   private UnboundRecordFilter recordFilter;
+  private boolean strictTypeChecking;
 
   private long totalTimeSpentReadingBytes;
   private long totalTimeSpentProcessingRecords;
@@ -106,7 +108,7 @@ class InternalParquetRecordReader<T> {
       BenchmarkCounter.incrementTime(timeSpentReading);
       LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
       if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
-      MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema);
+      MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, strictTypeChecking);
       recordReader = columnIO.getRecordReader(pages, recordConverter, recordFilter);
       startedAssemblingCurrentBlockAt = System.currentTimeMillis();
       totalCountLoadedSoFar += pages.getRowCount();
@@ -142,7 +144,7 @@ class InternalParquetRecordReader<T> {
     this.recordConverter = readSupport.prepareForRead(
         configuration, extraMetadata, fileSchema,
         new ReadSupport.ReadContext(requestedSchema, readSupportMetadata));
-
+    this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
     List<ColumnDescriptor> columns = requestedSchema.getColumns();
     reader = new ParquetFileReader(configuration, file, blocks, columns);
     for (BlockMetaData block : blocks) {

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/9ad5485c/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
index f5fe167..ff29179 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
@@ -83,7 +83,7 @@ public class ParquetFileWriter {
   private long currentChunkFirstDataPage;
   private long currentChunkDictionaryPageOffset;
   private long currentChunkValueCount;
-
+  
   private Statistics currentStatistics;
 
   /**
@@ -439,11 +439,16 @@ public class ParquetFileWriter {
   * @param footers the list of file footers to merge
    * @return the global meta data for all the footers
    */
+  
   static GlobalMetaData getGlobalMetaData(List<Footer> footers) {
+    return getGlobalMetaData(footers, true);
+  }
+  
+  static GlobalMetaData getGlobalMetaData(List<Footer> footers, boolean strict) {
     GlobalMetaData fileMetaData = null;
     for (Footer footer : footers) {
       ParquetMetadata currentMetadata = footer.getParquetMetadata();
-      fileMetaData = mergeInto(currentMetadata.getFileMetaData(), fileMetaData);
+      fileMetaData = mergeInto(currentMetadata.getFileMetaData(), fileMetaData, strict);
     }
     return fileMetaData;
   }
@@ -457,6 +462,13 @@ public class ParquetFileWriter {
   static GlobalMetaData mergeInto(
       FileMetaData toMerge,
       GlobalMetaData mergedMetadata) {
+    return mergeInto(toMerge, mergedMetadata, true);
+  }
+  
+  static GlobalMetaData mergeInto(
+      FileMetaData toMerge,
+      GlobalMetaData mergedMetadata,
+      boolean strict) {
     MessageType schema = null;
     Map<String, Set<String>> newKeyValues = new HashMap<String, Set<String>>();
     Set<String> createdBy = new HashSet<String>();
@@ -467,7 +479,7 @@ public class ParquetFileWriter {
     }
     if ((schema == null && toMerge.getSchema() != null)
         || (schema != null && !schema.equals(toMerge.getSchema()))) {
-      schema = mergeInto(toMerge.getSchema(), schema);
+      schema = mergeInto(toMerge.getSchema(), schema, strict);
     }
     for (Entry<String, String> entry : toMerge.getKeyValueMetaData().entrySet()) {
       Set<String> values = newKeyValues.get(entry.getKey());
@@ -491,10 +503,22 @@ public class ParquetFileWriter {
    * @return the resulting schema
    */
   static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema) {
+    return mergeInto(toMerge, mergedSchema, true);
+  }
+  
+  /**
+   * will return the result of merging toMerge into mergedSchema
+   * @param toMerge the schema to merge into mergedSchema
+   * @param mergedSchema the schema to append the fields to
+   * @param strict should schema primitive types match
+   * @return the resulting schema
+   */
+  static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema, boolean strict) {
     if (mergedSchema == null) {
       return toMerge;
     }
-    return mergedSchema.union(toMerge);
+    
+    return mergedSchema.union(toMerge, strict);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/9ad5485c/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
index 3abb38b..ddf67a7 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
@@ -77,6 +77,11 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
    * key to configure the filter
    */
   public static final String UNBOUND_RECORD_FILTER = "parquet.read.filter";
+  
+  /**
+   * key to configure type checking for conflicting schemas (default: true)
+   */
+  public static final String STRICT_TYPE_CHECKING = "parquet.strict.typing";
 
   private Class<?> readSupportClass;
   private List<Footer> footers;
@@ -358,7 +363,7 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
       throw new ParquetDecodingException("maxSplitSize or minSplitSize should not be negative: maxSplitSize = " + maxSplitSize + "; minSplitSize = " + minSplitSize);
     }
     List<ParquetInputSplit> splits = new ArrayList<ParquetInputSplit>();
-    GlobalMetaData globalMetaData = ParquetFileWriter.getGlobalMetaData(footers);
+    GlobalMetaData globalMetaData = ParquetFileWriter.getGlobalMetaData(footers, configuration.getBoolean(STRICT_TYPE_CHECKING, true));
     ReadContext readContext = getReadSupport(configuration).init(new InitContext(
         configuration,
         globalMetaData.getKeyValueMetaData(),
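
For MapReduce users, a minimal sketch of setting the same key on the job configuration
(assuming a standard org.apache.hadoop.mapreduce.Job; setBoolean writes the key that
getSplits and the record reader check above):

    // Sketch: opt out of strict type checking for this job's Parquet reads.
    Configuration conf = job.getConfiguration();
    conf.setBoolean(ParquetInputFormat.STRICT_TYPE_CHECKING, false);  // default: true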

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/9ad5485c/parquet-pig/src/main/java/parquet/pig/convert/TupleConverter.java
----------------------------------------------------------------------
diff --git a/parquet-pig/src/main/java/parquet/pig/convert/TupleConverter.java b/parquet-pig/src/main/java/parquet/pig/convert/TupleConverter.java
index 9ab527d..eb51b5f 100644
--- a/parquet-pig/src/main/java/parquet/pig/convert/TupleConverter.java
+++ b/parquet-pig/src/main/java/parquet/pig/convert/TupleConverter.java
@@ -105,7 +105,9 @@ public class TupleConverter extends GroupConverter {
           }
         };
       case DataType.CHARARRAY:
-        return new FieldStringConverter(parent);
+        // If the original type isn't a string, we don't want to use the dictionary because
+        // a custom implementation would be needed for each type. Just default to no dictionary.
+        return new FieldStringConverter(parent, type.getOriginalType() == OriginalType.UTF8);
       case DataType.BYTEARRAY:
         return new FieldByteArrayConverter(parent);
       case DataType.INTEGER:
@@ -205,10 +207,12 @@ public class TupleConverter extends GroupConverter {
 
     private final ParentValueContainer parent;
 
+    private boolean dictionarySupport;
     private String[] dict;
 
-    public FieldStringConverter(ParentValueContainer parent) {
+    public FieldStringConverter(ParentValueContainer parent, boolean dictionarySupport) {
       this.parent = parent;
+      this.dictionarySupport = dictionarySupport;
     }
 
     @Override
@@ -218,7 +222,7 @@ public class TupleConverter extends GroupConverter {
 
     @Override
     public boolean hasDictionarySupport() {
-      return true;
+      return dictionarySupport;
     }
 
     @Override
@@ -233,6 +237,33 @@ public class TupleConverter extends GroupConverter {
     public void addValueFromDictionary(int dictionaryId) {
       parent.add(dict[dictionaryId]);
     }
+
+    @Override
+    public void addLong(long value) {
+      parent.add(Long.toString(value));
+    }
+
+    @Override
+    public void addInt(int value) {
+      parent.add(Integer.toString(value));
+    }
+
+    @Override
+    public void addFloat(float value) {
+      parent.add(Float.toString(value));
+    }
+
+    @Override
+    public void addDouble(double value) {
+      parent.add(Double.toString(value));
+    }
+
+    @Override
+    public void addBoolean(boolean value) {
+      parent.add(Boolean.toString(value));
+    }
+    
+    
   }
 
   /**
@@ -273,6 +304,31 @@ public class TupleConverter extends GroupConverter {
       parent.add(value);
     }
 
+    @Override
+    public void addLong(long value) {
+      parent.add((double)value);
+    }
+
+    @Override
+    public void addInt(int value) {
+      parent.add((double)value);
+    }
+
+    @Override
+    public void addFloat(float value) {
+      parent.add((double)value);
+    }
+
+    @Override
+    public void addBoolean(boolean value) {
+      parent.add(value ? 1.0d : 0.0d);
+    }
+
+    @Override
+    public void addBinary(Binary value) {
+      parent.add(Double.parseDouble(value.toStringUsingUTF8()));
+    }
+
   }
 
   /**
@@ -293,6 +349,31 @@ public class TupleConverter extends GroupConverter {
       parent.add(value);
     }
 
+    @Override
+    public void addLong(long value) {
+      parent.add((float)value);
+    }
+
+    @Override
+    public void addInt(int value) {
+      parent.add((float)value);
+    }
+
+    @Override
+    public void addDouble(double value) {
+      parent.add((float)value);
+    }
+
+    @Override
+    public void addBoolean(boolean value) {
+      parent.add(value ? 1.0f : 0.0f);
+    }
+
+    @Override
+    public void addBinary(Binary value) {
+      parent.add(Float.parseFloat(value.toStringUsingUTF8()));
+    }
+
   }
 
   /**
@@ -314,6 +395,31 @@ public class TupleConverter extends GroupConverter {
       parent.add(value);
     }
 
+    @Override
+    public void addInt(int value) {
+      parent.add((long)value); 
+    }
+
+    @Override
+    public void addFloat(float value) {
+      parent.add((long)value);
+    }
+
+    @Override
+    public void addDouble(double value) {
+      parent.add((long)value);
+    }
+
+    @Override
+    public void addBoolean(boolean value) {
+      parent.add(value ? 1L : 0L);
+    }
+
+    @Override
+    public void addBinary(Binary value) {
+      parent.add(Long.parseLong(value.toStringUsingUTF8()));
+    }
+    
   }
 
   /**
@@ -339,6 +445,26 @@ public class TupleConverter extends GroupConverter {
       parent.add(value);
     }
 
+    @Override
+    public void addLong(long value) {
+      parent.add((int)value);
+    }
+
+    @Override
+    public void addFloat(float value) {
+      parent.add((int)value);
+    }
+
+    @Override
+    public void addDouble(double value) {
+      parent.add((int)value);
+    }
+
+    @Override
+    public void addBinary(Binary value) {
+      parent.add(Integer.parseInt(value.toStringUsingUTF8()));
+    }
+
   }
 
   /**
@@ -364,6 +490,27 @@ public class TupleConverter extends GroupConverter {
       parent.add(value != 0);
     }
 
+    @Override
+    public void addLong(long value) {
+      parent.add(value!=0);
+    }
+
+    @Override
+    public void addFloat(float value) {
+      parent.add(value!=0);
+    }
+
+    @Override
+    public void addDouble(double value) {
+      parent.add(value!=0);
+    }
+
+    @Override
+    public void addBinary(Binary value) {
+      parent.add(Boolean.parseBoolean(value.toStringUsingUTF8()));
+    }
+
+    
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/9ad5485c/parquet-pig/src/test/java/parquet/pig/TestParquetLoader.java
----------------------------------------------------------------------
diff --git a/parquet-pig/src/test/java/parquet/pig/TestParquetLoader.java b/parquet-pig/src/test/java/parquet/pig/TestParquetLoader.java
index fa09949..e5c08af 100644
--- a/parquet-pig/src/test/java/parquet/pig/TestParquetLoader.java
+++ b/parquet-pig/src/test/java/parquet/pig/TestParquetLoader.java
@@ -15,28 +15,29 @@
  */
 package parquet.pig;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
-
+import java.util.Properties;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.ExecType;
 import org.apache.pig.LoadPushDown.RequiredField;
 import org.apache.pig.LoadPushDown.RequiredFieldList;
 import org.apache.pig.PigServer;
-import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.builtin.mock.Storage;
 import org.apache.pig.builtin.mock.Storage.Data;
 import org.apache.pig.data.DataType;
+import static org.apache.pig.data.DataType.*;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.junit.Assert;
-import org.junit.Ignore;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import org.junit.Test;
+import static parquet.hadoop.ParquetInputFormat.STRICT_TYPE_CHECKING;
 
 public class TestParquetLoader {
   @Test
@@ -195,6 +196,61 @@ public class TestParquetLoader {
   }  
   
   @Test
+  public void testTypePersuasion() throws Exception {
+    Properties p = new Properties();  
+    p.setProperty(STRICT_TYPE_CHECKING, Boolean.FALSE.toString());
+    
+    PigServer pigServer = new PigServer(ExecType.LOCAL, p); 
+    pigServer.setValidateEachStatement(true);
+    String out = "target/out";
+    int rows = 10;
+    Data data = Storage.resetData(pigServer);
+    List<Tuple> list = new ArrayList<Tuple>();
+    for (int i = 0; i < rows; i++) {
+      list.add(Storage.tuple(i, (long)i, (float)i, (double)i, Integer.toString(i), Boolean.TRUE));
+    }
+    data.set("in", "i:int, l:long, f:float, d:double, s:chararray, b:boolean", list );
+    pigServer.setBatchOn();
+    pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
+    pigServer.deleteFile(out);
+    pigServer.registerQuery("Store A into '"+out+"' using " + ParquetStorer.class.getName()+"();");
+    pigServer.executeBatch();
+      
+    List<Tuple> actualList = null;
+     
+    byte [] types = { INTEGER, LONG, FLOAT, DOUBLE, CHARARRAY, BOOLEAN };
+    
+    //Test extracting values using each type.
+    for(int i=0; i<types.length; i++) {
+      String query = "B = LOAD '" + out + "' using " + ParquetLoader.class.getName()+
+        "('i:" + DataType.findTypeName(types[i%types.length])+"," +
+        "  l:" + DataType.findTypeName(types[(i+1)%types.length]) +"," +
+        "  f:" + DataType.findTypeName(types[(i+2)%types.length]) +"," +
+        "  d:" + DataType.findTypeName(types[(i+3)%types.length]) +"," +
+        "  s:" + DataType.findTypeName(types[(i+4)%types.length]) +"," +
+        "  b:" + DataType.findTypeName(types[(i+5)%types.length]) +"');";
+      
+      System.out.println("Query: " + query);
+      pigServer.registerQuery(query);
+      pigServer.registerQuery("STORE B into 'out"+i+"' using mock.Storage();");
+      pigServer.executeBatch();
+
+      actualList = data.get("out" + i);
+
+      assertEquals(rows, actualList.size());
+      for(Tuple t : actualList) {
+          assertTrue(t.getType(0) == types[i%types.length]);
+          assertTrue(t.getType(1) == types[(i+1)%types.length]);
+          assertTrue(t.getType(2) == types[(i+2)%types.length]);
+          assertTrue(t.getType(3) == types[(i+3)%types.length]);
+          assertTrue(t.getType(4) == types[(i+4)%types.length]);
+          assertTrue(t.getType(5) == types[(i+5)%types.length]);
+      }
+    }
+    
+  }
+  
+  @Test
   public void testRead() {
     
   }

