parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject parquet-mr git commit: PARQUET-241: Fix ParquetInputFormat.getFooters() order
Date Thu, 29 Oct 2015 22:42:55 GMT
Repository: parquet-mr
Updated Branches:
  refs/heads/master 5294c64b3 -> 5a45ae3b1


PARQUET-241: Fix ParquetInputFormat.getFooters() order

ParquetInputFormat.getFooters() should return in the same order as what listStatus() returns

Author: Mingyu Kim <mkim@palantir.com>

Closes #164 from mingyukim/parquet-241 and squashes the following commits:

86fe900 [Mingyu Kim] Address PR comments
b0181e2 [Mingyu Kim] PARQUET-241: ParquetInputFormat.getFooters() should return in the same
order as what listStatus() returns


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/5a45ae3b
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/5a45ae3b
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/5a45ae3b

Branch: refs/heads/master
Commit: 5a45ae3b1deb5117cb9e9a13141eeab1e9ad3d71
Parents: 5294c64
Author: Mingyu Kim <mkim@palantir.com>
Authored: Thu Oct 29 15:42:43 2015 -0700
Committer: Ryan Blue <blue@apache.org>
Committed: Thu Oct 29 15:42:43 2015 -0700

----------------------------------------------------------------------
 .../parquet/hadoop/ParquetInputFormat.java      | 43 ++++++++-----
 .../apache/parquet/hadoop/TestInputFormat.java  | 65 +++++++++++++++++++-
 2 files changed, 92 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/5a45ae3b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
index 4848f22..e3536d7 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
@@ -29,8 +29,10 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configurable;
@@ -389,7 +391,9 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
       return Collections.emptyList();
     }
     Configuration config = ContextUtil.getConfiguration(jobContext);
-    List<Footer> footers = new ArrayList<Footer>(statuses.size());
+    // Use LinkedHashMap to preserve the insertion order and ultimately to return the list of
+    // footers in the same order as the list of file statuses returned from listStatus()
+    Map<FileStatusWrapper, Footer> footersMap = new LinkedHashMap<FileStatusWrapper, Footer>();
     Set<FileStatus> missingStatuses = new HashSet<FileStatus>();
     Map<Path, FileStatusWrapper> missingStatusesMap =
             new HashMap<Path, FileStatusWrapper>(missingStatuses.size());
@@ -407,33 +411,42 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
                 + " found for '" + status.getPath() + "'");
       }
       if (cacheEntry != null) {
-        footers.add(cacheEntry.getFooter());
+        footersMap.put(statusWrapper, cacheEntry.getFooter());
       } else {
+        footersMap.put(statusWrapper, null);
         missingStatuses.add(status);
         missingStatusesMap.put(status.getPath(), statusWrapper);
       }
     }
     if (Log.DEBUG) {
-      LOG.debug("found " + footers.size() + " footers in cache and adding up "
+      LOG.debug("found " + footersMap.size() + " footers in cache and adding up "
               + "to " + missingStatuses.size() + " missing footers to the cache");
     }
 
-
-    if (missingStatuses.isEmpty()) {
-      return footers;
+    if (!missingStatuses.isEmpty()) {
+      List<Footer> newFooters = getFooters(config, missingStatuses);
+      for (Footer newFooter : newFooters) {
+        // Use the original file status objects to make sure we store a
+        // conservative (older) modification time (i.e. in case the files and
+        // footers were modified and it's not clear which version of the footers
+        // we have)
+        FileStatusWrapper fileStatus = missingStatusesMap.get(newFooter.getFile());
+        footersCache.put(fileStatus, new FootersCacheValue(fileStatus, newFooter));
+      }
     }
 
-    List<Footer> newFooters = getFooters(config, missingStatuses);
-    for (Footer newFooter : newFooters) {
-      // Use the original file status objects to make sure we store a
-      // conservative (older) modification time (i.e. in case the files and
-      // footers were modified and it's not clear which version of the footers
-      // we have)
-      FileStatusWrapper fileStatus = missingStatusesMap.get(newFooter.getFile());
-      footersCache.put(fileStatus, new FootersCacheValue(fileStatus, newFooter));
+    List<Footer> footers = new ArrayList<Footer>(statuses.size());
+    for (Entry<FileStatusWrapper, Footer> footerEntry : footersMap.entrySet()) {
+      Footer footer = footerEntry.getValue();
+
+      if (footer == null) {
+          // Footer was originally missing, so get it from the cache again
+          footers.add(footersCache.getCurrentValue(footerEntry.getKey()).getFooter());
+      } else {
+          footers.add(footer);
+      }
     }
 
-    footers.addAll(newFooters);
     return footers;
   }
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/5a45ae3b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormat.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormat.java
index 6d89ef2..9fe3008 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormat.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormat.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
+import static org.apache.parquet.column.Encoding.BIT_PACKED;
+import static org.apache.parquet.column.Encoding.PLAIN;
 import static org.apache.parquet.filter2.predicate.FilterApi.and;
 import static org.apache.parquet.filter2.predicate.FilterApi.eq;
 import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
@@ -49,11 +51,13 @@ import org.apache.hadoop.mapreduce.Job;
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ColumnReader;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.statistics.BinaryStatistics;
 import org.apache.parquet.column.statistics.IntStatistics;
-import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.filter.RecordFilter;
 import org.apache.parquet.filter.UnboundRecordFilter;
 import org.apache.parquet.filter2.compat.FilterCompat;
@@ -63,6 +67,7 @@ import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.filter2.predicate.Operators.IntColumn;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -71,6 +76,8 @@ import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.MessageTypeParser;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 
+import com.google.common.io.Files;
+
 public class TestInputFormat {
 
   List<BlockMetaData> blocks;
@@ -378,6 +385,62 @@ public class TestInputFormat {
     shouldSplitStartBe(splits, 0, 50);
   }
 
+  @Test
+  public void testGetFootersReturnsInPredictableOrder() throws IOException {
+    File tempDir = Files.createTempDir();
+    tempDir.deleteOnExit();
+    int numFiles = 10; // create a nontrivial number of files so that it actually tests getFooters() returns files in the correct order
+
+    String url = "";
+    for (int i = 0; i < numFiles; i++) {
+      File file = new File(tempDir, String.format("part-%05d.parquet", i));
+      createParquetFile(file);
+      if (i > 0) {
+        url += ",";
+      }
+      url += "file:" + file.getAbsolutePath();
+    }
+
+    Job job = new Job();
+    FileInputFormat.setInputPaths(job, url);
+    List<Footer> footers = new ParquetInputFormat<Object>().getFooters(job);
+    for (int i = 0; i < numFiles; i++) {
+      Footer footer = footers.get(i);
+      File file = new File(tempDir, String.format("part-%05d.parquet", i));
+      assertEquals("file:" + file.getAbsolutePath(), footer.getFile().toString());
+    }
+  }
+
+  private void createParquetFile(File file) throws IOException {
+    Path path = new Path(file.toURI());
+    Configuration configuration = new Configuration();
+
+    MessageType schema = MessageTypeParser.parseMessageType("message m { required group a {required binary b;}}");
+    String[] columnPath = {"a", "b"};
+    ColumnDescriptor c1 = schema.getColumnDescription(columnPath);
+
+    byte[] bytes1 = { 0, 1, 2, 3};
+    byte[] bytes2 = { 2, 3, 4, 5};
+    CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED;
+
+    BinaryStatistics stats = new BinaryStatistics();
+
+    ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path);
+    w.start();
+    w.startBlock(3);
+    w.startColumn(c1, 5, codec);
+    w.writeDataPage(2, 4, BytesInput.from(bytes1), stats, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(3, 4, BytesInput.from(bytes1), stats, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.endBlock();
+    w.startBlock(4);
+    w.startColumn(c1, 7, codec);
+    w.writeDataPage(7, 4, BytesInput.from(bytes2), stats, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.endBlock();
+    w.end(new HashMap<String, String>());
+  }
+
   private File getTempFile() throws IOException {
     File tempFile = File.createTempFile("footer_", ".txt");
     tempFile.deleteOnExit();


Mime
View raw message