parquet-commits mailing list archives

From: alexleven...@apache.org
Subject: parquet-mr git commit: PARQUET-336: Fix ArrayIndexOutOfBounds in checkDeltaByteArrayProblem
Date: Thu, 16 Jul 2015 23:42:37 GMT
Repository: parquet-mr
Updated Branches:
  refs/heads/master f79c9365d -> 8714dd031


PARQUET-336: Fix ArrayIndexOutOfBounds in checkDeltaByteArrayProblem

Author: Alex Levenson <alexlevenson@twitter.com>
Author: Alex Levenson <alex@isnotinvain.com>

Closes #242 from isnotinvain/patch-1 and squashes the following commits:

ce1f81e [Alex Levenson] Add tests
4688930 [Alex Levenson] Fix ArrayIndexOutOfBounds in checkDeltaByteArrayProblem
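
For context: the exception was thrown when a row-group filter eliminated every block of a split, leaving filteredBlocks empty while checkDeltaByteArrayProblem(...) unconditionally read filteredBlocks.get(0) in ParquetRecordReader. Below is a minimal, self-contained sketch of the failure mode and of the guard this commit adds; the BlockMetaData stub and the simplified checkDeltaByteArrayProblem signature are stand-ins for the real reader internals shown in the diff that follows.

import java.util.Collections;
import java.util.List;

public class EmptyBlockGuardSketch {

  // Simplified stand-in for the per-row-group metadata handled by ParquetRecordReader.
  static class BlockMetaData {}

  // Simplified stand-in for checkDeltaByteArrayProblem, which inspects the first surviving row group.
  static void checkDeltaByteArrayProblem(BlockMetaData firstBlock) {
    // the real method checks the writer version / delta-byte-array encodings of the block
  }

  public static void main(String[] args) {
    // An aggressive predicate, e.g. eq(intColumn("line"), -1000), can drop every row group,
    // leaving the task with no blocks to read.
    List<BlockMetaData> filteredBlocks = Collections.emptyList();

    // Before this fix: filteredBlocks.get(0) threw the out-of-bounds exception reported in PARQUET-336.
    // After this fix: the first block is only inspected when at least one row group survived filtering.
    if (!filteredBlocks.isEmpty()) {
      checkDeltaByteArrayProblem(filteredBlocks.get(0));
    }
  }
}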


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/8714dd03
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/8714dd03
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/8714dd03

Branch: refs/heads/master
Commit: 8714dd031647be34d0d27f461894e7b194f25cd7
Parents: f79c936
Author: Alex Levenson <alexlevenson@twitter.com>
Authored: Thu Jul 16 16:42:38 2015 -0700
Committer: Alex Levenson <alexlevenson@twitter.com>
Committed: Thu Jul 16 16:42:38 2015 -0700

----------------------------------------------------------------------
 .../parquet/hadoop/ParquetRecordReader.java     |  4 +-
 .../hadoop/example/TestInputOutputFormat.java   | 71 +++++++++++++++++++-
 2 files changed, 72 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8714dd03/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
index 0a31f9c..1558fc0 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
@@ -193,7 +193,9 @@ public class ParquetRecordReader<T> extends RecordReader<Void, T> {
       }
     }
 
-    checkDeltaByteArrayProblem(footer.getFileMetaData(), configuration, filteredBlocks.get(0));
+    if (!filteredBlocks.isEmpty()) {
+      checkDeltaByteArrayProblem(footer.getFileMetaData(), configuration, filteredBlocks.get(0));
+    }
 
     MessageType fileSchema = footer.getFileMetaData().getSchema();
     internalReader.initialize(

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8714dd03/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java
index 80c4381..987554e 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java
@@ -28,8 +28,12 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.lang.reflect.Method;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -44,6 +48,8 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.parquet.Strings;
+import org.apache.parquet.filter2.predicate.FilterApi;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -139,8 +145,6 @@ public class TestInputOutputFormat {
       factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(ContextUtil.getConfiguration(context)));
     }
 
-    ;
-
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Void, Group>.Context context) throws java.io.IOException, InterruptedException {
       Group group = factory.newGroup()
               .append("line", (int) key.get())
@@ -238,6 +242,69 @@ public class TestInputOutputFormat {
    testReadWrite(CompressionCodecName.UNCOMPRESSED, new HashMap<String, String>() {{ put("parquet.task.side.metadata", "true"); }});
   }
 
+  /**
+   * Uses a filter that drops all records to test handling of tasks (mappers) that need to do no work at all
+   */
+  @Test
+  public void testReadWriteTaskSideMDAggressiveFilter() throws IOException, ClassNotFoundException, InterruptedException {
+    Configuration conf = new Configuration();
+
+    // this filter predicate should trigger row group filtering that drops all row-groups
+    ParquetInputFormat.setFilterPredicate(conf, FilterApi.eq(FilterApi.intColumn("line"), -1000));
+    final String fpString = conf.get(ParquetInputFormat.FILTER_PREDICATE);
+
+    runMapReduceJob(CompressionCodecName.UNCOMPRESSED, new HashMap<String, String>() {{
+      put("parquet.task.side.metadata", "true");
+      put(ParquetInputFormat.FILTER_PREDICATE, fpString);
+    }});
+
+    List<String> lines = Files.readAllLines(new File(outputPath.toString(), "part-m-00000").toPath(), Charset.forName("UTF-8"));
+    assertTrue(lines.isEmpty());
+  }
+
+  @Test
+  public void testReadWriteFilter() throws IOException, ClassNotFoundException, InterruptedException {
+    Configuration conf = new Configuration();
+
+    // this filter predicate should keep some records but not all (first 500 characters)
+    // "line" is actually position in the file...
+    ParquetInputFormat.setFilterPredicate(conf, FilterApi.lt(FilterApi.intColumn("line"), 500));
+    final String fpString = conf.get(ParquetInputFormat.FILTER_PREDICATE);
+
+    runMapReduceJob(CompressionCodecName.UNCOMPRESSED, new HashMap<String, String>() {{
+      put("parquet.task.side.metadata", "true");
+      put(ParquetInputFormat.FILTER_PREDICATE, fpString);
+    }});
+
+    List<String> expected = Files.readAllLines(new File(inputPath.toString()).toPath(), Charset.forName("UTF-8"));
+
+    // grab the lines that contain the first 500 characters (including the rest of the line past 500 characters)
+    int size = 0;
+    Iterator<String> iter = expected.iterator();
+    while(iter.hasNext()) {
+      String next = iter.next();
+
+      if (size < 500) {
+        size += next.length();
+        continue;
+      }
+
+      iter.remove();
+    }
+
+    // put the output back into its original format (remove the character counts / tabs)
+    List<String> found = Files.readAllLines(new File(outputPath.toString(), "part-m-00000").toPath(), Charset.forName("UTF-8"));
+    StringBuilder sbFound = new StringBuilder();
+    for (String line : found) {
+      sbFound.append(line.split("\t", -1)[1]);
+      sbFound.append("\n");
+    }
+
+    sbFound.deleteCharAt(sbFound.length() - 1);
+
+    assertEquals(Strings.join(expected, "\n"), sbFound.toString());
+  }
+
   @Test
   public void testProjection() throws Exception{
     readSchema=partialSchema;

