drill-commits mailing list archives

From par...@apache.org
Subject drill git commit: DRILL-4053: Reduce metadata cache file size. Save a merged schema instead of repeating for every row group. Save maxValue in a row group iff minVal equals maxVal and null otherwise. Maintain backward compatibility with older version. This closes #254
Date Wed, 02 Dec 2015 03:46:07 GMT
Repository: drill
Updated Branches:
  refs/heads/master 9cb553dfe -> 0a3613e7e


DRILL-4053: Reduce metadata cache file size. Save a merged schema instead of
repeating for every row group. Save maxValue in a row group iff minVal
equals maxVal and null otherwise. Maintain backward compatibility
with older version. This closes #254
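
The cache file shrinks for two reasons, both visible in the Metadata.java hunk below: column
names and types are stored once per table in a merged columnTypeInfo map instead of being
repeated for every row group, and per-row-group statistics collapse to a single optional value.
The following is a minimal sketch distilled from the patch (not the patch itself) of the rule
used to decide what is cached for a column chunk, assuming a parquet-mr Statistics object for
that chunk:

  import org.apache.parquet.column.statistics.Statistics;

  // Sketch only: the value cached for a column chunk under the v2 format.
  // A non-null result means min == max for the row group (enough for partition
  // pruning); null means the stats are missing or the values differ, in which
  // case the custom serializer omits the field from the cache file entirely.
  static Object singleValueOrNull(Statistics stats) {
    if (stats == null || stats.isEmpty()) {
      return null;
    }
    Object min = stats.genericGetMin();
    Object max = stats.genericGetMax();
    return (min != null && max != null && max.equals(min)) ? max : null;
  }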


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0a3613e7
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0a3613e7
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0a3613e7

Branch: refs/heads/master
Commit: 0a3613e7ef523905e82ffbc113d28aaa535552fa
Parents: 9cb553d
Author: Parth Chandra <parthc@apache.org>
Authored: Wed Oct 28 18:24:16 2015 -0700
Committer: Parth Chandra <parthc@apache.org>
Committed: Tue Dec 1 19:44:51 2015 -0800

----------------------------------------------------------------------
 exec/java-exec/pom.xml                          |   5 +
 .../drill/exec/store/parquet/Metadata.java      | 680 ++++++++++++++++---
 .../store/parquet/ParquetFileSelection.java     |  12 +-
 .../exec/store/parquet/ParquetFormatPlugin.java |   8 +-
 .../exec/store/parquet/ParquetGroupScan.java    | 431 ++++++------
 5 files changed, 831 insertions(+), 305 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/0a3613e7/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index ada66eb..90f0f3e 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -119,6 +119,11 @@
       <version>2.4.3</version>
     </dependency>
     <dependency>
+      <groupId>com.fasterxml.jackson.module</groupId>
+      <artifactId>jackson-module-afterburner</artifactId>
+      <version>2.4.0</version>
+    </dependency>
+    <dependency>
       <groupId>org.glassfish.jersey.ext</groupId>
       <artifactId>jersey-mvc-freemarker</artifactId>
       <version>2.8</version>
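
The new jackson-module-afterburner dependency above backs the AfterburnerModule that
readBlockMeta() in Metadata.java now registers; Afterburner generates bytecode for
(de)serialization, which helps when reading large metadata cache files. A minimal sketch of
the registration, omitting the Drill-specific deserializers the patch attaches to the same
module:

  import com.fasterxml.jackson.databind.ObjectMapper;
  import com.fasterxml.jackson.module.afterburner.AfterburnerModule;

  public class AfterburnerMapper {
    // Sketch only: Afterburner registers like any other Jackson module; the patch
    // also adds the SchemaPath and ColumnTypeMetadata_v2.Key deserializers to it
    // before calling registerModule().
    public static ObjectMapper newMapper() {
      ObjectMapper mapper = new ObjectMapper();
      mapper.registerModule(new AfterburnerModule());
      return mapper;
    }
  }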

http://git-wip-us.apache.org/repos/asf/drill/blob/0a3613e7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
index aa2628f..950993a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,20 +18,28 @@
 package org.apache.drill.exec.store.parquet;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.core.JsonGenerator.Feature;
 import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.KeyDeserializer;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+
 import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.expression.SchemaPath.De;
 import org.apache.drill.exec.store.TimedRunnable;
 import org.apache.drill.exec.store.dfs.DrillPathFilter;
 import org.apache.hadoop.fs.BlockLocation;
@@ -40,20 +48,23 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+import org.codehaus.jackson.annotate.JsonIgnore;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
-import org.apache.parquet.schema.Type;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -61,12 +72,14 @@ import java.util.concurrent.TimeUnit;
 public class Metadata {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Metadata.class);
 
+  public static final String[] OLD_METADATA_FILENAMES = {".drill.parquet_metadata.v2"};
   public static final String METADATA_FILENAME = ".drill.parquet_metadata";
 
   private final FileSystem fs;
 
   /**
    * Create the parquet metadata file for the directory at the given path, and for any subdirectories
+   *
    * @param fs
    * @param path
    * @throws IOException
@@ -78,37 +91,41 @@ public class Metadata {
 
   /**
    * Get the parquet metadata for the parquet files in the given directory, including those in subdirectories
+   *
    * @param fs
    * @param path
    * @return
    * @throws IOException
    */
-  public static ParquetTableMetadata_v1 getParquetTableMetadata(FileSystem fs, String path) throws IOException {
+  public static ParquetTableMetadata_v2 getParquetTableMetadata(FileSystem fs, String path)
+      throws IOException {
     Metadata metadata = new Metadata(fs);
     return metadata.getParquetTableMetadata(path);
   }
 
   /**
    * Get the parquet metadata for a list of parquet files
+   *
    * @param fs
    * @param fileStatuses
    * @return
    * @throws IOException
    */
-  public static ParquetTableMetadata_v1 getParquetTableMetadata(FileSystem fs,
-                                                             List<FileStatus> fileStatuses) throws IOException {
+  public static ParquetTableMetadata_v2 getParquetTableMetadata(FileSystem fs,
+      List<FileStatus> fileStatuses) throws IOException {
     Metadata metadata = new Metadata(fs);
     return metadata.getParquetTableMetadata(fileStatuses);
   }
 
   /**
    * Get the parquet metadata for a directory by reading the metadata file
+   *
    * @param fs
    * @param path The path to the metadata file, located in the directory that contains the parquet files
    * @return
    * @throws IOException
    */
-  public static ParquetTableMetadata_v1 readBlockMeta(FileSystem fs, String path) throws IOException {
+  public static ParquetTableMetadataBase readBlockMeta(FileSystem fs, String path) throws IOException {
     Metadata metadata = new Metadata(fs);
     return metadata.readBlockMeta(path);
   }
@@ -119,12 +136,15 @@ public class Metadata {
 
   /**
    * Create the parquet metadata file for the directory at the given path, and for any subdirectories
+   *
    * @param path
    * @throws IOException
    */
-  private ParquetTableMetadata_v1 createMetaFilesRecursively(final String path) throws IOException {
-    List<ParquetFileMetadata> metaDataList = Lists.newArrayList();
+  private ParquetTableMetadata_v2 createMetaFilesRecursively(final String path) throws IOException {
+    List<ParquetFileMetadata_v2> metaDataList = Lists.newArrayList();
     List<String> directoryList = Lists.newArrayList();
+    ConcurrentHashMap<ColumnTypeMetadata_v2.Key, ColumnTypeMetadata_v2> columnTypeInfoSet =
+        new ConcurrentHashMap<>();
     Path p = new Path(path);
     FileStatus fileStatus = fs.getFileStatus(p);
     assert fileStatus.isDirectory() : "Expected directory";
@@ -133,29 +153,49 @@ public class Metadata {
 
     for (final FileStatus file : fs.listStatus(p, new DrillPathFilter())) {
       if (file.isDirectory()) {
-        ParquetTableMetadata_v1 subTableMetadata = createMetaFilesRecursively(file.getPath().toString());
+        ParquetTableMetadata_v2 subTableMetadata = createMetaFilesRecursively(file.getPath().toString());
         metaDataList.addAll(subTableMetadata.files);
         directoryList.addAll(subTableMetadata.directories);
         directoryList.add(file.getPath().toString());
+        // Merge the schema from the child level into the current level
+        //TODO: We need a merge method that merges two columns with the same name but different types
+        columnTypeInfoSet.putAll(subTableMetadata.columnTypeInfo);
       } else {
         childFiles.add(file);
       }
     }
+    ParquetTableMetadata_v2 parquetTableMetadata = new ParquetTableMetadata_v2();
     if (childFiles.size() > 0) {
-      metaDataList.addAll(getParquetFileMetadata(childFiles));
+      List<ParquetFileMetadata_v2> childFilesMetadata =
+          getParquetFileMetadata_v2(parquetTableMetadata, childFiles);
+      metaDataList.addAll(childFilesMetadata);
+      // Note that we do not need to merge the columnInfo at this point. The columnInfo is already added
+      // to the parquetTableMetadata.
+    }
+
+    parquetTableMetadata.directories = directoryList;
+    parquetTableMetadata.files = metaDataList;
+    //TODO: We need a merge method that merges two columns with the same name but different types
+    if (parquetTableMetadata.columnTypeInfo == null) {
+      parquetTableMetadata.columnTypeInfo = new ConcurrentHashMap<>();
+    }
+    parquetTableMetadata.columnTypeInfo.putAll(columnTypeInfoSet);
+
+    for (String oldname : OLD_METADATA_FILENAMES) {
+      fs.delete(new Path(p, oldname), false);
     }
-    ParquetTableMetadata_v1 parquetTableMetadata = new ParquetTableMetadata_v1(metaDataList, directoryList);
     writeFile(parquetTableMetadata, new Path(p, METADATA_FILENAME));
     return parquetTableMetadata;
   }
 
   /**
    * Get the parquet metadata for the parquet files in a directory
+   *
    * @param path the path of the directory
    * @return
    * @throws IOException
    */
-  private ParquetTableMetadata_v1 getParquetTableMetadata(String path) throws IOException {
+  private ParquetTableMetadata_v2 getParquetTableMetadata(String path) throws IOException {
     Path p = new Path(path);
     FileStatus fileStatus = fs.getFileStatus(p);
     Stopwatch watch = new Stopwatch();
@@ -164,41 +204,49 @@ public class Metadata {
     logger.info("Took {} ms to get file statuses", watch.elapsed(TimeUnit.MILLISECONDS));
     watch.reset();
     watch.start();
-    ParquetTableMetadata_v1 metadata_v1 = getParquetTableMetadata(fileStatuses);
+    ParquetTableMetadata_v2 metadata_v1 = getParquetTableMetadata(fileStatuses);
     logger.info("Took {} ms to read file metadata", watch.elapsed(TimeUnit.MILLISECONDS));
     return metadata_v1;
   }
 
   /**
    * Get the parquet metadata for a list of parquet files
+   *
    * @param fileStatuses
    * @return
    * @throws IOException
    */
-  private ParquetTableMetadata_v1 getParquetTableMetadata(List<FileStatus> fileStatuses) throws IOException {
-    List<ParquetFileMetadata> fileMetadataList = getParquetFileMetadata(fileStatuses);
-    return new ParquetTableMetadata_v1(fileMetadataList, new ArrayList<String>());
+  private ParquetTableMetadata_v2 getParquetTableMetadata(List<FileStatus> fileStatuses)
+      throws IOException {
+    ParquetTableMetadata_v2 tableMetadata = new ParquetTableMetadata_v2();
+    List<ParquetFileMetadata_v2> fileMetadataList = getParquetFileMetadata_v2(tableMetadata, fileStatuses);
+    tableMetadata.files = fileMetadataList;
+    tableMetadata.directories = new ArrayList<String>();
+    return tableMetadata;
   }
 
   /**
    * Get a list of file metadata for a list of parquet files
+   *
    * @param fileStatuses
    * @return
    * @throws IOException
    */
-  private List<ParquetFileMetadata> getParquetFileMetadata(List<FileStatus> fileStatuses) throws IOException {
-    List<TimedRunnable<ParquetFileMetadata>> gatherers = Lists.newArrayList();
+  private List<ParquetFileMetadata_v2> getParquetFileMetadata_v2(
+      ParquetTableMetadata_v2 parquetTableMetadata_v1, List<FileStatus> fileStatuses) throws IOException {
+    List<TimedRunnable<ParquetFileMetadata_v2>> gatherers = Lists.newArrayList();
     for (FileStatus file : fileStatuses) {
-      gatherers.add(new MetadataGatherer(file));
+      gatherers.add(new MetadataGatherer(parquetTableMetadata_v1, file));
     }
 
-    List<ParquetFileMetadata> metaDataList = Lists.newArrayList();
+    List<ParquetFileMetadata_v2> metaDataList = Lists.newArrayList();
     metaDataList.addAll(TimedRunnable.run("Fetch parquet metadata", logger, gatherers, 16));
     return metaDataList;
   }
 
   /**
    * Recursively get a list of files
+   *
    * @param fileStatus
    * @return
    * @throws IOException
@@ -218,17 +266,19 @@ public class Metadata {
   /**
    * TimedRunnable that reads the footer from parquet and collects file metadata
    */
-  private class MetadataGatherer extends TimedRunnable<ParquetFileMetadata> {
+  private class MetadataGatherer extends TimedRunnable<ParquetFileMetadata_v2> {
 
     private FileStatus fileStatus;
+    private ParquetTableMetadata_v2 parquetTableMetadata;
 
-    public MetadataGatherer(FileStatus fileStatus) {
+    public MetadataGatherer(ParquetTableMetadata_v2 parquetTableMetadata, FileStatus fileStatus) {
       this.fileStatus = fileStatus;
+      this.parquetTableMetadata = parquetTableMetadata;
     }
 
     @Override
-    protected ParquetFileMetadata runInner() throws Exception {
-      return getParquetFileMetadata(fileStatus);
+    protected ParquetFileMetadata_v2 runInner() throws Exception {
+      return getParquetFileMetadata_v2(parquetTableMetadata, fileStatus);
     }
 
     @Override
@@ -251,64 +301,83 @@ public class Metadata {
 
   /**
    * Get the metadata for a single file
+   *
    * @param file
    * @return
    * @throws IOException
    */
-  private ParquetFileMetadata getParquetFileMetadata(FileStatus file) throws IOException {
+  private ParquetFileMetadata_v2 getParquetFileMetadata_v2(ParquetTableMetadata_v2 parquetTableMetadata,
+      FileStatus file) throws IOException {
     ParquetMetadata metadata = ParquetFileReader.readFooter(fs.getConf(), file);
     MessageType schema = metadata.getFileMetaData().getSchema();
 
-    Map<SchemaPath,OriginalType> originalTypeMap = Maps.newHashMap();
+    Map<SchemaPath, OriginalType> originalTypeMap = Maps.newHashMap();
     schema.getPaths();
     for (String[] path : schema.getPaths()) {
       originalTypeMap.put(SchemaPath.getCompoundPath(path), getOriginalType(schema, path, 0));
     }
 
-    List<RowGroupMetadata> rowGroupMetadataList = Lists.newArrayList();
+    List<RowGroupMetadata_v2> rowGroupMetadataList = Lists.newArrayList();
 
     for (BlockMetaData rowGroup : metadata.getBlocks()) {
-      List<ColumnMetadata> columnMetadataList = Lists.newArrayList();
+      List<ColumnMetadata_v2> columnMetadataList = Lists.newArrayList();
       long length = 0;
       for (ColumnChunkMetaData col : rowGroup.getColumns()) {
-        ColumnMetadata columnMetadata;
+        ColumnMetadata_v2 columnMetadata;
 
         boolean statsAvailable = (col.getStatistics() != null && !col.getStatistics().isEmpty());
 
         Statistics stats = col.getStatistics();
-        SchemaPath columnName = SchemaPath.getCompoundPath(col.getPath().toArray());
+        String[] columnName = col.getPath().toArray();
+        SchemaPath columnSchemaName = SchemaPath.getCompoundPath(columnName);
+        ColumnTypeMetadata_v2 columnTypeMetadata =
+            new ColumnTypeMetadata_v2(columnName, col.getType(), originalTypeMap.get(columnSchemaName));
+        if (parquetTableMetadata.columnTypeInfo == null) {
+          parquetTableMetadata.columnTypeInfo = new ConcurrentHashMap<>();
+        }
+        // Save the column schema info. We'll merge it into one list
+        parquetTableMetadata.columnTypeInfo
+            .put(new ColumnTypeMetadata_v2.Key(columnTypeMetadata.name), columnTypeMetadata);
         if (statsAvailable) {
-          columnMetadata = new ColumnMetadata(columnName, col.getType(), originalTypeMap.get(columnName),
-              stats.genericGetMax(), stats.genericGetMin(), stats.getNumNulls());
+          // Write stats only if minVal==maxVal. Also, we then store only maxVal
+          Object mxValue = null;
+          if (stats.genericGetMax() != null && stats.genericGetMin() != null && stats.genericGetMax()
+              .equals(stats.genericGetMin())) {
+            mxValue = stats.genericGetMax();
+          }
+          columnMetadata =
+              new ColumnMetadata_v2(columnTypeMetadata.name, col.getType(), mxValue, stats.getNumNulls());
         } else {
-          columnMetadata = new ColumnMetadata(columnName, col.getType(), originalTypeMap.get(columnName),
-              null, null, null);
+          columnMetadata = new ColumnMetadata_v2(columnTypeMetadata.name, col.getType(), null, null);
         }
         columnMetadataList.add(columnMetadata);
         length += col.getTotalSize();
       }
 
-      RowGroupMetadata rowGroupMeta = new RowGroupMetadata(rowGroup.getStartingPos(), length, rowGroup.getRowCount(),
+      RowGroupMetadata_v2 rowGroupMeta =
+          new RowGroupMetadata_v2(rowGroup.getStartingPos(), length, rowGroup.getRowCount(),
               getHostAffinity(file, rowGroup.getStartingPos(), length), columnMetadataList);
 
       rowGroupMetadataList.add(rowGroupMeta);
     }
     String path = Path.getPathWithoutSchemeAndAuthority(file.getPath()).toString();
 
-    return new ParquetFileMetadata(path, file.getLen(), rowGroupMetadataList);
+    return new ParquetFileMetadata_v2(path, file.getLen(), rowGroupMetadataList);
   }
 
   /**
    * Get the host affinity for a row group
+   *
    * @param fileStatus the parquet file
-   * @param start the start of the row group
-   * @param length the length of the row group
+   * @param start      the start of the row group
+   * @param length     the length of the row group
    * @return
    * @throws IOException
    */
-  private Map<String,Float> getHostAffinity(FileStatus fileStatus, long start, long length) throws IOException {
+  private Map<String, Float> getHostAffinity(FileStatus fileStatus, long start, long length)
+      throws IOException {
     BlockLocation[] blockLocations = fs.getFileBlockLocations(fileStatus, start, length);
-    Map<String,Float> hostAffinityMap = Maps.newHashMap();
+    Map<String, Float> hostAffinityMap = Maps.newHashMap();
     for (BlockLocation blockLocation : blockLocations) {
       for (String host : blockLocation.getHosts()) {
         Float currentAffinity = hostAffinityMap.get(host);
@@ -316,7 +385,7 @@ public class Metadata {
         float blockEnd = blockStart + blockLocation.getLength();
         float rowGroupEnd = start + length;
         Float newAffinity = (blockLocation.getLength() - (blockStart < start ? start - blockStart : 0) -
-                (blockEnd > rowGroupEnd ? blockEnd - rowGroupEnd : 0)) / length;
+            (blockEnd > rowGroupEnd ? blockEnd - rowGroupEnd : 0)) / length;
         if (currentAffinity != null) {
           hostAffinityMap.put(host, currentAffinity + newAffinity);
         } else {
@@ -329,15 +398,19 @@ public class Metadata {
 
   /**
    * Serialize parquet metadata to json and write to a file
+   *
    * @param parquetTableMetadata
    * @param p
    * @throws IOException
    */
-  private void writeFile(ParquetTableMetadata_v1 parquetTableMetadata, Path p) throws IOException {
+  private void writeFile(ParquetTableMetadata_v2 parquetTableMetadata, Path p) throws IOException {
     JsonFactory jsonFactory = new JsonFactory();
     jsonFactory.configure(Feature.AUTO_CLOSE_TARGET, false);
     jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
     ObjectMapper mapper = new ObjectMapper(jsonFactory);
+    SimpleModule module = new SimpleModule();
+    module.addSerializer(ColumnMetadata_v2.class, new ColumnMetadata_v2.Serializer());
+    mapper.registerModule(module);
     FSDataOutputStream os = fs.create(p);
     mapper.writerWithDefaultPrettyPrinter().writeValue(os, parquetTableMetadata);
     os.flush();
@@ -346,25 +419,29 @@ public class Metadata {
 
   /**
    * Read the parquet metadata from a file
+   *
    * @param path
    * @return
    * @throws IOException
    */
-  private ParquetTableMetadata_v1 readBlockMeta(String path) throws IOException {
+  private ParquetTableMetadataBase readBlockMeta(String path) throws IOException {
     Stopwatch timer = new Stopwatch();
     timer.start();
     Path p = new Path(path);
     ObjectMapper mapper = new ObjectMapper();
-    SimpleModule module = new SimpleModule();
-    module.addDeserializer(SchemaPath.class, new De());
+    AfterburnerModule module = new AfterburnerModule();
+    module.addDeserializer(SchemaPath.class, new SchemaPath.De());
+    module.addKeyDeserializer(ColumnTypeMetadata_v2.Key.class, new ColumnTypeMetadata_v2.Key.DeSerializer());
     mapper.registerModule(module);
     mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
     FSDataInputStream is = fs.open(p);
-    ParquetTableMetadata_v1 parquetTableMetadata = mapper.readValue(is, ParquetTableMetadata_v1.class);
+
+    ParquetTableMetadataBase parquetTableMetadata = mapper.readValue(is, ParquetTableMetadataBase.class);
     logger.info("Took {} ms to read metadata from cache file", timer.elapsed(TimeUnit.MILLISECONDS));
     timer.stop();
     if (tableModified(parquetTableMetadata, p)) {
-      parquetTableMetadata = createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(p.getParent()).toString());
+      parquetTableMetadata =
+          createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(p.getParent()).toString());
     }
     return parquetTableMetadata;
   }
@@ -372,18 +449,20 @@ public class Metadata {
   /**
    * Check if the parquet metadata needs to be updated by comparing the modification time of the directories with
    * the modification time of the metadata file
+   *
    * @param tableMetadata
    * @param metaFilePath
    * @return
    * @throws IOException
    */
-  private boolean tableModified(ParquetTableMetadata_v1 tableMetadata, Path metaFilePath) throws IOException {
+  private boolean tableModified(ParquetTableMetadataBase tableMetadata, Path metaFilePath)
+      throws IOException {
     long metaFileModifyTime = fs.getFileStatus(metaFilePath).getModificationTime();
     FileStatus directoryStatus = fs.getFileStatus(metaFilePath.getParent());
     if (directoryStatus.getModificationTime() > metaFileModifyTime) {
       return true;
     }
-    for (String directory : tableMetadata.directories) {
+    for (String directory : tableMetadata.getDirectories()) {
       directoryStatus = fs.getFileStatus(new Path(directory));
       if (directoryStatus.getModificationTime() > metaFileModifyTime) {
         return true;
@@ -393,46 +472,120 @@ public class Metadata {
   }
 
   @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "metadata_version")
-  public static class ParquetTableMetadataBase {
+  @JsonSubTypes({
+      @JsonSubTypes.Type(value = ParquetTableMetadata_v1.class, name="v1"),
+      @JsonSubTypes.Type(value = ParquetTableMetadata_v2.class, name="v2")
+      })
+  public static abstract class ParquetTableMetadataBase {
+
+    @JsonIgnore public abstract List<String> getDirectories();
+
+    @JsonIgnore public abstract List<? extends ParquetFileMetadata> getFiles();
+
+    @JsonIgnore public abstract void assignFiles(List<? extends ParquetFileMetadata> newFiles);
+
+    public abstract boolean hasColumnMetadata();
+
+    @JsonIgnore public abstract PrimitiveTypeName getPrimitiveType(String[] columnName);
 
+    @JsonIgnore public abstract OriginalType getOriginalType(String[] columnName);
   }
 
-  /**
-   * Struct which contains the metadata for an entire parquet directory structure
-   */
+  public static abstract class ParquetFileMetadata {
+    @JsonIgnore public abstract String getPath();
+
+    @JsonIgnore public abstract Long getLength();
+
+    @JsonIgnore public abstract List<? extends RowGroupMetadata> getRowGroups();
+  }
+
+
+  public static abstract class RowGroupMetadata {
+    @JsonIgnore public abstract Long getStart();
+
+    @JsonIgnore public abstract Long getLength();
+
+    @JsonIgnore public abstract Long getRowCount();
+
+    @JsonIgnore public abstract Map<String, Float> getHostAffinity();
+
+    @JsonIgnore public abstract List<? extends ColumnMetadata> getColumns();
+  }
+
+
+  public static abstract class ColumnMetadata {
+    public abstract String[] getName();
+
+    public abstract Long getNulls();
+
+    public abstract boolean hasSingleValue();
+
+    public abstract Object getMaxValue();
+
+    public abstract PrimitiveTypeName getPrimitiveType();
+
+    public abstract OriginalType getOriginalType();
+  }
+
+
+
   @JsonTypeName("v1")
   public static class ParquetTableMetadata_v1 extends ParquetTableMetadataBase {
-    @JsonProperty
-    List<ParquetFileMetadata> files;
-    @JsonProperty
-    List<String> directories;
+    @JsonProperty List<ParquetFileMetadata_v1> files;
+    @JsonProperty List<String> directories;
 
     public ParquetTableMetadata_v1() {
       super();
     }
 
-    public ParquetTableMetadata_v1(List<ParquetFileMetadata> files, List<String> directories) {
+    public ParquetTableMetadata_v1(ParquetTableMetadataBase p, List<ParquetFileMetadata_v1> files,
+        List<String> directories) {
       this.files = files;
       this.directories = directories;
     }
+
+    @JsonIgnore @Override public List<String> getDirectories() {
+      return directories;
+    }
+
+    @JsonIgnore @Override public List<? extends ParquetFileMetadata> getFiles() {
+      return files;
+    }
+
+    @JsonIgnore @Override public void assignFiles(List<? extends ParquetFileMetadata> newFiles) {
+      this.files = (List<ParquetFileMetadata_v1>) newFiles;
+    }
+
+    @Override public boolean hasColumnMetadata() {
+      return false;
+    }
+
+    @JsonIgnore @Override public PrimitiveTypeName getPrimitiveType(String[] columnName) {
+      return null;
+    }
+
+    @JsonIgnore @Override public OriginalType getOriginalType(String[] columnName) {
+      return null;
+    }
   }
 
+
   /**
    * Struct which contains the metadata for a single parquet file
    */
-  public static class ParquetFileMetadata {
+  public static class ParquetFileMetadata_v1 extends ParquetFileMetadata {
     @JsonProperty
     public String path;
     @JsonProperty
     public Long length;
     @JsonProperty
-    public List<RowGroupMetadata> rowGroups;
+    public List<RowGroupMetadata_v1> rowGroups;
 
-    public ParquetFileMetadata() {
+    public ParquetFileMetadata_v1() {
       super();
     }
 
-    public ParquetFileMetadata(String path, Long length, List<RowGroupMetadata> rowGroups) {
+    public ParquetFileMetadata_v1(String path, Long length, List<RowGroupMetadata_v1> rowGroups) {
       this.path = path;
       this.length = length;
       this.rowGroups = rowGroups;
@@ -442,12 +595,25 @@ public class Metadata {
     public String toString() {
       return String.format("path: %s rowGroups: %s", path, rowGroups);
     }
+
+    @JsonIgnore @Override public String getPath() {
+      return path;
+    }
+
+    @JsonIgnore @Override public Long getLength() {
+      return length;
+    }
+
+    @JsonIgnore @Override public List<? extends RowGroupMetadata> getRowGroups() {
+      return rowGroups;
+    }
   }
 
+
   /**
    * A struct that contains the metadata for a parquet row group
    */
-  public static class RowGroupMetadata {
+  public static class RowGroupMetadata_v1 extends RowGroupMetadata {
     @JsonProperty
     public Long start;
     @JsonProperty
@@ -457,26 +623,47 @@ public class Metadata {
     @JsonProperty
     public Map<String, Float> hostAffinity;
     @JsonProperty
-    public List<ColumnMetadata> columns;
+    public List<ColumnMetadata_v1> columns;
 
-    public RowGroupMetadata() {
+    public RowGroupMetadata_v1() {
       super();
     }
 
-    public RowGroupMetadata(Long start, Long length, Long rowCount,
-                            Map<String, Float> hostAffinity, List<ColumnMetadata> columns) {
+    public RowGroupMetadata_v1(Long start, Long length, Long rowCount, Map<String, Float> hostAffinity,
+        List<ColumnMetadata_v1> columns) {
       this.start = start;
       this.length = length;
       this.rowCount = rowCount;
       this.hostAffinity = hostAffinity;
       this.columns = columns;
     }
+
+    @Override public Long getStart() {
+      return start;
+    }
+
+    @Override public Long getLength() {
+      return length;
+    }
+
+    @Override public Long getRowCount() {
+      return rowCount;
+    }
+
+    @Override public Map<String, Float> getHostAffinity() {
+      return hostAffinity;
+    }
+
+    @Override public List<? extends ColumnMetadata> getColumns() {
+      return columns;
+    }
   }
 
+
   /**
    * A struct that contains the metadata for a column in a parquet file
    */
-  public static class ColumnMetadata {
+  public static class ColumnMetadata_v1 extends ColumnMetadata {
     @JsonProperty
     public SchemaPath name;
     @JsonProperty
@@ -491,12 +678,12 @@ public class Metadata {
     public Object min;
 
 
-    public ColumnMetadata() {
+    public ColumnMetadata_v1() {
       super();
     }
 
-    public ColumnMetadata(SchemaPath name, PrimitiveTypeName primitiveType, OriginalType originalType,
-                          Object max, Object min, Long nulls) {
+    public ColumnMetadata_v1(SchemaPath name, PrimitiveTypeName primitiveType, OriginalType originalType,
+        Object max, Object min, Long nulls) {
       this.name = name;
       this.primitiveType = primitiveType;
       this.originalType = originalType;
@@ -508,7 +695,7 @@ public class Metadata {
     @JsonProperty(value = "min")
     public Object getMin() {
       if (primitiveType == PrimitiveTypeName.BINARY && min != null) {
-         return new String(((Binary) min).getBytes());
+        return new String(((Binary) min).getBytes());
       }
       return min;
     }
@@ -521,17 +708,27 @@ public class Metadata {
       return max;
     }
 
+    @Override public PrimitiveTypeName getPrimitiveType() {
+      return primitiveType;
+    }
+
+    @Override public OriginalType getOriginalType() {
+      return originalType;
+    }
+
     /**
      * setter used during deserialization of the 'min' field of the metadata cache file.
+     *
      * @param min
      */
     @JsonProperty(value = "min")
     public void setMin(Object min) {
       this.min = min;
-     }
+    }
 
     /**
      * setter used during deserialization of the 'max' field of the metadata cache file.
+     *
      * @param max
      */
     @JsonProperty(value = "max")
@@ -539,5 +736,330 @@ public class Metadata {
       this.max = max;
     }
 
+    @Override public String[] getName() {
+      String[] s = new String[1];
+      String nameString = name.toString();
+      // Strip out the surrounding backticks.
+      s[0]=nameString.substring(1, nameString.length()-1);
+      return s;
+    }
+
+    @Override public Long getNulls() {
+      return nulls;
+    }
+
+    @Override public boolean hasSingleValue() {
+      return (max != null && min != null && max.equals(min));
+    }
+
+    @Override public Object getMaxValue() {
+      return max;
+    }
+
+
   }
+
+  /**
+   * Struct which contains the metadata for an entire parquet directory structure
+   */
+  @JsonTypeName("v2") public static class ParquetTableMetadata_v2 extends ParquetTableMetadataBase {
+    /*
+     ColumnTypeInfo is schema information from all the files and row groups, merged into
+     one. To get this info, we pass the ParquetTableMetadata object all the way down to the
+     RowGroup and the column type is built there as it is read from the footer.
+     */
+    @JsonProperty public ConcurrentHashMap<ColumnTypeMetadata_v2.Key, ColumnTypeMetadata_v2> columnTypeInfo;
+    @JsonProperty List<ParquetFileMetadata_v2> files;
+    @JsonProperty List<String> directories;
+
+    public ParquetTableMetadata_v2() {
+      super();
+    }
+
+    public ParquetTableMetadata_v2(ParquetTableMetadataBase parquetTable,
+        List<ParquetFileMetadata_v2> files, List<String> directories) {
+      this.files = files;
+      this.directories = directories;
+      this.columnTypeInfo = ((ParquetTableMetadata_v2) parquetTable).columnTypeInfo;
+    }
+
+    public ColumnTypeMetadata_v2 getColumnTypeInfo(String[] name) {
+      return columnTypeInfo.get(new ColumnTypeMetadata_v2.Key(name));
+    }
+
+    @JsonIgnore @Override public List<String> getDirectories() {
+      return directories;
+    }
+
+    @JsonIgnore @Override public List<? extends ParquetFileMetadata> getFiles() {
+      return files;
+    }
+
+    @JsonIgnore @Override public void assignFiles(List<? extends ParquetFileMetadata> newFiles) {
+      this.files = (List<ParquetFileMetadata_v2>) newFiles;
+    }
+
+    @Override public boolean hasColumnMetadata() {
+      return true;
+    }
+
+    @JsonIgnore @Override public PrimitiveTypeName getPrimitiveType(String[] columnName) {
+      return getColumnTypeInfo(columnName).primitiveType;
+    }
+
+    @JsonIgnore @Override public OriginalType getOriginalType(String[] columnName) {
+      return getColumnTypeInfo(columnName).originalType;
+    }
+
+  }
+
+
+  /**
+   * Struct which contains the metadata for a single parquet file
+   */
+  public static class ParquetFileMetadata_v2 extends ParquetFileMetadata {
+    @JsonProperty public String path;
+    @JsonProperty public Long length;
+    @JsonProperty public List<RowGroupMetadata_v2> rowGroups;
+
+    public ParquetFileMetadata_v2() {
+      super();
+    }
+
+    public ParquetFileMetadata_v2(String path, Long length, List<RowGroupMetadata_v2> rowGroups) {
+      this.path = path;
+      this.length = length;
+      this.rowGroups = rowGroups;
+    }
+
+    @Override public String toString() {
+      return String.format("path: %s rowGroups: %s", path, rowGroups);
+    }
+
+    @JsonIgnore @Override public String getPath() {
+      return path;
+    }
+
+    @JsonIgnore @Override public Long getLength() {
+      return length;
+    }
+
+    @JsonIgnore @Override public List<? extends RowGroupMetadata> getRowGroups() {
+      return rowGroups;
+    }
+  }
+
+
+  /**
+   * A struct that contains the metadata for a parquet row group
+   */
+  public static class RowGroupMetadata_v2 extends RowGroupMetadata {
+    @JsonProperty public Long start;
+    @JsonProperty public Long length;
+    @JsonProperty public Long rowCount;
+    @JsonProperty public Map<String, Float> hostAffinity;
+    @JsonProperty public List<ColumnMetadata_v2> columns;
+
+    public RowGroupMetadata_v2() {
+      super();
+    }
+
+    public RowGroupMetadata_v2(Long start, Long length, Long rowCount, Map<String, Float> hostAffinity,
+        List<ColumnMetadata_v2> columns) {
+      this.start = start;
+      this.length = length;
+      this.rowCount = rowCount;
+      this.hostAffinity = hostAffinity;
+      this.columns = columns;
+    }
+
+    @Override public Long getStart() {
+      return start;
+    }
+
+    @Override public Long getLength() {
+      return length;
+    }
+
+    @Override public Long getRowCount() {
+      return rowCount;
+    }
+
+    @Override public Map<String, Float> getHostAffinity() {
+      return hostAffinity;
+    }
+
+    @Override public List<? extends ColumnMetadata> getColumns() {
+      return columns;
+    }
+  }
+
+
+  public static class ColumnTypeMetadata_v2 {
+    @JsonProperty public String[] name;
+    @JsonProperty public PrimitiveTypeName primitiveType;
+    @JsonProperty public OriginalType originalType;
+
+    // Key to find by name only
+    @JsonIgnore private Key key;
+
+    public ColumnTypeMetadata_v2() {
+      super();
+    }
+
+    public ColumnTypeMetadata_v2(String[] name, PrimitiveTypeName primitiveType, OriginalType originalType) {
+      this.name = name;
+      this.primitiveType = primitiveType;
+      this.originalType = originalType;
+      this.key = new Key(name);
+    }
+
+    @JsonIgnore private Key key() {
+      return this.key;
+    }
+
+    private static class Key {
+      private String[] name;
+      private int hashCode = 0;
+
+      public Key(String[] name) {
+        this.name = name;
+      }
+
+      @Override public int hashCode() {
+        if (hashCode == 0) {
+          hashCode = Arrays.hashCode(name);
+        }
+        return hashCode;
+      }
+
+      @Override public boolean equals(Object obj) {
+        if (obj == null) {
+          return false;
+        }
+        if (getClass() != obj.getClass()) {
+          return false;
+        }
+        final Key other = (Key) obj;
+        return Arrays.equals(this.name, other.name);
+      }
+
+      @Override public String toString() {
+        String s = null;
+        for (String namePart : name) {
+          if (s != null) {
+            s += ".";
+            s += namePart;
+          } else {
+            s = namePart;
+          }
+        }
+        return s;
+      }
+
+      public static class DeSerializer extends KeyDeserializer {
+
+        public DeSerializer() {
+          super();
+        }
+
+        @Override
+        public Object deserializeKey(String key, com.fasterxml.jackson.databind.DeserializationContext ctxt)
+            throws IOException, com.fasterxml.jackson.core.JsonProcessingException {
+          return new Key(key.split("\\."));
+        }
+      }
+    }
+  }
+
+
+  /**
+   * A struct that contains the metadata for a column in a parquet file
+   */
+  public static class ColumnMetadata_v2 extends ColumnMetadata {
+    // Use a string array for name instead of Schema Path to make serialization easier
+    @JsonProperty public String[] name;
+    @JsonProperty public Long nulls;
+
+    public Object mxValue;
+
+    @JsonIgnore private PrimitiveTypeName primitiveType;
+
+    public ColumnMetadata_v2() {
+      super();
+    }
+
+    public ColumnMetadata_v2(String[] name, PrimitiveTypeName primitiveType, Object mxValue, Long nulls) {
+      this.name = name;
+      this.mxValue = mxValue;
+      this.nulls = nulls;
+      this.primitiveType = primitiveType;
+    }
+
+    @JsonProperty(value = "mxValue") public void setMax(Object mxValue) {
+      this.mxValue = mxValue;
+    }
+
+    @Override public String[] getName() {
+      return name;
+    }
+
+    @Override public Long getNulls() {
+      return nulls;
+    }
+
+    public boolean hasSingleValue() {
+      return (mxValue != null);
+    }
+
+    @Override public Object getMaxValue() {
+      return mxValue;
+    }
+
+    @Override public PrimitiveTypeName getPrimitiveType() {
+      return null;
+    }
+
+    @Override public OriginalType getOriginalType() {
+      return null;
+    }
+
+    public static class DeSerializer extends JsonDeserializer<ColumnMetadata_v2> {
+      @Override public ColumnMetadata_v2 deserialize(JsonParser jp, DeserializationContext ctxt)
+          throws IOException, JsonProcessingException {
+        return null;
+      }
+    }
+
+
+    // We use a custom serializer and write only non-null values.
+    public static class Serializer extends JsonSerializer<ColumnMetadata_v2> {
+      @Override
+      public void serialize(ColumnMetadata_v2 value, JsonGenerator jgen, SerializerProvider provider)
+          throws IOException, JsonProcessingException {
+        jgen.writeStartObject();
+        jgen.writeArrayFieldStart("name");
+        for (String n : value.name) {
+          jgen.writeString(n);
+        }
+        jgen.writeEndArray();
+        if (value.mxValue != null) {
+          Object val;
+          if (value.primitiveType == PrimitiveTypeName.BINARY && value.mxValue != null) {
+            val = new String(((Binary) value.mxValue).getBytes());
+          } else {
+            val = value.mxValue;
+          }
+          jgen.writeObjectField("mxValue", val);
+        }
+        if (value.nulls != null) {
+          jgen.writeObjectField("nulls", value.nulls);
+        }
+        jgen.writeEndObject();
+      }
+    }
+
+  }
+
 }
+
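
Backward compatibility with caches written by older Drill versions relies on Jackson's
polymorphic type handling: the "metadata_version" property on ParquetTableMetadataBase selects
ParquetTableMetadata_v1 or ParquetTableMetadata_v2 at read time. A cut-down, hypothetical
illustration of the mechanism (Base/V1/V2 here are stand-ins, not the Drill classes):

  import com.fasterxml.jackson.annotation.JsonSubTypes;
  import com.fasterxml.jackson.annotation.JsonTypeInfo;
  import com.fasterxml.jackson.annotation.JsonTypeName;
  import com.fasterxml.jackson.databind.ObjectMapper;

  public class VersionTagExample {
    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY,
        property = "metadata_version")
    @JsonSubTypes({
        @JsonSubTypes.Type(value = V1.class, name = "v1"),
        @JsonSubTypes.Type(value = V2.class, name = "v2")})
    public static abstract class Base {}

    @JsonTypeName("v1") public static class V1 extends Base {}

    @JsonTypeName("v2") public static class V2 extends Base {}

    public static void main(String[] args) throws Exception {
      ObjectMapper mapper = new ObjectMapper();
      // A cache file written by the pre-patch code is tagged "v1" and still maps
      // onto the v1 class; files written by this patch are tagged "v2".
      Base base = mapper.readValue("{\"metadata_version\":\"v1\"}", Base.class);
      System.out.println(base.getClass().getSimpleName()); // prints V1
    }
  }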

http://git-wip-us.apache.org/repos/asf/drill/blob/0a3613e7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java
index 26ebfc5..33dccd6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java
@@ -19,17 +19,17 @@ package org.apache.drill.exec.store.parquet;
 
 import com.google.common.base.Preconditions;
 import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.parquet.Metadata.ParquetTableMetadata_v1;
+import org.apache.drill.exec.store.parquet.Metadata.ParquetTableMetadataBase;
 
 /**
- * Parquet specific {@link FileSelection selection} that carries out {@link ParquetTableMetadata_v1 metadata} along.
+ * Parquet specific {@link FileSelection selection} that carries out {@link ParquetTableMetadataBase metadata} along.
  */
 public class ParquetFileSelection extends FileSelection {
 //  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetFileSelection.class);
 
-  private final ParquetTableMetadata_v1 metadata;
+  private final ParquetTableMetadataBase metadata;
 
-  protected ParquetFileSelection(final FileSelection delegate, final ParquetTableMetadata_v1 metadata) {
+  protected ParquetFileSelection(final FileSelection delegate, final ParquetTableMetadataBase metadata) {
     super(delegate);
     this.metadata = Preconditions.checkNotNull(metadata, "Parquet metadata cannot be null");
   }
@@ -40,7 +40,7 @@ public class ParquetFileSelection extends FileSelection {
    * It will always be null for non-parquet files and null for cases
    * where no metadata cache was created.
    */
-  public ParquetTableMetadata_v1 getParquetMetadata() {
+  public ParquetTableMetadataBase getParquetMetadata() {
     return metadata;
   }
 
@@ -52,7 +52,7 @@ public class ParquetFileSelection extends FileSelection {
    * @return  null if selection is null
    *          otherwise a new selection
    */
-  public static ParquetFileSelection create(final FileSelection selection, final ParquetTableMetadata_v1 metadata) {
+  public static ParquetFileSelection create(final FileSelection selection, final ParquetTableMetadataBase metadata) {
     if (selection == null) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/0a3613e7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 4932aaf..e2cc670 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -50,8 +50,6 @@ import org.apache.drill.exec.store.dfs.FormatPlugin;
 import org.apache.drill.exec.store.dfs.FormatSelection;
 import org.apache.drill.exec.store.dfs.MagicString;
 import org.apache.drill.exec.store.mock.MockStorageEngine;
-import org.apache.drill.exec.store.parquet.Metadata.ParquetFileMetadata;
-import org.apache.drill.exec.store.parquet.Metadata.ParquetTableMetadata_v1;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -225,10 +223,10 @@ public class ParquetFormatPlugin implements FormatPlugin{
         Path metaFilePath = getMetadataPath(metaRootDir);
 
         // get the metadata for the directory by reading the metadata file
-        ParquetTableMetadata_v1 metadata  = Metadata.readBlockMeta(fs, metaFilePath.toString());
+        Metadata.ParquetTableMetadataBase metadata  = Metadata.readBlockMeta(fs, metaFilePath.toString());
         List<String> fileNames = Lists.newArrayList();
-        for (ParquetFileMetadata file : metadata.files) {
-          fileNames.add(file.path);
+        for (Metadata.ParquetFileMetadata file : metadata.getFiles()) {
+          fileNames.add(file.getPath());
         }
         // when creating the file selection, set the selection root in the form /a/b instead of
         // file:/a/b.  The reason is that the file names above have been created in the form

http://git-wip-us.apache.org/repos/asf/drill/blob/0a3613e7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 3a9fc0d..1677f8b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -58,7 +58,7 @@ import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
 import org.apache.drill.exec.store.parquet.Metadata.ColumnMetadata;
 import org.apache.drill.exec.store.parquet.Metadata.ParquetFileMetadata;
-import org.apache.drill.exec.store.parquet.Metadata.ParquetTableMetadata_v1;
+import org.apache.drill.exec.store.parquet.Metadata.ParquetTableMetadataBase;
 import org.apache.drill.exec.store.parquet.Metadata.RowGroupMetadata;
 import org.apache.drill.exec.store.schedule.AffinityCreator;
 import org.apache.drill.exec.store.schedule.AssignmentCreator;
@@ -124,7 +124,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
    * from a metadata cache file earlier; we can re-use during
    * the ParquetGroupScan and avoid extra loading time.
    */
-  private ParquetTableMetadata_v1 parquetTableMetadata = null;
+  private Metadata.ParquetTableMetadataBase parquetTableMetadata = null;
 
   /*
    * total number of rows (obtained from parquet footer)
@@ -136,16 +136,15 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
    */
   private Map<SchemaPath, Long> columnValueCounts;
 
-  @JsonCreator
-  public ParquetGroupScan( //
+  @JsonCreator public ParquetGroupScan( //
       @JsonProperty("userName") String userName,
-      @JsonProperty("entries") List<ReadEntryWithPath> entries, //
+      @JsonProperty("entries") List<ReadEntryWithPath> entries,//
       @JsonProperty("storage") StoragePluginConfig storageConfig, //
       @JsonProperty("format") FormatPluginConfig formatConfig, //
       @JacksonInject StoragePluginRegistry engineRegistry, //
       @JsonProperty("columns") List<SchemaPath> columns, //
       @JsonProperty("selectionRoot") String selectionRoot //
-      ) throws IOException, ExecutionSetupException {
+  ) throws IOException, ExecutionSetupException {
     super(ImpersonationUtil.resolveUserName(userName));
     this.columns = columns;
     if (formatConfig == null) {
@@ -169,7 +168,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       ParquetFormatPlugin formatPlugin, //
       String selectionRoot,
       List<SchemaPath> columns) //
-          throws IOException {
+      throws IOException {
     super(userName);
     this.formatPlugin = formatPlugin;
     this.columns = columns;
@@ -240,20 +239,29 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
   private Set<String> fileSet;
 
   @JsonIgnore
-  private Map<SchemaPath,MajorType> columnTypeMap = Maps.newHashMap();
+  private Map<SchemaPath, MajorType> columnTypeMap = Maps.newHashMap();
 
   /**
-      * When reading the very first footer, any column is a potential partition column. So for the first footer, we check
-      * every column to see if it is single valued, and if so, add it to the list of potential partition columns. For the
-      * remaining footers, we will not find any new partition columns, but we may discover that what was previously a
-      * potential partition column now no longer qualifies, so it needs to be removed from the list.
-      * @return whether column is a potential partition column
-      */
+   * When reading the very first footer, any column is a potential partition column. So for the first footer, we check
+   * every column to see if it is single valued, and if so, add it to the list of potential partition columns. For the
+   * remaining footers, we will not find any new partition columns, but we may discover that what was previously a
+   * potential partition column now no longer qualifies, so it needs to be removed from the list.
+   * @return whether column is a potential partition column
+   */
   private boolean checkForPartitionColumn(ColumnMetadata columnMetadata, boolean first) {
-    SchemaPath schemaPath = columnMetadata.name;
+    SchemaPath schemaPath = SchemaPath.getCompoundPath(columnMetadata.getName());
+    final PrimitiveTypeName primitiveType;
+    final OriginalType originalType;
+    if (this.parquetTableMetadata.hasColumnMetadata()) {
+      primitiveType = this.parquetTableMetadata.getPrimitiveType(columnMetadata.getName());
+      originalType = this.parquetTableMetadata.getOriginalType(columnMetadata.getName());
+    } else {
+      primitiveType = columnMetadata.getPrimitiveType();
+      originalType = columnMetadata.getOriginalType();
+    }
     if (first) {
       if (hasSingleValue(columnMetadata)) {
-        columnTypeMap.put(schemaPath, getType(columnMetadata.primitiveType, columnMetadata.originalType));
+        columnTypeMap.put(schemaPath, getType(primitiveType, originalType));
         return true;
       } else {
         return false;
@@ -266,7 +274,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
           columnTypeMap.remove(schemaPath);
           return false;
         }
-        if (!getType(columnMetadata.primitiveType, columnMetadata.originalType).equals(columnTypeMap.get(schemaPath))) {
+        if (!getType(primitiveType, originalType).equals(columnTypeMap.get(schemaPath))) {
           columnTypeMap.remove(schemaPath);
           return false;
         }
@@ -278,69 +286,59 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
   private MajorType getType(PrimitiveTypeName type, OriginalType originalType) {
     if (originalType != null) {
       switch (originalType) {
-      case DECIMAL:
-        return Types.optional(MinorType.DECIMAL18);
-      case DATE:
-        return Types.optional(MinorType.DATE);
-      case TIME_MILLIS:
-        return Types.optional(MinorType.TIME);
-      case TIMESTAMP_MILLIS:
-        return Types.optional(MinorType.TIMESTAMP);
-      case UTF8:
-        return Types.optional(MinorType.VARCHAR);
-      case UINT_8:
-        return Types.optional(MinorType.UINT1);
-      case UINT_16:
-        return Types.optional(MinorType.UINT2);
-      case UINT_32:
-        return Types.optional(MinorType.UINT4);
-      case UINT_64:
-        return Types.optional(MinorType.UINT8);
-      case INT_8:
-        return Types.optional(MinorType.TINYINT);
-      case INT_16:
-        return Types.optional(MinorType.SMALLINT);
+        case DECIMAL:
+          return Types.optional(MinorType.DECIMAL18);
+        case DATE:
+          return Types.optional(MinorType.DATE);
+        case TIME_MILLIS:
+          return Types.optional(MinorType.TIME);
+        case TIMESTAMP_MILLIS:
+          return Types.optional(MinorType.TIMESTAMP);
+        case UTF8:
+          return Types.optional(MinorType.VARCHAR);
+        case UINT_8:
+          return Types.optional(MinorType.UINT1);
+        case UINT_16:
+          return Types.optional(MinorType.UINT2);
+        case UINT_32:
+          return Types.optional(MinorType.UINT4);
+        case UINT_64:
+          return Types.optional(MinorType.UINT8);
+        case INT_8:
+          return Types.optional(MinorType.TINYINT);
+        case INT_16:
+          return Types.optional(MinorType.SMALLINT);
       }
     }
 
     switch (type) {
-    case BOOLEAN:
-      return Types.optional(MinorType.BIT);
-    case INT32:
-      return Types.optional(MinorType.INT);
-    case INT64:
-      return Types.optional(MinorType.BIGINT);
-    case FLOAT:
-      return Types.optional(MinorType.FLOAT4);
-    case DOUBLE:
-      return Types.optional(MinorType.FLOAT8);
-    case BINARY:
-    case FIXED_LEN_BYTE_ARRAY:
-    case INT96:
-      return Types.optional(MinorType.VARBINARY);
-    default:
-      // Should never hit this
-      throw new UnsupportedOperationException("Unsupported type:" + type);
+      case BOOLEAN:
+        return Types.optional(MinorType.BIT);
+      case INT32:
+        return Types.optional(MinorType.INT);
+      case INT64:
+        return Types.optional(MinorType.BIGINT);
+      case FLOAT:
+        return Types.optional(MinorType.FLOAT4);
+      case DOUBLE:
+        return Types.optional(MinorType.FLOAT8);
+      case BINARY:
+      case FIXED_LEN_BYTE_ARRAY:
+      case INT96:
+        return Types.optional(MinorType.VARBINARY);
+      default:
+        // Should never hit this
+        throw new UnsupportedOperationException("Unsupported type:" + type);
     }
   }
 
   private boolean hasSingleValue(ColumnMetadata columnChunkMetaData) {
-    Object max = columnChunkMetaData.max;
-    Object min = columnChunkMetaData.min;
-    return max != null && max.equals(min);
-/*
-    if (max != null && min != null) {
-      if (max instanceof byte[] && min instanceof byte[]) {
-        return Arrays.equals((byte[])max, (byte[])min);
-      }
-      return max.equals(min);
-    }
-    return false;
-*/
+    // ColumnMetadata will have a non-null value iff the minValue and the maxValue for the
+    // rowgroup are the same
+    return (columnChunkMetaData != null) && (columnChunkMetaData.hasSingleValue());
   }
 
-  @Override
-  public void modifyFileSelection(FileSelection selection) {
+  @Override public void modifyFileSelection(FileSelection selection) {
     entries.clear();
     fileSet = Sets.newHashSet();
     for (String fileName : selection.getFiles()) {
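The min/max comparison removed above, including the commented-out byte[] handling, now lives behind ColumnMetadata. Given the convention described in the new comment, where a row group's max value is kept only when it equals the min, a minimal sketch of what hasSingleValue() on ColumnMetadata could look like is shown below; the field name mxValue is an assumption for illustration and is not taken from this diff.

    // Sketch only: assumes the serialized column metadata keeps a single field,
    // here called mxValue, that is written iff the row group's min equals its max
    // and is left null otherwise.
    public boolean hasSingleValue() {
      return mxValue != null;
    }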
@@ -361,124 +359,124 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     return columnTypeMap.get(schemaPath);
   }
 
-  private Map<String,Map<SchemaPath,Object>> partitionValueMap = Maps.newHashMap();
+  private Map<String, Map<SchemaPath, Object>> partitionValueMap = Maps.newHashMap();
 
   public void populatePruningVector(ValueVector v, int index, SchemaPath column, String file) {
     String f = Path.getPathWithoutSchemeAndAuthority(new Path(file)).toString();
     MinorType type = getTypeForColumn(column).getMinorType();
     switch (type) {
-    case INT: {
-      NullableIntVector intVector = (NullableIntVector) v;
-      Integer value = (Integer) partitionValueMap.get(f).get(column);
-      intVector.getMutator().setSafe(index, value);
-      return;
-    }
-    case SMALLINT: {
-      NullableSmallIntVector smallIntVector = (NullableSmallIntVector) v;
-      Integer value = (Integer) partitionValueMap.get(f).get(column);
-      smallIntVector.getMutator().setSafe(index, value.shortValue());
-      return;
-    }
-    case TINYINT: {
-      NullableTinyIntVector tinyIntVector = (NullableTinyIntVector) v;
-      Integer value = (Integer) partitionValueMap.get(f).get(column);
-      tinyIntVector.getMutator().setSafe(index, value.byteValue());
-      return;
-    }
-    case UINT1: {
-      NullableUInt1Vector intVector = (NullableUInt1Vector) v;
-      Integer value = (Integer) partitionValueMap.get(f).get(column);
-      intVector.getMutator().setSafe(index, value.byteValue());
-      return;
-    }
-    case UINT2: {
-      NullableUInt2Vector intVector = (NullableUInt2Vector) v;
-      Integer value = (Integer) partitionValueMap.get(f).get(column);
-      intVector.getMutator().setSafe(index, (char) value.shortValue());
-      return;
-    }
-    case UINT4: {
-      NullableUInt4Vector intVector = (NullableUInt4Vector) v;
-      Integer value = (Integer) partitionValueMap.get(f).get(column);
-      intVector.getMutator().setSafe(index, value);
-      return;
-    }
-    case BIGINT: {
-      NullableBigIntVector bigIntVector = (NullableBigIntVector) v;
-      Long value = (Long) partitionValueMap.get(f).get(column);
-      bigIntVector.getMutator().setSafe(index, value);
-      return;
-    }
-    case FLOAT4: {
-      NullableFloat4Vector float4Vector = (NullableFloat4Vector) v;
-      Float value = (Float) partitionValueMap.get(f).get(column);
-      float4Vector.getMutator().setSafe(index, value);
-      return;
-    }
-    case FLOAT8: {
-      NullableFloat8Vector float8Vector = (NullableFloat8Vector) v;
-      Double value = (Double) partitionValueMap.get(f).get(column);
-      float8Vector.getMutator().setSafe(index, value);
-      return;
-    }
-    case VARBINARY: {
-      NullableVarBinaryVector varBinaryVector = (NullableVarBinaryVector) v;
-      Object s = partitionValueMap.get(f).get(column);
-      byte[] bytes;
-      if (s instanceof Binary) {
-        bytes = ((Binary) s).getBytes();
-      } else if (s instanceof String) {
-        bytes = ((String) s).getBytes();
-      } else if (s instanceof byte[]) {
-        bytes = (byte[])s;
-      } else {
-        throw new UnsupportedOperationException("Unable to create column data for type: " + type);
+      case INT: {
+        NullableIntVector intVector = (NullableIntVector) v;
+        Integer value = (Integer) partitionValueMap.get(f).get(column);
+        intVector.getMutator().setSafe(index, value);
+        return;
       }
-      varBinaryVector.getMutator().setSafe(index, bytes, 0, bytes.length);
-      return;
-    }
-    case DECIMAL18: {
-      NullableDecimal18Vector decimalVector = (NullableDecimal18Vector) v;
-      Long value = (Long) partitionValueMap.get(f).get(column);
-      decimalVector.getMutator().setSafe(index, value);
-      return;
-    }
-    case DATE: {
-      NullableDateVector dateVector = (NullableDateVector) v;
-      Integer value = (Integer) partitionValueMap.get(f).get(column);
-      dateVector.getMutator().setSafe(index, DateTimeUtils.fromJulianDay(value - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
-      return;
-    }
-    case TIME: {
-      NullableTimeVector timeVector = (NullableTimeVector) v;
-      Integer value = (Integer) partitionValueMap.get(f).get(column);
-      timeVector.getMutator().setSafe(index, value);
-      return;
-    }
-    case TIMESTAMP: {
-      NullableTimeStampVector timeStampVector = (NullableTimeStampVector) v;
-      Long value = (Long) partitionValueMap.get(f).get(column);
-      timeStampVector.getMutator().setSafe(index, value);
-      return;
-    }
-    case VARCHAR: {
-      NullableVarCharVector varCharVector = (NullableVarCharVector) v;
-      Object s = partitionValueMap.get(f).get(column);
-      byte[] bytes;
-      if (s instanceof String) { // if the metadata was read from a JSON cache file it maybe a string type
-        bytes = ((String) s).getBytes();
-      } else if (s instanceof Binary) {
-        bytes = ((Binary) s).getBytes();
-      } else if (s instanceof byte[]) {
-        bytes = (byte[])s;
-      } else {
-        throw new UnsupportedOperationException("Unable to create column data for type: " + type);
+      case SMALLINT: {
+        NullableSmallIntVector smallIntVector = (NullableSmallIntVector) v;
+        Integer value = (Integer) partitionValueMap.get(f).get(column);
+        smallIntVector.getMutator().setSafe(index, value.shortValue());
+        return;
       }
-      varCharVector.getMutator().setSafe(index, bytes, 0, bytes.length);
-      return;
-    }
-    default:
-      throw new UnsupportedOperationException("Unsupported type: " + type);
+      case TINYINT: {
+        NullableTinyIntVector tinyIntVector = (NullableTinyIntVector) v;
+        Integer value = (Integer) partitionValueMap.get(f).get(column);
+        tinyIntVector.getMutator().setSafe(index, value.byteValue());
+        return;
+      }
+      case UINT1: {
+        NullableUInt1Vector intVector = (NullableUInt1Vector) v;
+        Integer value = (Integer) partitionValueMap.get(f).get(column);
+        intVector.getMutator().setSafe(index, value.byteValue());
+        return;
+      }
+      case UINT2: {
+        NullableUInt2Vector intVector = (NullableUInt2Vector) v;
+        Integer value = (Integer) partitionValueMap.get(f).get(column);
+        intVector.getMutator().setSafe(index, (char) value.shortValue());
+        return;
+      }
+      case UINT4: {
+        NullableUInt4Vector intVector = (NullableUInt4Vector) v;
+        Integer value = (Integer) partitionValueMap.get(f).get(column);
+        intVector.getMutator().setSafe(index, value);
+        return;
+      }
+      case BIGINT: {
+        NullableBigIntVector bigIntVector = (NullableBigIntVector) v;
+        Long value = (Long) partitionValueMap.get(f).get(column);
+        bigIntVector.getMutator().setSafe(index, value);
+        return;
+      }
+      case FLOAT4: {
+        NullableFloat4Vector float4Vector = (NullableFloat4Vector) v;
+        Float value = (Float) partitionValueMap.get(f).get(column);
+        float4Vector.getMutator().setSafe(index, value);
+        return;
+      }
+      case FLOAT8: {
+        NullableFloat8Vector float8Vector = (NullableFloat8Vector) v;
+        Double value = (Double) partitionValueMap.get(f).get(column);
+        float8Vector.getMutator().setSafe(index, value);
+        return;
+      }
+      case VARBINARY: {
+        NullableVarBinaryVector varBinaryVector = (NullableVarBinaryVector) v;
+        Object s = partitionValueMap.get(f).get(column);
+        byte[] bytes;
+        if (s instanceof Binary) {
+          bytes = ((Binary) s).getBytes();
+        } else if (s instanceof String) {
+          bytes = ((String) s).getBytes();
+        } else if (s instanceof byte[]) {
+          bytes = (byte[]) s;
+        } else {
+          throw new UnsupportedOperationException("Unable to create column data for type: " + type);
+        }
+        varBinaryVector.getMutator().setSafe(index, bytes, 0, bytes.length);
+        return;
+      }
+      case DECIMAL18: {
+        NullableDecimal18Vector decimalVector = (NullableDecimal18Vector) v;
+        Long value = (Long) partitionValueMap.get(f).get(column);
+        decimalVector.getMutator().setSafe(index, value);
+        return;
+      }
+      case DATE: {
+        NullableDateVector dateVector = (NullableDateVector) v;
+        Integer value = (Integer) partitionValueMap.get(f).get(column);
+        dateVector.getMutator().setSafe(index, DateTimeUtils.fromJulianDay(value - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
+        return;
+      }
+      case TIME: {
+        NullableTimeVector timeVector = (NullableTimeVector) v;
+        Integer value = (Integer) partitionValueMap.get(f).get(column);
+        timeVector.getMutator().setSafe(index, value);
+        return;
+      }
+      case TIMESTAMP: {
+        NullableTimeStampVector timeStampVector = (NullableTimeStampVector) v;
+        Long value = (Long) partitionValueMap.get(f).get(column);
+        timeStampVector.getMutator().setSafe(index, value);
+        return;
+      }
+      case VARCHAR: {
+        NullableVarCharVector varCharVector = (NullableVarCharVector) v;
+        Object s = partitionValueMap.get(f).get(column);
+        byte[] bytes;
+        if (s instanceof String) { // if the metadata was read from a JSON cache file it may be a string type
+          bytes = ((String) s).getBytes();
+        } else if (s instanceof Binary) {
+          bytes = ((Binary) s).getBytes();
+        } else if (s instanceof byte[]) {
+          bytes = (byte[]) s;
+        } else {
+          throw new UnsupportedOperationException("Unable to create column data for type: " + type);
+        }
+        varCharVector.getMutator().setSafe(index, bytes, 0, bytes.length);
+        return;
+      }
+      default:
+        throw new UnsupportedOperationException("Unsupported type: " + type);
     }
   }
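The VARBINARY and VARCHAR branches above repeat the same Binary/String/byte[] conversion. A small helper along these lines could factor that out; the name toBytes is hypothetical, and the sketch assumes the Binary and MinorType imports already used by this class.

    // Illustrative sketch, not part of the commit: shared conversion for the
    // VARBINARY and VARCHAR cases of populatePruningVector.
    private static byte[] toBytes(Object s, MinorType type) {
      if (s instanceof Binary) {
        return ((Binary) s).getBytes();
      } else if (s instanceof String) {
        // values read back from the JSON metadata cache may deserialize as String
        return ((String) s).getBytes();
      } else if (s instanceof byte[]) {
        return (byte[]) s;
      }
      throw new UnsupportedOperationException("Unable to create column data for type: " + type);
    }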
 
@@ -568,26 +566,28 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
 
     if (fileSet == null) {
       fileSet = Sets.newHashSet();
-      for (ParquetFileMetadata file : parquetTableMetadata.files) {
-        fileSet.add(file.path);
+      for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
+        fileSet.add(file.getPath());
       }
     }
 
-    Map<String,DrillbitEndpoint> hostEndpointMap = Maps.newHashMap();
+    Map<String, DrillbitEndpoint> hostEndpointMap = Maps.newHashMap();
 
     for (DrillbitEndpoint endpoint : formatPlugin.getContext().getBits()) {
       hostEndpointMap.put(endpoint.getAddress(), endpoint);
     }
 
     rowGroupInfos = Lists.newArrayList();
-    for (ParquetFileMetadata file : parquetTableMetadata.files) {
+    for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
       int rgIndex = 0;
-      for (RowGroupMetadata rg : file.rowGroups) {
-        RowGroupInfo rowGroupInfo = new RowGroupInfo(file.path, rg.start, rg.length, rgIndex);
+      for (RowGroupMetadata rg : file.getRowGroups()) {
+        RowGroupInfo rowGroupInfo =
+            new RowGroupInfo(file.getPath(), rg.getStart(), rg.getLength(), rgIndex);
         EndpointByteMap endpointByteMap = new EndpointByteMapImpl();
-        for (String host : rg.hostAffinity.keySet()) {
+        for (String host : rg.getHostAffinity().keySet()) {
           if (hostEndpointMap.containsKey(host)) {
-            endpointByteMap.add(hostEndpointMap.get(host), (long) (rg.hostAffinity.get(host) * rg.length));
+            endpointByteMap
+                .add(hostEndpointMap.get(host), (long) (rg.getHostAffinity().get(host) * rg.getLength()));
           }
         }
         rowGroupInfo.setEndpointByteMap(endpointByteMap);
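In the loop above, each Drillbit endpoint is credited with a share of the row group's bytes proportional to its host affinity. A self-contained worked example of that arithmetic, with made-up numbers:

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Worked example only: converts host-affinity fractions into the byte weights
    // that the EndpointByteMap accumulates for one row group.
    public class AffinityExample {
      public static void main(String[] args) {
        long rowGroupLength = 256L * 1024 * 1024;   // a 256 MiB row group
        Map<String, Float> hostAffinity = new LinkedHashMap<>();
        hostAffinity.put("hostA", 0.75f);           // 75% of the bytes are local to hostA
        hostAffinity.put("hostB", 0.25f);           // 25% are local to hostB
        for (Map.Entry<String, Float> e : hostAffinity.entrySet()) {
          long bytes = (long) (e.getValue() * rowGroupLength);
          // prints hostA -> 201326592 bytes, then hostB -> 67108864 bytes
          System.out.println(e.getKey() + " -> " + bytes + " bytes");
        }
      }
    }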
@@ -601,24 +601,24 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     columnValueCounts = Maps.newHashMap();
     this.rowCount = 0;
     boolean first = true;
-    for (ParquetFileMetadata file : parquetTableMetadata.files) {
-      for (RowGroupMetadata rowGroup : file.rowGroups) {
-        long rowCount = rowGroup.rowCount;
-        for (ColumnMetadata column : rowGroup.columns) {
-          SchemaPath schemaPath = column.name;
+    for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
+      for (RowGroupMetadata rowGroup : file.getRowGroups()) {
+        long rowCount = rowGroup.getRowCount();
+        for (ColumnMetadata column : rowGroup.getColumns()) {
+          SchemaPath schemaPath = SchemaPath.getCompoundPath(column.getName());
           Long previousCount = columnValueCounts.get(schemaPath);
           if (previousCount != null) {
             if (previousCount != GroupScan.NO_COLUMN_STATS) {
-              if (column.nulls != null) {
-                Long newCount = rowCount - column.nulls;
+              if (column.getNulls() != null) {
+                Long newCount = rowCount - column.getNulls();
                 columnValueCounts.put(schemaPath, columnValueCounts.get(schemaPath) + newCount);
               } else {
 
               }
             }
           } else {
-            if (column.nulls != null) {
-              Long newCount = rowCount - column.nulls;
+            if (column.getNulls() != null) {
+              Long newCount = rowCount - column.getNulls();
               columnValueCounts.put(schemaPath, newCount);
             } else {
               columnValueCounts.put(schemaPath, GroupScan.NO_COLUMN_STATS);
@@ -626,14 +626,13 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
           }
           boolean partitionColumn = checkForPartitionColumn(column, first);
           if (partitionColumn) {
-            Map<SchemaPath,Object> map = partitionValueMap.get(file.path);
+            Map<SchemaPath, Object> map = partitionValueMap.get(file.getPath());
             if (map == null) {
               map = Maps.newHashMap();
-              partitionValueMap.put(file.path, map);
+              partitionValueMap.put(file.getPath(), map);
             }
             Object value = map.get(schemaPath);
-            Object currentValue = column.max;
-//            Object currentValue = column.getMax();
+            Object currentValue = column.getMaxValue();
             if (value != null) {
               if (value != currentValue) {
                 columnTypeMap.remove(schemaPath);
@@ -645,20 +644,21 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
             columnTypeMap.remove(schemaPath);
           }
         }
-        this.rowCount += rowGroup.rowCount;
+        this.rowCount += rowGroup.getRowCount();
         first = false;
       }
     }
   }
 
-  private ParquetTableMetadata_v1 removeUnneededRowGroups(ParquetTableMetadata_v1 parquetTableMetadata) {
+  private ParquetTableMetadataBase removeUnneededRowGroups(ParquetTableMetadataBase parquetTableMetadata) {
     List<ParquetFileMetadata> newFileMetadataList = Lists.newArrayList();
-    for (ParquetFileMetadata file : parquetTableMetadata.files) {
-      if (fileSet.contains(file.path)) {
+    for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
+      if (fileSet.contains(file.getPath())) {
         newFileMetadataList.add(file);
       }
     }
-    return new ParquetTableMetadata_v1(newFileMetadataList, new ArrayList<String>());
+    parquetTableMetadata.assignFiles(newFileMetadataList);
+    return parquetTableMetadata;
   }
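Instead of rebuilding a version-specific ParquetTableMetadata_v1, the pruning step now works through the abstract base type, so the same code serves both the old and the new cache file layout. A simplified sketch of the accessors relied on here, with signatures inferred from the calls above rather than copied from Metadata.java:

    import java.util.List;

    // Sketch only: hypothetical, simplified signatures.
    public abstract class ParquetTableMetadataBase {
      public abstract List<ParquetFileMetadata> getFiles();
      public abstract void assignFiles(List<ParquetFileMetadata> files);
    }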
 
   /**
@@ -703,7 +703,9 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
 
     @Override
     protected IOException convertToIOException(Exception e) {
-      return new IOException(String.format("Failure while trying to get block locations for file %s starting at %d.", rgi.getPath(), rgi.getStart()));
+      return new IOException(String.format(
+          "Failure while trying to get block locations for file %s starting at %d.", rgi.getPath(),
+          rgi.getStart()));
     }
 
   }
@@ -714,11 +716,10 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     this.mappings = AssignmentCreator.getMappings(incomingEndpoints, rowGroupInfos, formatPlugin.getContext());
   }
 
-  @Override
-  public ParquetRowGroupScan getSpecificScan(int minorFragmentId) {
-    assert minorFragmentId < mappings.size() : String.format(
-        "Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.size(),
-        minorFragmentId);
+  @Override public ParquetRowGroupScan getSpecificScan(int minorFragmentId) {
+    assert minorFragmentId < mappings.size() : String
+        .format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.",
+            mappings.size(), minorFragmentId);
 
     List<RowGroupInfo> rowGroupsForMinor = mappings.get(minorFragmentId);
 

