parquet-commits mailing list archives

From b...@apache.org
Subject [13/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.
Date Mon, 27 Apr 2015 23:12:10 GMT
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
new file mode 100644
index 0000000..a4baf98
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
@@ -0,0 +1,778 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static java.lang.Boolean.TRUE;
+import static org.apache.parquet.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.parquet.Log;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.filter2.compat.RowGroupFilter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.api.ReadSupport.ReadContext;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.GlobalMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.ConfigurationUtil;
+import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.hadoop.util.HiddenFileFilter;
+import org.apache.parquet.hadoop.util.SerializationUtil;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+
+/**
+ * The input format to read a Parquet file.
+ *
+ * It requires an implementation of {@link ReadSupport} to materialize the records.
+ *
+ * The requestedSchema will control how the original records get projected by the loader.
+ * It must be a subset of the original schema. Only the columns needed to reconstruct the records with the requestedSchema will be scanned.
+ *
+ * @see #READ_SUPPORT_CLASS
+ * @see #UNBOUND_RECORD_FILTER
+ * @see #STRICT_TYPE_CHECKING
+ * @see #FILTER_PREDICATE
+ * @see #TASK_SIDE_METADATA
+ *
+ * @author Julien Le Dem
+ *
+ * @param <T> the type of the materialized records
+ */
+public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
+
+  private static final Log LOG = Log.getLog(ParquetInputFormat.class);
+
+  /**
+   * key to configure the ReadSupport implementation
+   */
+  public static final String READ_SUPPORT_CLASS = "parquet.read.support.class";
+
+  /**
+   * key to configure the filter
+   */
+  public static final String UNBOUND_RECORD_FILTER = "parquet.read.filter";
+
+  /**
+   * key to configure type checking for conflicting schemas (default: true)
+   */
+  public static final String STRICT_TYPE_CHECKING = "parquet.strict.typing";
+
+  /**
+   * key to configure the filter predicate
+   */
+  public static final String FILTER_PREDICATE = "parquet.private.read.filter.predicate";
+
+  /**
+   * key to turn on or off task side metadata loading (default: true)
+   * If true, metadata is read on the task side and some tasks may finish immediately.
+   * If false, metadata is read on the client, which is slower when there is a lot of metadata, but tasks are only spawned if there is work to do.
+   */
+  public static final String TASK_SIDE_METADATA = "parquet.task.side.metadata";
+
+  private static final int MIN_FOOTER_CACHE_SIZE = 100;
+
+  public static void setTaskSideMetaData(Job job,  boolean taskSideMetadata) {
+    ContextUtil.getConfiguration(job).setBoolean(TASK_SIDE_METADATA, taskSideMetadata);
+  }
+
+  public static boolean isTaskSideMetaData(Configuration configuration) {
+    return configuration.getBoolean(TASK_SIDE_METADATA, TRUE);
+  }
+
+  public static void setReadSupportClass(Job job,  Class<?> readSupportClass) {
+    ContextUtil.getConfiguration(job).set(READ_SUPPORT_CLASS, readSupportClass.getName());
+  }
+
+  public static void setUnboundRecordFilter(Job job, Class<? extends UnboundRecordFilter> filterClass) {
+    Configuration conf = ContextUtil.getConfiguration(job);
+    checkArgument(getFilterPredicate(conf) == null,
+        "You cannot provide an UnboundRecordFilter after providing a FilterPredicate");
+
+    conf.set(UNBOUND_RECORD_FILTER, filterClass.getName());
+  }
+
+  /**
+   * @deprecated use {@link #getFilter(Configuration)}
+   */
+  @Deprecated
+  public static Class<?> getUnboundRecordFilter(Configuration configuration) {
+    return ConfigurationUtil.getClassFromConfig(configuration, UNBOUND_RECORD_FILTER, UnboundRecordFilter.class);
+  }
+
+  private static UnboundRecordFilter getUnboundRecordFilterInstance(Configuration configuration) {
+    Class<?> clazz = ConfigurationUtil.getClassFromConfig(configuration, UNBOUND_RECORD_FILTER, UnboundRecordFilter.class);
+    if (clazz == null) { return null; }
+
+    try {
+      UnboundRecordFilter unboundRecordFilter = (UnboundRecordFilter) clazz.newInstance();
+
+      if (unboundRecordFilter instanceof Configurable) {
+        ((Configurable)unboundRecordFilter).setConf(configuration);
+      }
+
+      return unboundRecordFilter;
+    } catch (InstantiationException e) {
+      throw new BadConfigurationException("could not instantiate unbound record filter class", e);
+    } catch (IllegalAccessException e) {
+      throw new BadConfigurationException("could not instantiate unbound record filter class", e);
+    }
+  }
+
+  public static void setReadSupportClass(JobConf conf, Class<?> readSupportClass) {
+    conf.set(READ_SUPPORT_CLASS, readSupportClass.getName());
+  }
+
+  public static Class<?> getReadSupportClass(Configuration configuration) {
+    return ConfigurationUtil.getClassFromConfig(configuration, READ_SUPPORT_CLASS, ReadSupport.class);
+  }
+
+  public static void setFilterPredicate(Configuration configuration, FilterPredicate filterPredicate) {
+    checkArgument(getUnboundRecordFilter(configuration) == null,
+        "You cannot provide a FilterPredicate after providing an UnboundRecordFilter");
+
+    configuration.set(FILTER_PREDICATE + ".human.readable", filterPredicate.toString());
+    try {
+      SerializationUtil.writeObjectToConfAsBase64(FILTER_PREDICATE, filterPredicate, configuration);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static FilterPredicate getFilterPredicate(Configuration configuration) {
+    try {
+      return SerializationUtil.readObjectFromConfAsBase64(FILTER_PREDICATE, configuration);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Returns a non-null Filter, which is a wrapper around either a
+   * FilterPredicate, an UnboundRecordFilter, or a no-op filter.
+   */
+  public static Filter getFilter(Configuration conf) {
+    return FilterCompat.get(getFilterPredicate(conf), getUnboundRecordFilterInstance(conf));
+  }
+
+  private LruCache<FileStatusWrapper, FootersCacheValue> footersCache;
+
+  private final Class<? extends ReadSupport<T>> readSupportClass;
+
+  /**
+   * Hadoop will instantiate using this constructor
+   */
+  public ParquetInputFormat() {
+    this.readSupportClass = null;
+  }
+
+  /**
+   * Constructor for subclasses, such as AvroParquetInputFormat, or wrappers.
+   * <p>
+   * Subclasses and wrappers may use this constructor to set the ReadSupport
+   * class that will be used when reading instead of requiring the user to set
+   * the read support property in their configuration.
+   *
+   * @param readSupportClass a ReadSupport subclass
+   */
+  public <S extends ReadSupport<T>> ParquetInputFormat(Class<S> readSupportClass) {
+    this.readSupportClass = readSupportClass;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public RecordReader<Void, T> createRecordReader(
+      InputSplit inputSplit,
+      TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+    Configuration conf = ContextUtil.getConfiguration(taskAttemptContext);
+    ReadSupport<T> readSupport = getReadSupport(conf);
+    return new ParquetRecordReader<T>(readSupport, getFilter(conf));
+  }
+
+  /**
+   * @param configuration the configuration used to find the read support class
+   * @return the configured read support
+   * @deprecated use getReadSupportInstance static methods instead
+   */
+  @Deprecated
+  @SuppressWarnings("unchecked")
+  ReadSupport<T> getReadSupport(Configuration configuration){
+    return getReadSupportInstance(readSupportClass == null ?
+        (Class<? extends ReadSupport<T>>) getReadSupportClass(configuration) :
+        readSupportClass);
+  }
+
+  /**
+   * @param configuration the configuration used to find the read support class
+   * @return the configured read support
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> ReadSupport<T> getReadSupportInstance(Configuration configuration){
+    return getReadSupportInstance(
+        (Class<? extends ReadSupport<T>>) getReadSupportClass(configuration));
+  }
+
+  /**
+   * @param readSupportClass to instantiate
+   * @return the configured read support
+   */
+  @SuppressWarnings("unchecked")
+  static <T> ReadSupport<T> getReadSupportInstance(
+      Class<? extends ReadSupport<T>> readSupportClass){
+    try {
+      return readSupportClass.newInstance();
+    } catch (InstantiationException e) {
+      throw new BadConfigurationException("could not instantiate read support class", e);
+    } catch (IllegalAccessException e) {
+      throw new BadConfigurationException("could not instantiate read support class", e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
+    Configuration configuration = ContextUtil.getConfiguration(jobContext);
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+
+    if (isTaskSideMetaData(configuration)) {
+      // Although not required by the API, some clients may depend on always
+      // receiving ParquetInputSplit. Translation is required at some point.
+      for (InputSplit split : super.getSplits(jobContext)) {
+        Preconditions.checkArgument(split instanceof FileSplit,
+            "Cannot wrap non-FileSplit: " + split);
+        splits.add(ParquetInputSplit.from((FileSplit) split));
+      }
+      return splits;
+
+    } else {
+      splits.addAll(getSplits(configuration, getFooters(jobContext)));
+    }
+
+    return splits;
+  }
+
+  /**
+   * @param configuration the configuration to connect to the file system
+   * @param footers the footers of the files to read
+   * @return the splits for the footers
+   * @throws IOException
+   * @deprecated split planning using file footers will be removed
+   */
+  @Deprecated
+  public List<ParquetInputSplit> getSplits(Configuration configuration, List<Footer> footers) throws IOException {
+    boolean strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
+    final long maxSplitSize = configuration.getLong("mapred.max.split.size", Long.MAX_VALUE);
+    final long minSplitSize = Math.max(getFormatMinSplitSize(), configuration.getLong("mapred.min.split.size", 0L));
+    if (maxSplitSize < 0 || minSplitSize < 0) {
+      throw new ParquetDecodingException("maxSplitSize or minSplitSize should not be negative: maxSplitSize = " + maxSplitSize + "; minSplitSize = " + minSplitSize);
+    }
+    GlobalMetaData globalMetaData = ParquetFileWriter.getGlobalMetaData(footers, strictTypeChecking);
+    ReadContext readContext = getReadSupport(configuration).init(new InitContext(
+        configuration,
+        globalMetaData.getKeyValueMetaData(),
+        globalMetaData.getSchema()));
+
+    return new ClientSideMetadataSplitStrategy().getSplits(
+        configuration, footers, maxSplitSize, minSplitSize, readContext);
+  }
+
+  /*
+   * This is to support multi-level/recursive directory listing until
+   * MAPREDUCE-1577 is fixed.
+   */
+  @Override
+  protected List<FileStatus> listStatus(JobContext jobContext) throws IOException {
+    return getAllFileRecursively(super.listStatus(jobContext),
+       ContextUtil.getConfiguration(jobContext));
+  }
+
+  private static List<FileStatus> getAllFileRecursively(
+      List<FileStatus> files, Configuration conf) throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+    for (FileStatus file : files) {
+      if (file.isDir()) {
+        Path p = file.getPath();
+        FileSystem fs = p.getFileSystem(conf);
+        staticAddInputPathRecursively(result, fs, p, HiddenFileFilter.INSTANCE);
+      } else {
+        result.add(file);
+      }
+    }
+    LOG.info("Total input paths to process : " + result.size());
+    return result;
+  }
+
+  private static void staticAddInputPathRecursively(List<FileStatus> result,
+      FileSystem fs, Path path, PathFilter inputFilter)
+          throws IOException {
+    for (FileStatus stat: fs.listStatus(path, inputFilter)) {
+      if (stat.isDir()) {
+        staticAddInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+      } else {
+        result.add(stat);
+      }
+    }
+  }
+
+  /**
+   * @param jobContext the current job context
+   * @return the footers for the files
+   * @throws IOException
+   */
+  public List<Footer> getFooters(JobContext jobContext) throws IOException {
+    List<FileStatus> statuses = listStatus(jobContext);
+    if (statuses.isEmpty()) {
+      return Collections.emptyList();
+    }
+    Configuration config = ContextUtil.getConfiguration(jobContext);
+    List<Footer> footers = new ArrayList<Footer>(statuses.size());
+    Set<FileStatus> missingStatuses = new HashSet<FileStatus>();
+    Map<Path, FileStatusWrapper> missingStatusesMap =
+            new HashMap<Path, FileStatusWrapper>(missingStatuses.size());
+
+    if (footersCache == null) {
+      footersCache =
+              new LruCache<FileStatusWrapper, FootersCacheValue>(Math.max(statuses.size(), MIN_FOOTER_CACHE_SIZE));
+    }
+    for (FileStatus status : statuses) {
+      FileStatusWrapper statusWrapper = new FileStatusWrapper(status);
+      FootersCacheValue cacheEntry =
+              footersCache.getCurrentValue(statusWrapper);
+      if (Log.DEBUG) {
+        LOG.debug("Cache entry " + (cacheEntry == null ? "not " : "")
+                + " found for '" + status.getPath() + "'");
+      }
+      if (cacheEntry != null) {
+        footers.add(cacheEntry.getFooter());
+      } else {
+        missingStatuses.add(status);
+        missingStatusesMap.put(status.getPath(), statusWrapper);
+      }
+    }
+    if (Log.DEBUG) {
+      LOG.debug("found " + footers.size() + " footers in cache and adding up "
+              + "to " + missingStatuses.size() + " missing footers to the cache");
+    }
+
+
+    if (missingStatuses.isEmpty()) {
+      return footers;
+    }
+
+    List<Footer> newFooters = getFooters(config, missingStatuses);
+    for (Footer newFooter : newFooters) {
+      // Use the original file status objects to make sure we store a
+      // conservative (older) modification time (i.e. in case the files and
+      // footers were modified and it's not clear which version of the footers
+      // we have)
+      FileStatusWrapper fileStatus = missingStatusesMap.get(newFooter.getFile());
+      footersCache.put(fileStatus, new FootersCacheValue(fileStatus, newFooter));
+    }
+
+    footers.addAll(newFooters);
+    return footers;
+  }
+
+  public List<Footer> getFooters(Configuration configuration, List<FileStatus> statuses) throws IOException {
+    return getFooters(configuration, (Collection<FileStatus>)statuses);
+  }
+
+  /**
+   * Reads the footers of the given files.
+   * @param configuration the configuration used to connect to the file system
+   * @param statuses the files to open
+   * @return the footers of the files
+   * @throws IOException
+   */
+  public List<Footer> getFooters(Configuration configuration, Collection<FileStatus> statuses) throws IOException {
+    if (Log.DEBUG) LOG.debug("reading " + statuses.size() + " files");
+    boolean taskSideMetaData = isTaskSideMetaData(configuration);
+    return ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(configuration, statuses, taskSideMetaData);
+  }
+
+  /**
+   * @param jobContext the current job context
+   * @return the merged metadata from the footers
+   * @throws IOException
+   */
+  public GlobalMetaData getGlobalMetaData(JobContext jobContext) throws IOException {
+    return ParquetFileWriter.getGlobalMetaData(getFooters(jobContext));
+  }
+
+  /**
+   * A simple wrapper around {@link org.apache.parquet.hadoop.Footer} that also includes a
+   * modification time associated with that footer.  The modification time is
+   * used to determine whether the footer is still current.
+   */
+  static final class FootersCacheValue
+          implements LruCache.Value<FileStatusWrapper, FootersCacheValue> {
+    private final long modificationTime;
+    private final Footer footer;
+
+    public FootersCacheValue(FileStatusWrapper status, Footer footer) {
+      this.modificationTime = status.getModificationTime();
+      this.footer = new Footer(footer.getFile(), footer.getParquetMetadata());
+    }
+
+    @Override
+    public boolean isCurrent(FileStatusWrapper key) {
+      long currentModTime = key.getModificationTime();
+      boolean isCurrent = modificationTime >= currentModTime;
+      if (Log.DEBUG && !isCurrent) {
+        LOG.debug("The cache value for '" + key + "' is not current: "
+                + "cached modification time=" + modificationTime + ", "
+                + "current modification time: " + currentModTime);
+      }
+      return isCurrent;
+    }
+
+    public Footer getFooter() {
+      return footer;
+    }
+
+    @Override
+    public boolean isNewerThan(FootersCacheValue otherValue) {
+      return otherValue == null ||
+              modificationTime > otherValue.modificationTime;
+    }
+
+    public Path getPath() {
+      return footer.getFile();
+    }
+  }
+
+  /**
+   * A simple wrapper around {@link org.apache.hadoop.fs.FileStatus} with a
+   * meaningful "toString()" method
+   */
+  static final class FileStatusWrapper {
+    private final FileStatus status;
+    public FileStatusWrapper(FileStatus fileStatus) {
+      if (fileStatus == null) {
+        throw new IllegalArgumentException("FileStatus object cannot be null");
+      }
+      status = fileStatus;
+    }
+
+    public long getModificationTime() {
+      return status.getModificationTime();
+    }
+
+    @Override
+    public int hashCode() {
+      return status.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      return other instanceof FileStatusWrapper &&
+              status.equals(((FileStatusWrapper) other).status);
+    }
+
+    @Override
+    public String toString() {
+      return status.getPath().toString();
+    }
+  }
+
+}
+
+class ClientSideMetadataSplitStrategy {
+  //Wrapper of HDFS blocks; keeps track of which HDFS block is being used
+  private static class HDFSBlocks {
+    BlockLocation[] hdfsBlocks;
+    int currentStartHdfsBlockIndex = 0; // the HDFS block index corresponding to the start of a row group
+    int currentMidPointHDFSBlockIndex = 0; // the HDFS block index corresponding to the midpoint of a row group; a split may be created only when the midpoint of the row group enters a new HDFS block
+
+    private HDFSBlocks(BlockLocation[] hdfsBlocks) {
+      this.hdfsBlocks = hdfsBlocks;
+      Comparator<BlockLocation> comparator = new Comparator<BlockLocation>() {
+        @Override
+        public int compare(BlockLocation b1, BlockLocation b2) {
+          return Long.signum(b1.getOffset() - b2.getOffset());
+        }
+      };
+      Arrays.sort(hdfsBlocks, comparator);
+    }
+
+    private long getHDFSBlockEndingPosition(int hdfsBlockIndex) {
+      BlockLocation hdfsBlock = hdfsBlocks[hdfsBlockIndex];
+      return hdfsBlock.getOffset() + hdfsBlock.getLength() - 1;
+    }
+
+    /**
+     * @param rowGroupMetadata the metadata of the row group to check
+     * @return true if the midpoint of the row group falls in a new HDFS block (the current HDFS block pointers are moved to the block containing the row group);
+     * false if the midpoint of the row group is still in the same HDFS block
+     */
+    private boolean checkBelongingToANewHDFSBlock(BlockMetaData rowGroupMetadata) {
+      boolean isNewHdfsBlock = false;
+      long rowGroupMidPoint = rowGroupMetadata.getStartingPos() + (rowGroupMetadata.getCompressedSize() / 2);
+
+      //if mid point is not in the current HDFS block any more, return true
+      while (rowGroupMidPoint > getHDFSBlockEndingPosition(currentMidPointHDFSBlockIndex)) {
+        isNewHdfsBlock = true;
+        currentMidPointHDFSBlockIndex++;
+        if (currentMidPointHDFSBlockIndex >= hdfsBlocks.length)
+          throw new ParquetDecodingException("the row group is not in hdfs blocks in the file: midpoint of row groups is "
+                  + rowGroupMidPoint
+                  + ", the end of the hdfs block is "
+                  + getHDFSBlockEndingPosition(currentMidPointHDFSBlockIndex - 1));
+      }
+
+      while (rowGroupMetadata.getStartingPos() > getHDFSBlockEndingPosition(currentStartHdfsBlockIndex)) {
+        currentStartHdfsBlockIndex++;
+        if (currentStartHdfsBlockIndex >= hdfsBlocks.length)
+          throw new ParquetDecodingException("The row group does not start in this file: row group offset is "
+                  + rowGroupMetadata.getStartingPos()
+                  + " but the end of hdfs blocks of file is "
+                  + getHDFSBlockEndingPosition(currentStartHdfsBlockIndex));
+      }
+      return isNewHdfsBlock;
+    }
+
+    public BlockLocation getCurrentBlock() {
+      return hdfsBlocks[currentStartHdfsBlockIndex];
+    }
+  }
+
+  static class SplitInfo {
+    List<BlockMetaData> rowGroups = new ArrayList<BlockMetaData>();
+    BlockLocation hdfsBlock;
+    long compressedByteSize = 0L;
+
+    public SplitInfo(BlockLocation currentBlock) {
+      this.hdfsBlock = currentBlock;
+    }
+
+    private void addRowGroup(BlockMetaData rowGroup) {
+      this.rowGroups.add(rowGroup);
+      this.compressedByteSize += rowGroup.getCompressedSize();
+    }
+
+    public long getCompressedByteSize() {
+      return compressedByteSize;
+    }
+
+    public List<BlockMetaData> getRowGroups() {
+      return rowGroups;
+    }
+
+    int getRowGroupCount() {
+      return rowGroups.size();
+    }
+
+    public ParquetInputSplit getParquetInputSplit(FileStatus fileStatus, String requestedSchema, Map<String, String> readSupportMetadata) throws IOException {
+      MessageType requested = MessageTypeParser.parseMessageType(requestedSchema);
+      long length = 0;
+
+      for (BlockMetaData block : this.getRowGroups()) {
+        List<ColumnChunkMetaData> columns = block.getColumns();
+        for (ColumnChunkMetaData column : columns) {
+          if (requested.containsPath(column.getPath().toArray())) {
+            length += column.getTotalSize();
+          }
+        }
+      }
+
+      BlockMetaData lastRowGroup = this.getRowGroups().get(this.getRowGroupCount() - 1);
+      long end = lastRowGroup.getStartingPos() + lastRowGroup.getTotalByteSize();
+
+      long[] rowGroupOffsets = new long[this.getRowGroupCount()];
+      for (int i = 0; i < rowGroupOffsets.length; i++) {
+        rowGroupOffsets[i] = this.getRowGroups().get(i).getStartingPos();
+      }
+
+      return new ParquetInputSplit(
+              fileStatus.getPath(),
+              hdfsBlock.getOffset(),
+              end,
+              length,
+              hdfsBlock.getHosts(),
+              rowGroupOffsets
+      );
+    }
+  }
+
+  private static final Log LOG = Log.getLog(ClientSideMetadataSplitStrategy.class);
+
+  List<ParquetInputSplit> getSplits(Configuration configuration, List<Footer> footers,
+      long maxSplitSize, long minSplitSize, ReadContext readContext)
+      throws IOException {
+    List<ParquetInputSplit> splits = new ArrayList<ParquetInputSplit>();
+    Filter filter = ParquetInputFormat.getFilter(configuration);
+
+    long rowGroupsDropped = 0;
+    long totalRowGroups = 0;
+
+    for (Footer footer : footers) {
+      final Path file = footer.getFile();
+      LOG.debug(file);
+      FileSystem fs = file.getFileSystem(configuration);
+      FileStatus fileStatus = fs.getFileStatus(file);
+      ParquetMetadata parquetMetaData = footer.getParquetMetadata();
+      List<BlockMetaData> blocks = parquetMetaData.getBlocks();
+
+      List<BlockMetaData> filteredBlocks;
+
+      totalRowGroups += blocks.size();
+      filteredBlocks = RowGroupFilter.filterRowGroups(filter, blocks, parquetMetaData.getFileMetaData().getSchema());
+      rowGroupsDropped += blocks.size() - filteredBlocks.size();
+
+      if (filteredBlocks.isEmpty()) {
+        continue;
+      }
+
+      BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+      splits.addAll(
+          generateSplits(
+              filteredBlocks,
+              fileBlockLocations,
+              fileStatus,
+              readContext.getRequestedSchema().toString(),
+              readContext.getReadSupportMetadata(),
+              minSplitSize,
+              maxSplitSize)
+          );
+    }
+
+    if (rowGroupsDropped > 0 && totalRowGroups > 0) {
+      int percentDropped = (int) ((((double) rowGroupsDropped) / totalRowGroups) * 100);
+      LOG.info("Dropping " + rowGroupsDropped + " row groups that do not pass filter predicate! (" + percentDropped + "%)");
+    } else {
+      LOG.info("There were no row groups that could be dropped due to filter predicates");
+    }
+    return splits;
+  }
+
+  /**
+   * groups together all the data blocks for the same HDFS block
+   *
+   * @param rowGroupBlocks      data blocks (row groups)
+   * @param hdfsBlocksArray     hdfs blocks
+   * @param fileStatus          the containing file
+   * @param requestedSchema     the schema requested by the user
+   * @param readSupportMetadata the metadata provided by the readSupport implementation in init
+   * @param minSplitSize        the mapred.min.split.size
+   * @param maxSplitSize        the mapred.max.split.size
+   * @return the splits (one per HDFS block)
+   * @throws IOException If hosts can't be retrieved for the HDFS block
+   */
+  static <T> List<ParquetInputSplit> generateSplits(
+          List<BlockMetaData> rowGroupBlocks,
+          BlockLocation[] hdfsBlocksArray,
+          FileStatus fileStatus,
+          String requestedSchema,
+          Map<String, String> readSupportMetadata, long minSplitSize, long maxSplitSize) throws IOException {
+
+    List<SplitInfo> splitRowGroups =
+        generateSplitInfo(rowGroupBlocks, hdfsBlocksArray, minSplitSize, maxSplitSize);
+
+    //generate splits from rowGroups of each split
+    List<ParquetInputSplit> resultSplits = new ArrayList<ParquetInputSplit>();
+    for (SplitInfo splitInfo : splitRowGroups) {
+      ParquetInputSplit split = splitInfo.getParquetInputSplit(fileStatus, requestedSchema, readSupportMetadata);
+      resultSplits.add(split);
+    }
+    return resultSplits;
+  }
+
+  static List<SplitInfo> generateSplitInfo(
+      List<BlockMetaData> rowGroupBlocks,
+      BlockLocation[] hdfsBlocksArray,
+      long minSplitSize, long maxSplitSize) {
+    List<SplitInfo> splitRowGroups;
+
+    if (maxSplitSize < minSplitSize || maxSplitSize < 0 || minSplitSize < 0) {
+      throw new ParquetDecodingException("maxSplitSize and minSplitSize should be positive and max should be greater or equal to the minSplitSize: maxSplitSize = " + maxSplitSize + "; minSplitSize is " + minSplitSize);
+    }
+    HDFSBlocks hdfsBlocks = new HDFSBlocks(hdfsBlocksArray);
+    hdfsBlocks.checkBelongingToANewHDFSBlock(rowGroupBlocks.get(0));
+    SplitInfo currentSplit = new SplitInfo(hdfsBlocks.getCurrentBlock());
+
+    //assign rowGroups to splits
+    splitRowGroups = new ArrayList<SplitInfo>();
+    checkSorted(rowGroupBlocks);//assert row groups are sorted
+    for (BlockMetaData rowGroupMetadata : rowGroupBlocks) {
+      if ((hdfsBlocks.checkBelongingToANewHDFSBlock(rowGroupMetadata)
+             && currentSplit.getCompressedByteSize() >= minSplitSize
+             && currentSplit.getCompressedByteSize() > 0)
+           || currentSplit.getCompressedByteSize() >= maxSplitSize) {
+        //create a new split
+        splitRowGroups.add(currentSplit);//finish previous split
+        currentSplit = new SplitInfo(hdfsBlocks.getCurrentBlock());
+      }
+      currentSplit.addRowGroup(rowGroupMetadata);
+    }
+
+    if (currentSplit.getRowGroupCount() > 0) {
+      splitRowGroups.add(currentSplit);
+    }
+
+    return splitRowGroups;
+  }
+
+  private static void checkSorted(List<BlockMetaData> rowGroupBlocks) {
+    long previousOffset = 0L;
+    for (BlockMetaData rowGroup : rowGroupBlocks) {
+      long currentOffset = rowGroup.getStartingPos();
+      if (currentOffset < previousOffset) {
+        throw new ParquetDecodingException("row groups are not sorted: previous row group starts at " + previousOffset + ", current row group starts at " + currentOffset);
+      }
+      previousOffset = currentOffset; // track the previous offset so the sortedness check actually compares neighboring row groups
+    }
+  }
+}
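
A minimal driver sketch (illustrative only, not part of the commit) showing how the configuration keys above are typically wired up from a MapReduce job. It assumes the example GroupReadSupport shipped with parquet-hadoop and the FilterApi helpers from the filter2 API; the input path and column name are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.example.GroupReadSupport;

public class ParquetScanDriver {
  public static Job configure(Configuration conf) throws Exception {
    Job job = Job.getInstance(conf, "parquet-scan");
    job.setInputFormatClass(ParquetInputFormat.class);
    FileInputFormat.addInputPath(job, new Path("/data/events")); // placeholder path

    // ReadSupport that materializes the records (here: the example Group records).
    ParquetInputFormat.setReadSupportClass(job, GroupReadSupport.class);

    // Optional: push a predicate down so whole row groups can be dropped.
    ParquetInputFormat.setFilterPredicate(job.getConfiguration(),
        FilterApi.eq(FilterApi.intColumn("status"), 200)); // placeholder column

    // Keep footer reading on the task side (the default).
    ParquetInputFormat.setTaskSideMetaData(job, true);
    return job;
  }
}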

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputSplit.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputSplit.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputSplit.java
new file mode 100644
index 0000000..8a13a3c
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputSplit.java
@@ -0,0 +1,294 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+
+/**
+ * An input split for the Parquet format.
+ * It contains the information to read one block of the file.
+ *
+ * This class is private to the ParquetInputFormat.
+ * Backward compatibility is not maintained.
+ *
+ * @author Julien Le Dem
+ */
+@Private
+public class ParquetInputSplit extends FileSplit implements Writable {
+
+
+  private long end;
+  private long[] rowGroupOffsets;
+
+  /**
+   * Writables must have a parameterless constructor
+   */
+  public ParquetInputSplit() {
+    super(null, 0, 0, new String[0]);
+  }
+
+  /**
+   * For compatibility only;
+   * use {@link ParquetInputSplit#ParquetInputSplit(Path, long, long, long, String[], long[])}
+   * @param path
+   * @param start
+   * @param length
+   * @param hosts
+   * @param blocks
+   * @param requestedSchema
+   * @param fileSchema
+   * @param extraMetadata
+   * @param readSupportMetadata
+   */
+  @Deprecated
+  public ParquetInputSplit(
+      Path path,
+      long start,
+      long length,
+      String[] hosts,
+      List<BlockMetaData> blocks,
+      String requestedSchema,
+      String fileSchema,
+      Map<String, String> extraMetadata,
+      Map<String, String> readSupportMetadata) {
+    this(path, start, length, end(blocks, requestedSchema), hosts, offsets(blocks));
+  }
+
+  private static long end(List<BlockMetaData> blocks, String requestedSchema) {
+    MessageType requested = MessageTypeParser.parseMessageType(requestedSchema);
+    long length = 0;
+
+    for (BlockMetaData block : blocks) {
+      List<ColumnChunkMetaData> columns = block.getColumns();
+      for (ColumnChunkMetaData column : columns) {
+        if (requested.containsPath(column.getPath().toArray())) {
+          length += column.getTotalSize();
+        }
+      }
+    }
+    return length;
+  }
+
+  private static long[] offsets(List<BlockMetaData> blocks) {
+    long[] offsets = new long[blocks.size()];
+    for (int i = 0; i < offsets.length; i++) {
+      offsets[i] = blocks.get(i).getStartingPos();
+    }
+    return offsets;
+  }
+
+  /**
+   * @return the block meta data
+   * @deprecated the file footer is no longer read before creating input splits
+   */
+  @Deprecated
+  public List<BlockMetaData> getBlocks() {
+    throw new UnsupportedOperationException(
+        "Splits no longer have row group metadata, see PARQUET-234");
+  }
+
+  /**
+   * Builds a {@code ParquetInputSplit} from a mapreduce {@link FileSplit}.
+   *
+   * @param split a mapreduce FileSplit
+   * @return a ParquetInputSplit
+   * @throws IOException
+   */
+  static ParquetInputSplit from(FileSplit split) throws IOException {
+    return new ParquetInputSplit(split.getPath(),
+        split.getStart(), split.getStart() + split.getLength(),
+        split.getLength(), split.getLocations(), null);
+  }
+
+  /**
+   * Builds a {@code ParquetInputSplit} from a mapred
+   * {@link org.apache.hadoop.mapred.FileSplit}.
+   *
+   * @param split a mapred FileSplit
+   * @return a ParquetInputSplit
+   * @throws IOException
+   */
+  static ParquetInputSplit from(org.apache.hadoop.mapred.FileSplit split) throws IOException {
+    return new ParquetInputSplit(split.getPath(),
+        split.getStart(), split.getStart() + split.getLength(),
+        split.getLength(), split.getLocations(), null);
+  }
+
+  /**
+   * @param file the path of the file for that split
+   * @param start the start offset in the file
+   * @param end the end offset in the file
+   * @param length the actual size in bytes that we expect to read
+   * @param hosts the hosts with the replicas of this data
+   * @param rowGroupOffsets the offsets of the rowgroups selected if loaded on the client
+   */
+  public ParquetInputSplit(
+      Path file, long start, long end, long length, String[] hosts,
+      long[] rowGroupOffsets) {
+    super(file, start, length, hosts);
+    this.end = end;
+    this.rowGroupOffsets = rowGroupOffsets;
+  }
+
+  /**
+   * @return the requested schema
+   * @deprecated the file footer is no longer read before creating input splits
+   */
+  @Deprecated
+  String getRequestedSchema() {
+    throw new UnsupportedOperationException(
+        "Splits no longer have the requested schema, see PARQUET-234");
+  }
+
+  /**
+   * @return the file schema
+   * @deprecated the file footer is no longer read before creating input splits
+   */
+  @Deprecated
+  public String getFileSchema() {
+    throw new UnsupportedOperationException(
+        "Splits no longer have the file schema, see PARQUET-234");
+  }
+
+  /**
+   * @return the end offset of that split
+   */
+  public long getEnd() {
+    return end;
+  }
+
+  /**
+   * @return app specific metadata from the file
+   * @deprecated the file footer is no longer read before creating input splits
+   */
+  @Deprecated
+  public Map<String, String> getExtraMetadata() {
+    throw new UnsupportedOperationException(
+        "Splits no longer have file metadata, see PARQUET-234");
+  }
+
+  /**
+   * @return app specific metadata provided by the read support in the init phase
+   */
+  @Deprecated
+  Map<String, String> getReadSupportMetadata() {
+    throw new UnsupportedOperationException(
+        "Splits no longer have read-support metadata, see PARQUET-234");
+  }
+
+  /**
+   * @return the offsets of the row groups selected, if they were determined on the client side
+   */
+  public long[] getRowGroupOffsets() {
+    return rowGroupOffsets;
+  }
+
+  @Override
+  public String toString() {
+    String hosts;
+    try {
+      hosts = Arrays.toString(getLocations());
+    } catch (Exception e) {
+      // IOException/InterruptedException could be thrown
+      hosts = "(" + e + ")";
+    }
+
+    return this.getClass().getSimpleName() + "{" +
+           "part: " + getPath()
+        + " start: " + getStart()
+        + " end: " + getEnd()
+        + " length: " + getLength()
+        + " hosts: " + hosts
+        + (rowGroupOffsets == null ? "" : (" row groups: " + Arrays.toString(rowGroupOffsets)))
+        + "}";
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void readFields(DataInput hin) throws IOException {
+    byte[] bytes = readArray(hin);
+    DataInputStream in = new DataInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes)));
+    super.readFields(in);
+    this.end = in.readLong();
+    if (in.readBoolean()) {
+      this.rowGroupOffsets = new long[in.readInt()];
+      for (int i = 0; i < rowGroupOffsets.length; i++) {
+        rowGroupOffsets[i] = in.readLong();
+      }
+    }
+    in.close();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void write(DataOutput hout) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(new GZIPOutputStream(baos));
+    super.write(out);
+    out.writeLong(end);
+    out.writeBoolean(rowGroupOffsets != null);
+    if (rowGroupOffsets != null) {
+      out.writeInt(rowGroupOffsets.length);
+      for (long o : rowGroupOffsets) {
+        out.writeLong(o);
+      }
+    }
+    out.close();
+    writeArray(hout, baos.toByteArray());
+  }
+
+  private static void writeArray(DataOutput out, byte[] bytes) throws IOException {
+    out.writeInt(bytes.length);
+    out.write(bytes, 0, bytes.length);
+  }
+
+  private static byte[] readArray(DataInput in) throws IOException {
+    int len = in.readInt();
+    byte[] bytes = new byte[len];
+    in.readFully(bytes);
+    return bytes;
+  }
+
+}
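
An illustrative round-trip of the Writable serialization above (not part of the commit): write() gzips the FileSplit fields together with end and rowGroupOffsets, and readFields() restores them on the task side. The path, offsets and hosts below are made-up values.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetInputSplit;

public class SplitRoundTrip {
  public static void main(String[] args) throws Exception {
    // Hypothetical split: bytes [0, 256MB) of one file containing two row groups.
    ParquetInputSplit split = new ParquetInputSplit(
        new Path("/data/events/part-0.parquet"),
        0L, 268435456L, 200000000L,
        new String[] { "host1", "host2" },
        new long[] { 4L, 134217732L });

    // Serialize it the way the framework would ship it to a task...
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    split.write(new DataOutputStream(buffer));

    // ...and read it back into a fresh instance.
    ParquetInputSplit copy = new ParquetInputSplit();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
    System.out.println(copy); // same path, start, end, length and row group offsets
  }
}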

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputCommitter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputCommitter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputCommitter.java
new file mode 100644
index 0000000..a1589c0
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputCommitter.java
@@ -0,0 +1,72 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.hadoop.util.ContextUtil;
+
+public class ParquetOutputCommitter extends FileOutputCommitter {
+  private static final Log LOG = Log.getLog(ParquetOutputCommitter.class);
+
+  private final Path outputPath;
+
+  public ParquetOutputCommitter(Path outputPath, TaskAttemptContext context) throws IOException {
+    super(outputPath, context);
+    this.outputPath = outputPath;
+  }
+
+  public void commitJob(JobContext jobContext) throws IOException {
+    super.commitJob(jobContext);
+    Configuration configuration = ContextUtil.getConfiguration(jobContext);
+    writeMetaDataFile(configuration, outputPath);
+  }
+
+  public static void writeMetaDataFile(Configuration configuration, Path outputPath) {
+    if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) {
+      try {
+        final FileSystem fileSystem = outputPath.getFileSystem(configuration);
+        FileStatus outputStatus = fileSystem.getFileStatus(outputPath);
+        List<Footer> footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus);
+        try {
+          ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers);
+        } catch (Exception e) {
+          LOG.warn("could not write summary file for " + outputPath, e);
+          final Path metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE);
+          if (fileSystem.exists(metadataPath)) {
+            fileSystem.delete(metadataPath, true);
+          }
+        }
+      } catch (Exception e) {
+        LOG.warn("could not write summary file for " + outputPath, e);
+      }
+    }
+  }
+
+}
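
An illustrative sketch (not part of the commit) of the two ways the summary file written by this committer can be controlled; the output directory is a placeholder.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetOutputCommitter;

public class SummaryMetadataExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Jobs that should skip the _metadata summary (e.g. very large outputs where
    // reading every footer at commit time is too slow) can turn it off:
    // conf.setBoolean("parquet.enable.summary-metadata", false);

    // With the flag left at its default (true), the summary for an existing output
    // directory can also be (re)generated by hand; as in commitJob, failures are only logged.
    ParquetOutputCommitter.writeMetaDataFile(conf, new Path("/data/output")); // placeholder path
  }
}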

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
new file mode 100644
index 0000000..d849843
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -0,0 +1,353 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.apache.parquet.Log.INFO;
+import static org.apache.parquet.Preconditions.checkNotNull;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
+import static org.apache.parquet.hadoop.util.ContextUtil.getConfiguration;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.api.WriteSupport.WriteContext;
+import org.apache.parquet.hadoop.codec.CodecConfig;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.ConfigurationUtil;
+
+/**
+ * OutputFormat to write to a Parquet file
+ *
+ * It requires a {@link WriteSupport} to convert the actual records to the underlying format.
+ * It requires the schema of the incoming records (provided by the write support).
+ * It allows storing extra metadata in the footer (for example, for schema compatibility purposes when converting from a different schema language).
+ *
+ * The format configuration settings in the job configuration:
+ * <pre>
+ * # The block size is the size of a row group being buffered in memory
+ * # this limits the memory usage when writing
+ * # Larger values will improve the IO when reading but consume more memory when writing
+ * parquet.block.size=134217728 # in bytes, default = 128 * 1024 * 1024
+ *
+ * # The page size is for compression. When reading, each page can be decompressed independently.
+ * # A block is composed of pages. The page is the smallest unit that must be read fully to access a single record.
+ * # If this value is too small, the compression will deteriorate
+ * parquet.page.size=1048576 # in bytes, default = 1 * 1024 * 1024
+ *
+ * # There is one dictionary page per column per row group when dictionary encoding is used.
+ * # The dictionary page size works like the page size but for dictionary
+ * parquet.dictionary.page.size=1048576 # in bytes, default = 1 * 1024 * 1024
+ *
+ * # The compression algorithm used to compress pages
+ * parquet.compression=UNCOMPRESSED # one of: UNCOMPRESSED, SNAPPY, GZIP, LZO. Default: UNCOMPRESSED. Supersedes mapred.output.compress*
+ *
+ * # The write support class to convert the records written to the OutputFormat into the events accepted by the record consumer
+ * # Usually provided by a specific ParquetOutputFormat subclass
+ * parquet.write.support.class= # fully qualified name
+ *
+ * # To enable/disable dictionary encoding
+ * parquet.enable.dictionary=true # false to disable dictionary encoding
+ *
+ * # To enable/disable summary metadata aggregation at the end of a MR job
+ * # The default is true (enabled)
+ * parquet.enable.summary-metadata=true # false to disable summary aggregation
+ * </pre>
+ *
+ * If parquet.compression is not set, the following properties are checked (FileOutputFormat behavior).
+ * Note that we explicitly disallow custom codecs.
+ * <pre>
+ * mapred.output.compress=true
+ * mapred.output.compression.codec=org.apache.hadoop.io.compress.SomeCodec # the codec must be one of Snappy, GZip or LZO
+ * </pre>
+ *
+ * If none of these is set, the data is uncompressed.
+ *
+ * @author Julien Le Dem
+ *
+ * @param <T> the type of the materialized records
+ */
+public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
+  private static final Log LOG = Log.getLog(ParquetOutputFormat.class);
+
+  public static final String BLOCK_SIZE           = "parquet.block.size";
+  public static final String PAGE_SIZE            = "parquet.page.size";
+  public static final String COMPRESSION          = "parquet.compression";
+  public static final String WRITE_SUPPORT_CLASS  = "parquet.write.support.class";
+  public static final String DICTIONARY_PAGE_SIZE = "parquet.dictionary.page.size";
+  public static final String ENABLE_DICTIONARY    = "parquet.enable.dictionary";
+  public static final String VALIDATION           = "parquet.validation";
+  public static final String WRITER_VERSION       = "parquet.writer.version";
+  public static final String ENABLE_JOB_SUMMARY   = "parquet.enable.summary-metadata";
+  public static final String MEMORY_POOL_RATIO    = "parquet.memory.pool.ratio";
+  public static final String MIN_MEMORY_ALLOCATION = "parquet.memory.min.chunk.size";
+
+  public static void setWriteSupportClass(Job job,  Class<?> writeSupportClass) {
+    getConfiguration(job).set(WRITE_SUPPORT_CLASS, writeSupportClass.getName());
+  }
+
+  public static void setWriteSupportClass(JobConf job, Class<?> writeSupportClass) {
+      job.set(WRITE_SUPPORT_CLASS, writeSupportClass.getName());
+  }
+
+  public static Class<?> getWriteSupportClass(Configuration configuration) {
+    final String className = configuration.get(WRITE_SUPPORT_CLASS);
+    if (className == null) {
+      return null;
+    }
+    final Class<?> writeSupportClass = ConfigurationUtil.getClassFromConfig(configuration, WRITE_SUPPORT_CLASS, WriteSupport.class);
+    return writeSupportClass;
+  }
+
+  public static void setBlockSize(Job job, int blockSize) {
+    getConfiguration(job).setInt(BLOCK_SIZE, blockSize);
+  }
+
+  public static void setPageSize(Job job, int pageSize) {
+    getConfiguration(job).setInt(PAGE_SIZE, pageSize);
+  }
+
+  public static void setDictionaryPageSize(Job job, int pageSize) {
+    getConfiguration(job).setInt(DICTIONARY_PAGE_SIZE, pageSize);
+  }
+
+  public static void setCompression(Job job, CompressionCodecName compression) {
+    getConfiguration(job).set(COMPRESSION, compression.name());
+  }
+
+  public static void setEnableDictionary(Job job, boolean enableDictionary) {
+    getConfiguration(job).setBoolean(ENABLE_DICTIONARY, enableDictionary);
+  }
+
+  public static boolean getEnableDictionary(JobContext jobContext) {
+    return getEnableDictionary(getConfiguration(jobContext));
+  }
+
+  public static int getBlockSize(JobContext jobContext) {
+    return getBlockSize(getConfiguration(jobContext));
+  }
+
+  public static int getPageSize(JobContext jobContext) {
+    return getPageSize(getConfiguration(jobContext));
+  }
+
+  public static int getDictionaryPageSize(JobContext jobContext) {
+    return getDictionaryPageSize(getConfiguration(jobContext));
+  }
+
+  public static CompressionCodecName getCompression(JobContext jobContext) {
+    return getCompression(getConfiguration(jobContext));
+  }
+
+  public static boolean isCompressionSet(JobContext jobContext) {
+    return isCompressionSet(getConfiguration(jobContext));
+  }
+
+  public static void setValidation(JobContext jobContext, boolean validating) {
+    setValidation(getConfiguration(jobContext), validating);
+  }
+
+  public static boolean getValidation(JobContext jobContext) {
+    return getValidation(getConfiguration(jobContext));
+  }
+
+  public static boolean getEnableDictionary(Configuration configuration) {
+    return configuration.getBoolean(ENABLE_DICTIONARY, true);
+  }
+
+  @Deprecated
+  public static int getBlockSize(Configuration configuration) {
+    return configuration.getInt(BLOCK_SIZE, DEFAULT_BLOCK_SIZE);
+  }
+
+  public static long getLongBlockSize(Configuration configuration) {
+    return configuration.getLong(BLOCK_SIZE, DEFAULT_BLOCK_SIZE);
+  }
+
+  public static int getPageSize(Configuration configuration) {
+    return configuration.getInt(PAGE_SIZE, DEFAULT_PAGE_SIZE);
+  }
+
+  public static int getDictionaryPageSize(Configuration configuration) {
+    return configuration.getInt(DICTIONARY_PAGE_SIZE, DEFAULT_PAGE_SIZE);
+  }
+
+  public static WriterVersion getWriterVersion(Configuration configuration) {
+    String writerVersion = configuration.get(WRITER_VERSION, WriterVersion.PARQUET_1_0.toString());
+    return WriterVersion.fromString(writerVersion);
+  }
+
+  public static CompressionCodecName getCompression(Configuration configuration) {
+    return CodecConfig.getParquetCompressionCodec(configuration);
+  }
+
+  public static boolean isCompressionSet(Configuration configuration) {
+    return CodecConfig.isParquetCompressionSet(configuration);
+  }
+
+  public static void setValidation(Configuration configuration, boolean validating) {
+    configuration.setBoolean(VALIDATION, validating);
+  }
+
+  public static boolean getValidation(Configuration configuration) {
+    return configuration.getBoolean(VALIDATION, false);
+  }
+
+  private CompressionCodecName getCodec(TaskAttemptContext taskAttemptContext) {
+    return CodecConfig.from(taskAttemptContext).getCodec();
+  }
+
+
+
+  private WriteSupport<T> writeSupport;
+  private ParquetOutputCommitter committer;
+
+  /**
+   * Constructor used when this OutputFormat is wrapped in another one (in Pig, for example).
+   * @param writeSupport the write support used to convert the incoming records
+   */
+  public <S extends WriteSupport<T>> ParquetOutputFormat(S writeSupport) {
+    this.writeSupport = writeSupport;
+  }
+
+  /**
+   * Used when directly using the output format and configuring the write support implementation
+   * via parquet.write.support.class.
+   */
+  public <S extends WriteSupport<T>> ParquetOutputFormat() {
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public RecordWriter<Void, T> getRecordWriter(TaskAttemptContext taskAttemptContext)
+      throws IOException, InterruptedException {
+
+    final Configuration conf = getConfiguration(taskAttemptContext);
+
+    CompressionCodecName codec = getCodec(taskAttemptContext);
+    String extension = codec.getExtension() + ".parquet";
+    Path file = getDefaultWorkFile(taskAttemptContext, extension);
+    return getRecordWriter(conf, file, codec);
+  }
+
+  public RecordWriter<Void, T> getRecordWriter(TaskAttemptContext taskAttemptContext, Path file)
+      throws IOException, InterruptedException {
+    return getRecordWriter(getConfiguration(taskAttemptContext), file, getCodec(taskAttemptContext));
+  }
+
+  public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, CompressionCodecName codec)
+        throws IOException, InterruptedException {
+    final WriteSupport<T> writeSupport = getWriteSupport(conf);
+
+    CodecFactory codecFactory = new CodecFactory(conf);
+    long blockSize = getLongBlockSize(conf);
+    if (INFO) LOG.info("Parquet block size to " + blockSize);
+    int pageSize = getPageSize(conf);
+    if (INFO) LOG.info("Parquet page size to " + pageSize);
+    int dictionaryPageSize = getDictionaryPageSize(conf);
+    if (INFO) LOG.info("Parquet dictionary page size to " + dictionaryPageSize);
+    boolean enableDictionary = getEnableDictionary(conf);
+    if (INFO) LOG.info("Dictionary is " + (enableDictionary ? "on" : "off"));
+    boolean validating = getValidation(conf);
+    if (INFO) LOG.info("Validation is " + (validating ? "on" : "off"));
+    WriterVersion writerVersion = getWriterVersion(conf);
+    if (INFO) LOG.info("Writer version is: " + writerVersion);
+
+    WriteContext init = writeSupport.init(conf);
+    ParquetFileWriter w = new ParquetFileWriter(conf, init.getSchema(), file);
+    w.start();
+
+    float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,
+        MemoryManager.DEFAULT_MEMORY_POOL_RATIO);
+    long minAllocation = conf.getLong(ParquetOutputFormat.MIN_MEMORY_ALLOCATION,
+        MemoryManager.DEFAULT_MIN_MEMORY_ALLOCATION);
+    if (memoryManager == null) {
+      memoryManager = new MemoryManager(maxLoad, minAllocation);
+    } else if (memoryManager.getMemoryPoolRatio() != maxLoad) {
+      LOG.warn("The configuration " + MEMORY_POOL_RATIO + " has been set. It should not " +
+          "be reset by the new value: " + maxLoad);
+    }
+
+    return new ParquetRecordWriter<T>(
+        w,
+        writeSupport,
+        init.getSchema(),
+        init.getExtraMetaData(),
+        blockSize, pageSize,
+        codecFactory.getCompressor(codec, pageSize),
+        dictionaryPageSize,
+        enableDictionary,
+        validating,
+        writerVersion,
+        memoryManager);
+  }
+
+  /**
+   * @param configuration the configuration used to look up the write support class
+   * @return the configured write support
+   */
+  @SuppressWarnings("unchecked")
+  public WriteSupport<T> getWriteSupport(Configuration configuration){
+    if (writeSupport != null) return writeSupport;
+    Class<?> writeSupportClass = getWriteSupportClass(configuration);
+    try {
+      return (WriteSupport<T>)checkNotNull(writeSupportClass, "writeSupportClass").newInstance();
+    } catch (InstantiationException e) {
+      throw new BadConfigurationException("could not instantiate write support class: " + writeSupportClass, e);
+    } catch (IllegalAccessException e) {
+      throw new BadConfigurationException("could not instantiate write support class: " + writeSupportClass, e);
+    }
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+      throws IOException {
+    if (committer == null) {
+      Path output = getOutputPath(context);
+      committer = new ParquetOutputCommitter(output, context);
+    }
+    return committer;
+  }
+
+
+  /**
+   * This memory manager is for all the real writers (InternalParquetRecordWriter) in one task.
+   */
+  private static MemoryManager memoryManager;
+
+  static MemoryManager getMemoryManager() {
+    return memoryManager;
+  }
+}
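
For reference, here is a minimal sketch (not part of this diff) of how the static configuration accessors above might be used from client code; the class name is illustrative only, and only methods visible in this file are called:

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.ParquetOutputFormat;

public class OutputFormatConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Enable schema validation for writers created from this configuration.
    ParquetOutputFormat.setValidation(conf, true);

    // Read back the effective writer settings (defaults unless set elsewhere).
    long blockSize = ParquetOutputFormat.getLongBlockSize(conf);     // row group size
    int pageSize = ParquetOutputFormat.getPageSize(conf);            // data page size
    boolean dictionary = ParquetOutputFormat.getEnableDictionary(conf);
    boolean validating = ParquetOutputFormat.getValidation(conf);    // now true

    System.out.println("block=" + blockSize + " page=" + pageSize
        + " dictionary=" + dictionary + " validating=" + validating);
  }
}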

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
new file mode 100644
index 0000000..a45dde5
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
@@ -0,0 +1,195 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.filter2.compat.RowGroupFilter;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.util.HiddenFileFilter;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * Read records from a Parquet file.
+ * TODO: too many constructors (https://issues.apache.org/jira/browse/PARQUET-39)
+ */
+public class ParquetReader<T> implements Closeable {
+
+  private final ReadSupport<T> readSupport;
+  private final Configuration conf;
+  private final Iterator<Footer> footersIterator;
+  private final Filter filter;
+
+  private InternalParquetRecordReader<T> reader;
+
+  /**
+   * @param file the file to read
+   * @param readSupport to materialize records
+   * @throws IOException
+   * @deprecated use {@link #builder(ReadSupport, Path)}
+   */
+  @Deprecated
+  public ParquetReader(Path file, ReadSupport<T> readSupport) throws IOException {
+    this(new Configuration(), file, readSupport, FilterCompat.NOOP);
+  }
+
+  /**
+   * @param conf the configuration
+   * @param file the file to read
+   * @param readSupport to materialize records
+   * @throws IOException
+   * @deprecated use {@link #builder(ReadSupport, Path)}
+   */
+  @Deprecated
+  public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport) throws IOException {
+    this(conf, file, readSupport, FilterCompat.NOOP);
+  }
+
+  /**
+   * @param file the file to read
+   * @param readSupport to materialize records
+   * @param unboundRecordFilter the filter to use to filter records
+   * @throws IOException
+   * @deprecated use {@link #builder(ReadSupport, Path)}
+   */
+  @Deprecated
+  public ParquetReader(Path file, ReadSupport<T> readSupport, UnboundRecordFilter unboundRecordFilter) throws IOException {
+    this(new Configuration(), file, readSupport, FilterCompat.get(unboundRecordFilter));
+  }
+
+  /**
+   * @param conf the configuration
+   * @param file the file to read
+   * @param readSupport to materialize records
+   * @param unboundRecordFilter the filter to use to filter records
+   * @throws IOException
+   * @deprecated use {@link #builder(ReadSupport, Path)}
+   */
+  @Deprecated
+  public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport, UnboundRecordFilter unboundRecordFilter) throws IOException {
+    this(conf, file, readSupport, FilterCompat.get(unboundRecordFilter));
+  }
+
+  private ParquetReader(Configuration conf,
+                       Path file,
+                       ReadSupport<T> readSupport,
+                       Filter filter) throws IOException {
+    this.readSupport = readSupport;
+    this.filter = checkNotNull(filter, "filter");
+    this.conf = conf;
+
+    FileSystem fs = file.getFileSystem(conf);
+    List<FileStatus> statuses = Arrays.asList(fs.listStatus(file, HiddenFileFilter.INSTANCE));
+    List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses, false);
+    this.footersIterator = footers.iterator();
+  }
+
+  /**
+   * @return the next record or null if finished
+   * @throws IOException
+   */
+  public T read() throws IOException {
+    try {
+      if (reader != null && reader.nextKeyValue()) {
+        return reader.getCurrentValue();
+      } else {
+        initReader();
+        return reader == null ? null : read();
+      }
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private void initReader() throws IOException {
+    if (reader != null) {
+      reader.close();
+      reader = null;
+    }
+    if (footersIterator.hasNext()) {
+      Footer footer = footersIterator.next();
+
+      List<BlockMetaData> blocks = footer.getParquetMetadata().getBlocks();
+
+      MessageType fileSchema = footer.getParquetMetadata().getFileMetaData().getSchema();
+
+      List<BlockMetaData> filteredBlocks = RowGroupFilter.filterRowGroups(
+          filter, blocks, fileSchema);
+
+      reader = new InternalParquetRecordReader<T>(readSupport, filter);
+      reader.initialize(fileSchema,
+          footer.getParquetMetadata().getFileMetaData().getKeyValueMetaData(),
+          footer.getFile(), filteredBlocks, conf);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (reader != null) {
+      reader.close();
+    }
+  }
+
+  public static <T> Builder<T> builder(ReadSupport<T> readSupport, Path path) {
+    return new Builder<T>(readSupport, path);
+  }
+
+  public static class Builder<T> {
+    private final ReadSupport<T> readSupport;
+    private final Path file;
+    private Configuration conf;
+    private Filter filter;
+
+    private Builder(ReadSupport<T> readSupport, Path path) {
+      this.readSupport = checkNotNull(readSupport, "readSupport");
+      this.file = checkNotNull(path, "path");
+      this.conf = new Configuration();
+      this.filter = FilterCompat.NOOP;
+    }
+
+    public Builder<T> withConf(Configuration conf) {
+      this.conf = checkNotNull(conf, "conf");
+      return this;
+    }
+
+    public Builder<T> withFilter(Filter filter) {
+      this.filter = checkNotNull(filter, "filter");
+      return this;
+    }
+
+    public ParquetReader<T> build() throws IOException {
+      return new ParquetReader<T>(conf, file, readSupport, filter);
+    }
+  }
+}
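
A minimal usage sketch for the builder added above (not part of this diff); GroupReadSupport and Group are assumed to come from the example helpers shipped with parquet-mr, and the input path is hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

public class ReaderSketch {
  public static void main(String[] args) throws Exception {
    Path file = new Path(args[0]);  // path to an existing Parquet file or directory
    ParquetReader<Group> reader = ParquetReader
        .builder(new GroupReadSupport(), file)
        .withConf(new Configuration())  // optional; withFilter(...) can narrow the rows read
        .build();
    try {
      Group record;
      while ((record = reader.read()) != null) {  // read() returns null when finished
        System.out.println(record);
      }
    } finally {
      reader.close();
    }
  }
}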

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
new file mode 100644
index 0000000..7b83770
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
+import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
+import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.parquet.Log;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * Reads the records from a block of a Parquet file
+ *
+ * @see ParquetInputFormat
+ *
+ * @author Julien Le Dem
+ *
+ * @param <T> type of the materialized records
+ */
+public class ParquetRecordReader<T> extends RecordReader<Void, T> {
+
+  private static final Log LOG = Log.getLog(ParquetRecordReader.class);
+  private final InternalParquetRecordReader<T> internalReader;
+
+  /**
+   * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
+   */
+  public ParquetRecordReader(ReadSupport<T> readSupport) {
+    this(readSupport, FilterCompat.NOOP);
+  }
+
+  /**
+   * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
+   * @param filter for filtering individual records
+   */
+  public ParquetRecordReader(ReadSupport<T> readSupport, Filter filter) {
+    internalReader = new InternalParquetRecordReader<T>(readSupport, filter);
+  }
+
+  /**
+   * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
+   * @param filter for filtering individual records
+   * @deprecated use {@link #ParquetRecordReader(ReadSupport, Filter)}
+   */
+  @Deprecated
+  public ParquetRecordReader(ReadSupport<T> readSupport, UnboundRecordFilter filter) {
+    this(readSupport, FilterCompat.get(filter));
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void close() throws IOException {
+    internalReader.close();
+  }
+
+  /**
+   * always returns null
+   */
+  @Override
+  public Void getCurrentKey() throws IOException, InterruptedException {
+    return null;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public T getCurrentValue() throws IOException, InterruptedException {
+    return internalReader.getCurrentValue();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return internalReader.getProgress();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    if (context instanceof TaskInputOutputContext<?, ?, ?, ?>) {
+      BenchmarkCounter.initCounterFromContext((TaskInputOutputContext<?, ?, ?, ?>) context);
+    } else {
+      LOG.error("Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is "
+              + context.getClass().getCanonicalName());
+    }
+
+    initializeInternalReader(toParquetSplit(inputSplit), ContextUtil.getConfiguration(context));
+  }
+
+  public void initialize(InputSplit inputSplit, Configuration configuration, Reporter reporter)
+      throws IOException, InterruptedException {
+    BenchmarkCounter.initCounterFromReporter(reporter, configuration);
+    initializeInternalReader(toParquetSplit(inputSplit), configuration);
+  }
+
+  private void initializeInternalReader(ParquetInputSplit split, Configuration configuration) throws IOException {
+    Path path = split.getPath();
+    long[] rowGroupOffsets = split.getRowGroupOffsets();
+    List<BlockMetaData> filteredBlocks;
+    ParquetMetadata footer;
+    // if task.side.metadata is set, rowGroupOffsets is null
+    if (rowGroupOffsets == null) {
+      // then we need to apply the predicate push down filter
+      footer = readFooter(configuration, path, range(split.getStart(), split.getEnd()));
+      MessageType fileSchema = footer.getFileMetaData().getSchema();
+      Filter filter = getFilter(configuration);
+      filteredBlocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
+    } else {
+      // otherwise we find the row groups that were selected on the client
+      footer = readFooter(configuration, path, NO_FILTER);
+      Set<Long> offsets = new HashSet<Long>();
+      for (long offset : rowGroupOffsets) {
+        offsets.add(offset);
+      }
+      filteredBlocks = new ArrayList<BlockMetaData>();
+      for (BlockMetaData block : footer.getBlocks()) {
+        if (offsets.contains(block.getStartingPos())) {
+          filteredBlocks.add(block);
+        }
+      }
+      // verify we found them all
+      if (filteredBlocks.size() != rowGroupOffsets.length) {
+        long[] foundRowGroupOffsets = new long[footer.getBlocks().size()];
+        for (int i = 0; i < foundRowGroupOffsets.length; i++) {
+          foundRowGroupOffsets[i] = footer.getBlocks().get(i).getStartingPos();
+        }
+        // this should never happen.
+        // provide a good error message in case there's a bug
+        throw new IllegalStateException(
+            "All the offsets listed in the split should be found in the file."
+            + " expected: " + Arrays.toString(rowGroupOffsets)
+            + " found: " + filteredBlocks
+            + " out of: " + Arrays.toString(foundRowGroupOffsets)
+            + " in range " + split.getStart() + ", " + split.getEnd());
+      }
+    }
+    MessageType fileSchema = footer.getFileMetaData().getSchema();
+    Map<String, String> fileMetaData = footer.getFileMetaData().getKeyValueMetaData();
+    internalReader.initialize(
+        fileSchema, fileMetaData, path, filteredBlocks, configuration);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    return internalReader.nextKeyValue();
+  }
+
+  private ParquetInputSplit toParquetSplit(InputSplit split) throws IOException {
+    if (split instanceof ParquetInputSplit) {
+      return (ParquetInputSplit) split;
+    } else if (split instanceof FileSplit) {
+      return ParquetInputSplit.from((FileSplit) split);
+    } else if (split instanceof org.apache.hadoop.mapred.FileSplit) {
+      return ParquetInputSplit.from(
+          (org.apache.hadoop.mapred.FileSplit) split);
+    } else {
+      throw new IllegalArgumentException(
+          "Invalid split (not a FileSplit or ParquetInputSplit): " + split);
+    }
+  }
+}
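
For context, ParquetRecordReader is normally created by ParquetInputFormat rather than instantiated directly. Below is a rough sketch of the job wiring that ends up driving it, assuming the GroupReadSupport example class and the static setReadSupportClass helper on ParquetInputFormat; the mapper and output settings are omitted for brevity:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.example.GroupReadSupport;

public class RecordReaderJobSketch {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "parquet-read");

    // The framework calls ParquetRecordReader.initialize()/nextKeyValue()
    // for every split produced by ParquetInputFormat.
    job.setInputFormatClass(ParquetInputFormat.class);
    ParquetInputFormat.setReadSupportClass(job, GroupReadSupport.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));

    // job.setMapperClass(...) and the output format are omitted here; the map
    // input key is Void and the value is the materialized record type.
    job.waitForCompletion(true);
  }
}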

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java
new file mode 100644
index 0000000..2449192
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java
@@ -0,0 +1,126 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.schema.MessageType;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Writes records to a Parquet file
+ *
+ * @see ParquetOutputFormat
+ *
+ * @author Julien Le Dem
+ *
+ * @param <T> the type of the materialized records
+ */
+public class ParquetRecordWriter<T> extends RecordWriter<Void, T> {
+
+  private InternalParquetRecordWriter<T> internalWriter;
+  private MemoryManager memoryManager;
+
+  /**
+   *
+   * @param w the file to write to
+   * @param writeSupport the class to convert incoming records
+   * @param schema the schema of the records
+   * @param extraMetaData extra meta data to write in the footer of the file
+   * @param blockSize the size of a block in the file (this will be approximate)
+   * @param pageSize the size of a page in the file (this will be approximate)
+   * @param compressor the compressor used to compress the pages
+   * @param dictionaryPageSize the threshold for dictionary size
+   * @param enableDictionary to enable the dictionary
+   * @param validating if schema validation should be turned on
+   * @param writerVersion the version of the Parquet format to write
+   * @deprecated use the constructor that also takes a {@link MemoryManager}
+   */
+  @Deprecated
+  public ParquetRecordWriter(
+      ParquetFileWriter w,
+      WriteSupport<T> writeSupport,
+      MessageType schema,
+      Map<String, String> extraMetaData,
+      int blockSize, int pageSize,
+      BytesCompressor compressor,
+      int dictionaryPageSize,
+      boolean enableDictionary,
+      boolean validating,
+      WriterVersion writerVersion) {
+    internalWriter = new InternalParquetRecordWriter<T>(w, writeSupport, schema,
+        extraMetaData, blockSize, pageSize, compressor, dictionaryPageSize, enableDictionary,
+        validating, writerVersion);
+  }
+
+  /**
+   *
+   * @param w the file to write to
+   * @param writeSupport the class to convert incoming records
+   * @param schema the schema of the records
+   * @param extraMetaData extra meta data to write in the footer of the file
+   * @param blockSize the size of a block in the file (this will be approximate)
+   * @param pageSize the size of a page in the file (this will be approximate)
+   * @param compressor the compressor used to compress the pages
+   * @param dictionaryPageSize the threshold for dictionary size
+   * @param enableDictionary to enable the dictionary
+   * @param validating if schema validation should be turned on
+   * @param writerVersion the version of the Parquet format to write
+   * @param memoryManager the memory manager shared by the writers in this task
+   */
+  public ParquetRecordWriter(
+      ParquetFileWriter w,
+      WriteSupport<T> writeSupport,
+      MessageType schema,
+      Map<String, String> extraMetaData,
+      long blockSize, int pageSize,
+      BytesCompressor compressor,
+      int dictionaryPageSize,
+      boolean enableDictionary,
+      boolean validating,
+      WriterVersion writerVersion,
+      MemoryManager memoryManager) {
+    internalWriter = new InternalParquetRecordWriter<T>(w, writeSupport, schema,
+        extraMetaData, blockSize, pageSize, compressor, dictionaryPageSize, enableDictionary,
+        validating, writerVersion);
+    this.memoryManager = checkNotNull(memoryManager, "memoryManager");
+    memoryManager.addWriter(internalWriter, blockSize);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+    internalWriter.close();
+    if (memoryManager != null) {
+      memoryManager.removeWriter(internalWriter);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void write(Void key, T value) throws IOException, InterruptedException {
+    internalWriter.write(value);
+  }
+
+}
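
Likewise, ParquetRecordWriter is obtained through ParquetOutputFormat.getRecordWriter() inside a task rather than constructed by hand. A hedged sketch of the job-side setup that leads to it, assuming the example GroupWriteSupport class, its setSchema helper, and a toy schema:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.schema.MessageTypeParser;

public class RecordWriterJobSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // GroupWriteSupport reads the schema back from the configuration at init time.
    GroupWriteSupport.setSchema(
        MessageTypeParser.parseMessageType("message example { required int32 id; }"), conf);

    Job job = Job.getInstance(conf, "parquet-write");
    job.setOutputFormatClass(ParquetOutputFormat.class);
    ParquetOutputFormat.setWriteSupportClass(job, GroupWriteSupport.class);
    FileOutputFormat.setOutputPath(job, new Path(args[0]));

    // The map/reduce tasks then emit (null, Group) pairs, which arrive at
    // ParquetRecordWriter.write(Void, T) above.
    job.waitForCompletion(true);
  }
}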

