parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject [parquet-mr] branch master updated: PARQUET-1407: Avro: Fix binary values returned from dictionary encoding (#552)
Date Mon, 19 Nov 2018 22:08:00 GMT
This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 542ab3e  PARQUET-1407: Avro: Fix binary values returned from dictionary encoding (#552)
542ab3e is described below

commit 542ab3e2b321d5f755f3e9c6b997a458f8cf0f5e
Author: nandorKollar <nandorKollar@users.noreply.github.com>
AuthorDate: Mon Nov 19 23:07:55 2018 +0100

    PARQUET-1407: Avro: Fix binary values returned from dictionary encoding (#552)
    
    * PARQUET-1407: Add test case for PARQUET-1407 to demonstrate the issue
    * PARQUET-1407: Fix binary values from dictionary encoding.
    
    Closes #551.
---
 .../org/apache/parquet/avro/AvroConverters.java    | 11 ++-
 .../org/apache/parquet/avro/TestReadWrite.java     | 84 +++++++++++++---------
 2 files changed, 61 insertions(+), 34 deletions(-)

diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java
index 817f074..cc49cc2 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java
@@ -76,9 +76,13 @@ public class AvroConverters {
       }
     }
 
+    public T prepareDictionaryValue(T value) {
+      return value;
+    }
+
     @Override
     public void addValueFromDictionary(int dictionaryId) {
-      parent.add(dict[dictionaryId]);
+      parent.add(prepareDictionaryValue(dict[dictionaryId]));
     }
   }
 
@@ -220,6 +224,11 @@ public class AvroConverters {
     public ByteBuffer convert(Binary binary) {
       return ByteBuffer.wrap(binary.getBytes());
     }
+
+    @Override
+    public ByteBuffer prepareDictionaryValue(ByteBuffer value) {
+      return value.duplicate();
+    }
   }
 
   static final class FieldStringConverter extends BinaryConverter<String> {
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
index 2335e36..69a73cb 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.io.Resources;
 import java.io.File;
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
@@ -37,6 +38,7 @@ import java.util.Random;
 import org.apache.avro.Conversions;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
@@ -87,10 +89,7 @@ public class TestReadWrite {
     Schema schema = new Schema.Parser().parse(
         Resources.getResource("array.avsc").openStream());
 
-    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
-    tmp.deleteOnExit();
-    tmp.delete();
-    Path file = new Path(tmp.getPath());
+    Path file = new Path(createTempFile().getPath());
 
     ParquetWriter<GenericRecord> writer = AvroParquetWriter
         .<GenericRecord>builder(file)
@@ -117,10 +116,7 @@ public class TestReadWrite {
     Schema schema = new Schema.Parser().parse(
         Resources.getResource("map.avsc").openStream());
 
-    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
-    tmp.deleteOnExit();
-    tmp.delete();
-    Path file = new Path(tmp.getPath());
+    Path file = new Path(createTempFile().getPath());
 
     ParquetWriter<GenericRecord> writer = AvroParquetWriter
         .<GenericRecord>builder(file)
@@ -147,10 +143,7 @@ public class TestReadWrite {
     Schema schema = new Schema.Parser().parse(
         Resources.getResource("map_with_nulls.avsc").openStream());
 
-    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
-    tmp.deleteOnExit();
-    tmp.delete();
-    Path file = new Path(tmp.getPath());
+    Path file = new Path(createTempFile().getPath());
 
     ParquetWriter<GenericRecord> writer = AvroParquetWriter
         .<GenericRecord>builder(file)
@@ -182,10 +175,7 @@ public class TestReadWrite {
     schema.setFields(Lists.newArrayList(
        new Schema.Field("mymap", Schema.createMap(Schema.create(Schema.Type.INT)), null, null)));
 
-    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
-    tmp.deleteOnExit();
-    tmp.delete();
-    Path file = new Path(tmp.getPath());
+    Path file = new Path(createTempFile().getPath());
 
     ParquetWriter<GenericRecord> writer = AvroParquetWriter
         .<GenericRecord>builder(file)
@@ -209,10 +199,7 @@ public class TestReadWrite {
     Schema schema = new Schema.Parser().parse(
         Resources.getResource("map.avsc").openStream());
 
-    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
-    tmp.deleteOnExit();
-    tmp.delete();
-    Path file = new Path(tmp.getPath());
+    Path file = new Path(createTempFile().getPath());
 
     ParquetWriter<GenericRecord> writer = AvroParquetWriter
         .<GenericRecord>builder(file)
@@ -346,11 +333,8 @@ public class TestReadWrite {
     Schema schema = new Schema.Parser().parse(
         Resources.getResource("all.avsc").openStream());
 
-    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
-    tmp.deleteOnExit();
-    tmp.delete();
-    Path file = new Path(tmp.getPath());
-    
+    Path file = new Path(createTempFile().getPath());
+
     ParquetWriter<GenericRecord> writer = AvroParquetWriter
         .<GenericRecord>builder(file)
         .withSchema(schema)
@@ -429,10 +413,7 @@ public class TestReadWrite {
 
   @Test
   public void testAllUsingDefaultAvroSchema() throws Exception {
-    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
-    tmp.deleteOnExit();
-    tmp.delete();
-    Path file = new Path(tmp.getPath());
+    Path file = new Path(createTempFile().getPath());
 
     // write file using Parquet APIs
    ParquetWriter<Map<String, Object>> parquetWriter = new ParquetWriter<Map<String, Object>>(file,
@@ -654,10 +635,7 @@ public class TestReadWrite {
       Collections.singletonList(new Schema.Field("value",
         Schema.createUnion(Schema.create(Schema.Type.STRING)), null, null)));
 
-    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
-    tmp.deleteOnExit();
-    tmp.delete();
-    Path file = new Path(tmp.getPath());
+    Path file = new Path(createTempFile().getPath());
 
     // Parquet writer
     ParquetWriter parquetWriter = AvroParquetWriter.builder(file).withSchema(avroSchema)
@@ -678,6 +656,46 @@ public class TestReadWrite {
     assertEquals(str("theValue"), nextRecord.get("value"));
   }
 
+  @Test
+  public void testDuplicatedValuesWithDictionary() throws Exception {
+    Schema schema = SchemaBuilder.record("spark_schema")
+      .fields().optionalBytes("value").endRecord();
+
+    Path file = new Path(createTempFile().getPath());
+
+    String[] records = {"one", "two", "three", "three", "two", "one", "zero"};
+    try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter
+      .<GenericData.Record>builder(file)
+      .withSchema(schema)
+      .withConf(testConf)
+      .build()) {
+      for (String record : records) {
+        writer.write(new GenericRecordBuilder(schema)
+          .set("value", record.getBytes()).build());
+      }
+    }
+
+    try (ParquetReader<GenericRecord> reader = AvroParquetReader
+      .<GenericRecord>builder(file)
+      .withConf(testConf).build()) {
+      GenericRecord rec;
+      int i = 0;
+      while ((rec = reader.read()) != null) {
+        ByteBuffer buf = (ByteBuffer) rec.get("value");
+        byte[] bytes = new byte[buf.remaining()];
+        buf.get(bytes);
+        assertEquals(records[i++], new String(bytes));
+      }
+    }
+  }
+
+  private File createTempFile() throws IOException {
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    return tmp;
+  }
+
   /**
    * Return a String or Utf8 depending on whether compatibility is on
    */


Mime
View raw message