parquet-commits mailing list archives

From b...@apache.org
Subject [14/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.
Date Mon, 27 Apr 2015 23:12:11 GMT
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
new file mode 100644
index 0000000..9968b5d
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -0,0 +1,175 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import static java.lang.String.format;
+import static org.apache.parquet.Log.DEBUG;
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.parquet.Ints;
+import org.apache.parquet.Log;
+import org.apache.parquet.column.ColumnWriteStore;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.column.impl.ColumnWriteStoreV1;
+import org.apache.parquet.column.impl.ColumnWriteStoreV2;
+import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.api.WriteSupport.FinalizedWriteContext;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.schema.MessageType;
+
+class InternalParquetRecordWriter<T> {
+  private static final Log LOG = Log.getLog(InternalParquetRecordWriter.class);
+
+  private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
+  private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
+
+  private final ParquetFileWriter parquetFileWriter;
+  private final WriteSupport<T> writeSupport;
+  private final MessageType schema;
+  private final Map<String, String> extraMetaData;
+  private final long rowGroupSize;
+  private long rowGroupSizeThreshold;
+  private final int pageSize;
+  private final BytesCompressor compressor;
+  private final boolean validating;
+  private final ParquetProperties parquetProperties;
+
+  private long recordCount = 0;
+  private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
+
+  private ColumnWriteStore columnStore;
+  private ColumnChunkPageWriteStore pageStore;
+
+
+  /**
+   * @param parquetFileWriter the file to write to
+   * @param writeSupport the class to convert incoming records
+   * @param schema the schema of the records
+   * @param extraMetaData extra meta data to write in the footer of the file
+   * @param rowGroupSize the size of a block in the file (this will be approximate)
+   * @param compressor the codec used to compress
+   */
+  public InternalParquetRecordWriter(
+      ParquetFileWriter parquetFileWriter,
+      WriteSupport<T> writeSupport,
+      MessageType schema,
+      Map<String, String> extraMetaData,
+      long rowGroupSize,
+      int pageSize,
+      BytesCompressor compressor,
+      int dictionaryPageSize,
+      boolean enableDictionary,
+      boolean validating,
+      WriterVersion writerVersion) {
+    this.parquetFileWriter = parquetFileWriter;
+    this.writeSupport = checkNotNull(writeSupport, "writeSupport");
+    this.schema = schema;
+    this.extraMetaData = extraMetaData;
+    this.rowGroupSize = rowGroupSize;
+    this.rowGroupSizeThreshold = rowGroupSize;
+    this.pageSize = pageSize;
+    this.compressor = compressor;
+    this.validating = validating;
+    this.parquetProperties = new ParquetProperties(dictionaryPageSize, writerVersion, enableDictionary);
+    initStore();
+  }
+
+  private void initStore() {
+    pageStore = new ColumnChunkPageWriteStore(compressor, schema, pageSize);
+    columnStore = parquetProperties.newColumnWriteStore(
+        schema,
+        pageStore,
+        pageSize);
+    MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
+    writeSupport.prepareForWrite(columnIO.getRecordWriter(columnStore));
+  }
+
+  public void close() throws IOException, InterruptedException {
+    flushRowGroupToStore();
+    FinalizedWriteContext finalWriteContext = writeSupport.finalizeWrite();
+    Map<String, String> finalMetadata = new HashMap<String, String>(extraMetaData);
+    finalMetadata.putAll(finalWriteContext.getExtraMetaData());
+    parquetFileWriter.end(finalMetadata);
+  }
+
+  public void write(T value) throws IOException, InterruptedException {
+    writeSupport.write(value);
+    ++ recordCount;
+    checkBlockSizeReached();
+  }
+
+  private void checkBlockSizeReached() throws IOException {
+    if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
+      long memSize = columnStore.getBufferedSize();
+      if (memSize > rowGroupSizeThreshold) {
+        LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, rowGroupSizeThreshold, recordCount));
+        flushRowGroupToStore();
+        initStore();
+        recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
+      } else {
+        float recordSize = (float) memSize / recordCount;
+        recordCountForNextMemCheck = min(
+            max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(rowGroupSizeThreshold / recordSize)) / 2), // will check halfway
+            recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead
+            );
+        if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck));
+      }
+    }
+  }
+
+  private void flushRowGroupToStore()
+      throws IOException {
+    LOG.info(format("Flushing mem columnStore to file. allocated memory: %,d", columnStore.getAllocatedSize()));
+    if (columnStore.getAllocatedSize() > 3 * (long)rowGroupSizeThreshold) {
+      LOG.warn("Too much memory used: " + columnStore.memUsageString());
+    }
+
+    if (recordCount > 0) {
+      parquetFileWriter.startBlock(recordCount);
+      columnStore.flush();
+      pageStore.flushToFileWriter(parquetFileWriter);
+      recordCount = 0;
+      parquetFileWriter.endBlock();
+    }
+
+    columnStore = null;
+    pageStore = null;
+  }
+
+  long getRowGroupSizeThreshold() {
+    return rowGroupSizeThreshold;
+  }
+
+  void setRowGroupSizeThreshold(long rowGroupSizeThreshold) {
+    this.rowGroupSizeThreshold = rowGroupSizeThreshold;
+  }
+
+  MessageType getSchema() {
+    return this.schema;
+  }
+}
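
For context, the flush heuristic in checkBlockSizeReached() above schedules the next
buffered-size check roughly halfway to the point where the row group is projected to
reach its threshold, bounded by the MINIMUM/MAXIMUM_RECORD_COUNT_FOR_CHECK constants.
Below is a minimal standalone sketch of that scheduling rule (not part of the commit;
class and method names are illustrative only):

public class CheckIntervalSketch {
  private static final int MIN_CHECK = 100;    // mirrors MINIMUM_RECORD_COUNT_FOR_CHECK
  private static final int MAX_CHECK = 10000;  // mirrors MAXIMUM_RECORD_COUNT_FOR_CHECK

  // Given the current record count, the buffered size and the row group threshold,
  // compute the record count at which the next size check should happen.
  static long nextCheck(long recordCount, long bufferedBytes, long rowGroupThreshold) {
    float bytesPerRecord = (float) bufferedBytes / recordCount;
    long projectedFlushRecord = (long) (rowGroupThreshold / bytesPerRecord);
    // check halfway to the projected flush point, but never look more than
    // MAX_CHECK records ahead and never check before MIN_CHECK records
    return Math.min(
        Math.max(MIN_CHECK, (recordCount + projectedFlushRecord) / 2),
        recordCount + MAX_CHECK);
  }

  public static void main(String[] args) {
    // e.g. 1,000 records buffered in 8 MB against a 128 MB row group threshold
    System.out.println(nextCheck(1000, 8L << 20, 128L << 20)); // prints ~8500
  }
}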

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/LruCache.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/LruCache.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/LruCache.java
new file mode 100644
index 0000000..44f9eca
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/LruCache.java
@@ -0,0 +1,199 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.parquet.Log;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * A basic implementation of an LRU cache.  Besides evicting the least recently
+ * used entries (either based on insertion or access order), this class also
+ * checks for "stale" entries as entries are inserted or retrieved (note
+ * "staleness" is defined by the entries themselves (see
+ * {@link org.apache.parquet.hadoop.LruCache.Value}).
+ *
+ * @param <K> The key type. Acts as the key in a {@link java.util.LinkedHashMap}
+ * @param <V> The value type.  Must extend {@link org.apache.parquet.hadoop.LruCache.Value}
+ *           so that the "staleness" of the value can be easily determined.
+ */
+final class LruCache<K, V extends LruCache.Value<K, V>> {
+  private static final Log LOG = Log.getLog(LruCache.class);
+
+  private static final float DEFAULT_LOAD_FACTOR = 0.75f;
+
+  private final LinkedHashMap<K, V> cacheMap;
+
+  /**
+   * Constructs an access-order based LRU cache with {@code maxSize} entries.
+   * @param maxSize The maximum number of entries to store in the cache.
+   */
+  public LruCache(final int maxSize) {
+    this(maxSize, DEFAULT_LOAD_FACTOR, true);
+  }
+
+  /**
+   * Constructs an LRU cache.
+   *
+   * @param maxSize The maximum number of entries to store in the cache.
+   * @param loadFactor Used to determine the initial capacity.
+   * @param accessOrder the ordering mode - {@code true} for access-order,
+   * {@code false} for insertion-order
+   */
+  public LruCache(final int maxSize, final float loadFactor, final boolean accessOrder) {
+    int initialCapacity = Math.round(maxSize / loadFactor);
+    cacheMap =
+            new LinkedHashMap<K, V>(initialCapacity, loadFactor, accessOrder) {
+              @Override
+              public boolean removeEldestEntry(final Map.Entry<K, V> eldest) {
+                boolean result = size() > maxSize;
+                if (result) {
+                  if (Log.DEBUG) {
+                    LOG.debug("Removing eldest entry in cache: "
+                            + eldest.getKey());
+                  }
+                }
+                return result;
+              }
+            };
+  }
+
+  /**
+   * Removes the mapping for the specified key from this cache if present.
+   * @param key key whose mapping is to be removed from the cache
+   * @return the previous value associated with key, or null if there was no
+   * mapping for key.
+   */
+  public V remove(final K key) {
+    V oldValue = cacheMap.remove(key);
+    if (oldValue != null) {
+      if (Log.DEBUG) {
+        LOG.debug("Removed cache entry for '" + key + "'");
+      }
+    }
+    return oldValue;
+  }
+
+  /**
+   * Associates the specified value with the specified key in this cache. The
+   * value is only inserted if it is not null and it is considered current. If
+   * the cache previously contained a mapping for the key, the old value is
+   * replaced only if the new value is "newer" than the old one.
+   * @param key key with which the specified value is to be associated
+   * @param newValue value to be associated with the specified key
+   */
+  public void put(final K key, final V newValue) {
+    if (newValue == null || !newValue.isCurrent(key)) {
+      if (Log.WARN) {
+        LOG.warn("Ignoring new cache entry for '" + key + "' because it is "
+                + (newValue == null ? "null" : "not current"));
+      }
+      return;
+    }
+
+    V oldValue = cacheMap.get(key);
+    if (oldValue != null && oldValue.isNewerThan(newValue)) {
+      if (Log.WARN) {
+        LOG.warn("Ignoring new cache entry for '" + key + "' because "
+                + "existing cache entry is newer");
+      }
+      return;
+    }
+
+    // no existing value or new value is newer than old value
+    oldValue = cacheMap.put(key, newValue);
+    if (Log.DEBUG) {
+      if (oldValue == null) {
+        LOG.debug("Added new cache entry for '" + key + "'");
+      } else {
+        LOG.debug("Overwrote existing cache entry for '" + key + "'");
+      }
+    }
+  }
+
+  /**
+   * Removes all of the mappings from this cache. The cache will be empty
+   * after this call returns.
+   */
+  public void clear() {
+    cacheMap.clear();
+  }
+
+  /**
+   * Returns the value to which the specified key is mapped, or null if 1) the
+   * value is not current or 2) this cache contains no mapping for the key.
+   * @param key the key whose associated value is to be returned
+   * @return the value to which the specified key is mapped, or null if 1) the
+   * value is not current or 2) this cache contains no mapping for the key
+   */
+  public V getCurrentValue(final K key) {
+    V value = cacheMap.get(key);
+    if (Log.DEBUG) {
+      LOG.debug("Value for '" + key + "' " + (value == null ? "not " : "")
+              + "in cache");
+    }
+    if (value != null && !value.isCurrent(key)) {
+      // value is not current; remove it and return null
+      remove(key);
+      return null;
+    }
+
+    return value;
+  }
+
+  /**
+   * Returns the number of key-value mappings in this cache.
+   * @return the number of key-value mappings in this cache.
+   */
+  public int size() {
+    return cacheMap.size();
+  }
+
+  /**
+   * {@link org.apache.parquet.hadoop.LruCache} expects all values to follow this
+   * interface so the cache can determine 1) whether values are current (e.g.
+   * the referenced data has not been modified/updated in such a way that the
+   * value is no longer useful) and 2) whether a value is strictly "newer"
+   * than another value.
+   *
+   * @param <K> The key type.
+   * @param <V> Provides a bound for the {@link #isNewerThan(V)} method
+   */
+  interface Value<K, V> {
+    /**
+     * Is the value still current (e.g. has the referenced data been
+     * modified/updated in such a way that the value is no longer useful)?
+     * @param key the key associated with this value
+     * @return {@code true} if the value is still current, {@code false} if the
+     * value is no longer useful
+     */
+    boolean isCurrent(K key);
+
+    /**
+     * Compares this value with the specified value to check for relative age.
+     * @param otherValue the value to be compared.
+     * @return {@code true} if this value is strictly newer than the other value,
+     * {@code false} if it is older or just as new as the other value.
+     */
+    boolean isNewerThan(V otherValue);
+  }
+
+}
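
As a usage note (not part of the commit), a Value implementation typically ties
"staleness" to some property of the referenced data, e.g. a modification timestamp.
A hypothetical sketch, with illustrative names only:

final class TimestampedValue implements LruCache.Value<String, TimestampedValue> {
  private final long modificationTime; // when the referenced data was produced
  private final String payload;

  TimestampedValue(long modificationTime, String payload) {
    this.modificationTime = modificationTime;
    this.payload = payload;
  }

  @Override
  public boolean isCurrent(String key) {
    // a real implementation would re-check the referenced data here;
    // this sketch simply treats every value as current
    return true;
  }

  @Override
  public boolean isNewerThan(TimestampedValue otherValue) {
    return modificationTime > otherValue.modificationTime;
  }
}

// Typical use (LruCache is package-private, so this lives in org.apache.parquet.hadoop):
//   LruCache<String, TimestampedValue> cache = new LruCache<String, TimestampedValue>(10);
//   cache.put("some-key", new TimestampedValue(System.currentTimeMillis(), "payload"));
//   TimestampedValue v = cache.getCurrentValue("some-key");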

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/MemoryManager.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/MemoryManager.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/MemoryManager.java
new file mode 100644
index 0000000..ec70b87
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/MemoryManager.java
@@ -0,0 +1,158 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.ParquetRuntimeException;
+
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implements a memory manager that keeps a global context of how many Parquet
+ * writers there are and manages the memory between them. For use cases with
+ * dynamic partitions, it is easy to end up with many writers in the same task.
+ * By managing the size of each allocation, we try to cut down the size of each
+ * allocation and keep the task from running out of memory.
+ *
+ * This class balances the allocation size of each writer by scaling them proportionally.
+ * When the sum of the writers' allocation sizes is less than the total memory pool,
+ * the original values are kept.
+ * When the sum exceeds the pool, each writer's allocation size is decreased by the same ratio.
+ */
+public class MemoryManager {
+  private static final Log LOG = Log.getLog(MemoryManager.class);
+  static final float DEFAULT_MEMORY_POOL_RATIO = 0.95f;
+  static final long DEFAULT_MIN_MEMORY_ALLOCATION = 1 * 1024 * 1024; // 1MB
+  private final float memoryPoolRatio;
+
+  private final long totalMemoryPool;
+  private final long minMemoryAllocation;
+  private final Map<InternalParquetRecordWriter, Long> writerList = new
+      HashMap<InternalParquetRecordWriter, Long>();
+
+  public MemoryManager(float ratio, long minAllocation) {
+    checkRatio(ratio);
+
+    memoryPoolRatio = ratio;
+    minMemoryAllocation = minAllocation;
+    totalMemoryPool = Math.round(
+        ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax() * ratio);
+    LOG.debug(String.format("Allocated total memory pool is: %,d", totalMemoryPool));
+  }
+
+  private void checkRatio(float ratio) {
+    if (ratio <= 0 || ratio > 1) {
+      throw new IllegalArgumentException("The configured memory pool ratio " + ratio + " is " +
+          "not between 0 and 1.");
+    }
+  }
+
+  /**
+   * Add a new writer and its memory allocation to the memory manager.
+   * @param writer the new created writer
+   * @param allocation the requested buffer size
+   */
+  synchronized void addWriter(InternalParquetRecordWriter writer, Long allocation) {
+    Long oldValue = writerList.get(writer);
+    if (oldValue == null) {
+      writerList.put(writer, allocation);
+    } else {
+      throw new IllegalArgumentException("[BUG] The Parquet Memory Manager should not add an " +
+          "instance of InternalParquetRecordWriter more than once. The Manager already contains " +
+          "the writer: " + writer);
+    }
+    updateAllocation();
+  }
+
+  /**
+   * Remove the given writer from the memory manager.
+   * @param writer the writer that has been closed
+   */
+  synchronized void removeWriter(InternalParquetRecordWriter writer) {
+    if (writerList.containsKey(writer)) {
+      writerList.remove(writer);
+    }
+    if (!writerList.isEmpty()) {
+      updateAllocation();
+    }
+  }
+
+  /**
+   * Update the allocated size of each writer based on the current allocations and pool size.
+   */
+  private void updateAllocation() {
+    long totalAllocations = 0;
+    double scale;
+    for (Long allocation : writerList.values()) {
+      totalAllocations += allocation;
+    }
+    if (totalAllocations <= totalMemoryPool) {
+      scale = 1.0;
+    } else {
+      scale = (double) totalMemoryPool / totalAllocations;
+      LOG.warn(String.format(
+          "Total allocation exceeds %.2f%% (%,d bytes) of heap memory\n" +
+          "Scaling row group sizes to %.2f%% for %d writers",
+          100*memoryPoolRatio, totalMemoryPool, 100*scale, writerList.size()));
+    }
+
+    int maxColCount = 0;
+    for (InternalParquetRecordWriter w : writerList.keySet()) {
+      maxColCount = Math.max(w.getSchema().getColumns().size(), maxColCount);
+    }
+
+    for (Map.Entry<InternalParquetRecordWriter, Long> entry : writerList.entrySet()) {
+      long newSize = (long) Math.floor(entry.getValue() * scale);
+      if (scale < 1.0 && minMemoryAllocation > 0 && newSize < minMemoryAllocation) {
+        throw new ParquetRuntimeException(String.format(
+            "New Memory allocation %d bytes is smaller than the minimum allocation size of %d bytes.",
+            newSize, minMemoryAllocation)){};
+      }
+      entry.getKey().setRowGroupSizeThreshold(newSize);
+      LOG.debug(String.format("Adjust block size from %,d to %,d for writer: %s",
+            entry.getValue(), newSize, entry.getKey()));
+    }
+  }
+
+  /**
+   * Get the total memory pool size that is available for writers.
+   * @return the number of bytes in the memory pool
+   */
+  long getTotalMemoryPool() {
+    return totalMemoryPool;
+  }
+
+  /**
+   * Get the writers list
+   * @return the writers in this memory manager
+   */
+  Map<InternalParquetRecordWriter, Long> getWriterList() {
+    return writerList;
+  }
+
+  /**
+   * Get the ratio of memory allocated for all the writers.
+   * @return the memory pool ratio
+   */
+  float getMemoryPoolRatio() {
+    return memoryPoolRatio;
+  }
+}
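
A minimal sketch (not part of the commit) of the proportional scaling performed by
updateAllocation(): when the writers together request more than the pool, every
row group threshold is scaled down by the same ratio. Class and method names below
are illustrative:

public class ScalingSketch {
  // Scale one writer's requested allocation against the shared pool.
  static long scaledAllocation(long requested, long totalRequested, long memoryPool) {
    double scale = (totalRequested <= memoryPool)
        ? 1.0
        : (double) memoryPool / totalRequested;
    return (long) Math.floor(requested * scale);
  }

  public static void main(String[] args) {
    // e.g. three writers requesting 128 MB each against a 256 MB pool:
    // each ends up with floor(128 MB * 256 / 384), roughly 85 MB
    long mb = 1L << 20;
    System.out.println(scaledAllocation(128 * mb, 3 * 128 * mb, 256 * mb) / mb);
  }
}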

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
new file mode 100644
index 0000000..49e0833
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -0,0 +1,782 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.apache.parquet.Log.DEBUG;
+import static org.apache.parquet.bytes.BytesUtils.readIntLittleEndian;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics;
+import static org.apache.parquet.hadoop.ParquetFileWriter.MAGIC;
+import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_COMMON_METADATA_FILE;
+import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
+
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.SequenceInputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.Util;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
+import org.apache.parquet.hadoop.CodecFactory.BytesDecompressor;
+import org.apache.parquet.hadoop.ColumnChunkPageReadStore.ColumnChunkPageReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HiddenFileFilter;
+import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
+import org.apache.parquet.io.ParquetDecodingException;
+
+/**
+ * Internal implementation of the Parquet file reader as a block container
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class ParquetFileReader implements Closeable {
+
+  private static final Log LOG = Log.getLog(ParquetFileReader.class);
+
+  public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism";
+
+  private static ParquetMetadataConverter converter = new ParquetMetadataConverter();
+
+  /**
+   * For the files provided, check if there's a summary file.
+   * If a summary file is found it is used, otherwise the file footer is used.
+   * @param configuration the hadoop conf to connect to the file system
+   * @param partFiles the part files to read
+   * @return the footers for those files using the summary file if possible.
+   * @throws IOException
+   */
+  @Deprecated
+  public static List<Footer> readAllFootersInParallelUsingSummaryFiles(Configuration configuration, List<FileStatus> partFiles) throws IOException {
+    return readAllFootersInParallelUsingSummaryFiles(configuration, partFiles, false);
+  }
+
+  private static MetadataFilter filter(boolean skipRowGroups) {
+    return skipRowGroups ? SKIP_ROW_GROUPS : NO_FILTER;
+  }
+
+  /**
+   * For the files provided, check if there's a summary file.
+   * If a summary file is found it is used, otherwise the file footer is used.
+   * @param configuration the hadoop conf to connect to the file system
+   * @param partFiles the part files to read
+   * @param skipRowGroups whether to skip the row group info in the footers
+   * @return the footers for those files using the summary file if possible.
+   * @throws IOException
+   */
+  public static List<Footer> readAllFootersInParallelUsingSummaryFiles(
+      final Configuration configuration,
+      final Collection<FileStatus> partFiles,
+      final boolean skipRowGroups) throws IOException {
+
+    // figure out list of all parents to part files
+    Set<Path> parents = new HashSet<Path>();
+    for (FileStatus part : partFiles) {
+      parents.add(part.getPath().getParent());
+    }
+
+    // read corresponding summary files if they exist
+    List<Callable<Map<Path, Footer>>> summaries = new ArrayList<Callable<Map<Path, Footer>>>();
+    for (final Path path : parents) {
+      summaries.add(new Callable<Map<Path, Footer>>() {
+        @Override
+        public Map<Path, Footer> call() throws Exception {
+          ParquetMetadata mergedMetadata = readSummaryMetadata(configuration, path, skipRowGroups);
+          if (mergedMetadata != null) {
+            final List<Footer> footers;
+            if (skipRowGroups) {
+              footers = new ArrayList<Footer>();
+              for (FileStatus f : partFiles) {
+                footers.add(new Footer(f.getPath(), mergedMetadata));
+              }
+            } else {
+              footers = footersFromSummaryFile(path, mergedMetadata);
+            }
+            Map<Path, Footer> map = new HashMap<Path, Footer>();
+            for (Footer footer : footers) {
+              // the folder may have been moved
+              footer = new Footer(new Path(path, footer.getFile().getName()), footer.getParquetMetadata());
+              map.put(footer.getFile(), footer);
+            }
+            return map;
+          } else {
+            return Collections.emptyMap();
+          }
+        }
+      });
+    }
+
+    Map<Path, Footer> cache = new HashMap<Path, Footer>();
+    try {
+      List<Map<Path, Footer>> footersFromSummaries = runAllInParallel(configuration.getInt(PARQUET_READ_PARALLELISM, 5), summaries);
+      for (Map<Path, Footer> footers : footersFromSummaries) {
+        cache.putAll(footers);
+      }
+    } catch (ExecutionException e) {
+      throw new IOException("Error reading summaries", e);
+    }
+
+    // keep only footers for files actually requested and read file footer if not found in summaries
+    List<Footer> result = new ArrayList<Footer>(partFiles.size());
+    List<FileStatus> toRead = new ArrayList<FileStatus>();
+    for (FileStatus part : partFiles) {
+      Footer f = cache.get(part.getPath());
+      if (f != null) {
+        result.add(f);
+      } else {
+        toRead.add(part);
+      }
+    }
+
+    if (toRead.size() > 0) {
+      // read the footers of the files that did not have a summary file
+      if (Log.INFO) LOG.info("reading another " + toRead.size() + " footers");
+      result.addAll(readAllFootersInParallel(configuration, toRead, skipRowGroups));
+    }
+
+    return result;
+  }
+
+  private static <T> List<T> runAllInParallel(int parallelism, List<Callable<T>> toRun) throws ExecutionException {
+    LOG.info("Initiating action with parallelism: " + parallelism);
+    ExecutorService threadPool = Executors.newFixedThreadPool(parallelism);
+    try {
+      List<Future<T>> futures = new ArrayList<Future<T>>();
+      for (Callable<T> callable : toRun) {
+        futures.add(threadPool.submit(callable));
+      }
+      List<T> result = new ArrayList<T>(toRun.size());
+      for (Future<T> future : futures) {
+        try {
+          result.add(future.get());
+        } catch (InterruptedException e) {
+          throw new RuntimeException("The thread was interrupted", e);
+        }
+      }
+      return result;
+    } finally {
+      threadPool.shutdownNow();
+    }
+  }
+
+  @Deprecated
+  public static List<Footer> readAllFootersInParallel(final Configuration configuration, List<FileStatus> partFiles) throws IOException {
+    return readAllFootersInParallel(configuration, partFiles, false);
+  }
+
+  /**
+   * read all the footers of the files provided
+   * (not using summary files)
+   * @param configuration the conf to access the File System
+   * @param partFiles the files to read
+   * @param skipRowGroups to skip the rowGroup info
+   * @return the footers
+   * @throws IOException
+   */
+  public static List<Footer> readAllFootersInParallel(final Configuration configuration, List<FileStatus> partFiles, final boolean skipRowGroups) throws IOException {
+    List<Callable<Footer>> footers = new ArrayList<Callable<Footer>>();
+    for (final FileStatus currentFile : partFiles) {
+      footers.add(new Callable<Footer>() {
+        @Override
+        public Footer call() throws Exception {
+          try {
+            return new Footer(currentFile.getPath(), readFooter(configuration, currentFile, filter(skipRowGroups)));
+          } catch (IOException e) {
+            throw new IOException("Could not read footer for file " + currentFile, e);
+          }
+        }
+      });
+    }
+    try {
+      return runAllInParallel(configuration.getInt(PARQUET_READ_PARALLELISM, 5), footers);
+    } catch (ExecutionException e) {
+      throw new IOException("Could not read footer: " + e.getMessage(), e.getCause());
+    }
+  }
+
+  /**
+   * Read the footers of all the files under that path (recursively)
+   * not using summary files.
+   * rowGroups are not skipped
+   * @param configuration the configuration to access the FS
+   * @param fileStatus the root dir
+   * @return all the footers
+   * @throws IOException
+   */
+  public static List<Footer> readAllFootersInParallel(Configuration configuration, FileStatus fileStatus) throws IOException {
+    List<FileStatus> statuses = listFiles(configuration, fileStatus);
+    return readAllFootersInParallel(configuration, statuses, false);
+  }
+
+  @Deprecated
+  public static List<Footer> readFooters(Configuration configuration, Path path) throws IOException {
+    return readFooters(configuration, status(configuration, path));
+  }
+
+  private static FileStatus status(Configuration configuration, Path path) throws IOException {
+    return path.getFileSystem(configuration).getFileStatus(path);
+  }
+
+  /**
+   * This always returns the row groups.
+   * @param configuration the configuration to access the FS
+   * @param pathStatus the status of the file or root dir to read footers from
+   * @return the footers of the corresponding files
+   * @throws IOException
+   */
+  @Deprecated
+  public static List<Footer> readFooters(Configuration configuration, FileStatus pathStatus) throws IOException {
+    return readFooters(configuration, pathStatus, false);
+  }
+
+  /**
+   * Read the footers of all the files under that path (recursively)
+   * using summary files if possible
+   * @param configuration the configuration to access the FS
+   * @param pathStatus the root dir
+   * @param skipRowGroups whether to skip the row group info in the footers
+   * @return all the footers
+   * @throws IOException
+   */
+  public static List<Footer> readFooters(Configuration configuration, FileStatus pathStatus, boolean skipRowGroups) throws IOException {
+    List<FileStatus> files = listFiles(configuration, pathStatus);
+    return readAllFootersInParallelUsingSummaryFiles(configuration, files, skipRowGroups);
+  }
+
+  private static List<FileStatus> listFiles(Configuration conf, FileStatus fileStatus) throws IOException {
+    if (fileStatus.isDir()) {
+      FileSystem fs = fileStatus.getPath().getFileSystem(conf);
+      FileStatus[] list = fs.listStatus(fileStatus.getPath(), HiddenFileFilter.INSTANCE);
+      List<FileStatus> result = new ArrayList<FileStatus>();
+      for (FileStatus sub : list) {
+        result.addAll(listFiles(conf, sub));
+      }
+      return result;
+    } else {
+      return Arrays.asList(fileStatus);
+    }
+  }
+
+  /**
+   * Specifically reads a given summary file
+   * @param configuration the configuration to access the FS
+   * @param summaryStatus the status of the summary file
+   * @return the metadata translated for each file
+   * @throws IOException
+   */
+  public static List<Footer> readSummaryFile(Configuration configuration, FileStatus summaryStatus) throws IOException {
+    final Path parent = summaryStatus.getPath().getParent();
+    ParquetMetadata mergedFooters = readFooter(configuration, summaryStatus, filter(false));
+    return footersFromSummaryFile(parent, mergedFooters);
+  }
+
+  static ParquetMetadata readSummaryMetadata(Configuration configuration, Path basePath, boolean skipRowGroups) throws IOException {
+    Path metadataFile = new Path(basePath, PARQUET_METADATA_FILE);
+    Path commonMetaDataFile = new Path(basePath, PARQUET_COMMON_METADATA_FILE);
+    FileSystem fileSystem = basePath.getFileSystem(configuration);
+    if (skipRowGroups && fileSystem.exists(commonMetaDataFile)) {
+      // reading the summary file that does not contain the row groups
+      if (Log.INFO) LOG.info("reading summary file: " + commonMetaDataFile);
+      return readFooter(configuration, commonMetaDataFile, filter(skipRowGroups));
+    } else if (fileSystem.exists(metadataFile)) {
+      if (Log.INFO) LOG.info("reading summary file: " + metadataFile);
+      return readFooter(configuration, metadataFile, filter(skipRowGroups));
+    } else {
+      return null;
+    }
+  }
+
+  static List<Footer> footersFromSummaryFile(final Path parent, ParquetMetadata mergedFooters) {
+    Map<Path, ParquetMetadata> footers = new HashMap<Path, ParquetMetadata>();
+    List<BlockMetaData> blocks = mergedFooters.getBlocks();
+    for (BlockMetaData block : blocks) {
+      String path = block.getPath();
+      Path fullPath = new Path(parent, path);
+      ParquetMetadata current = footers.get(fullPath);
+      if (current == null) {
+        current = new ParquetMetadata(mergedFooters.getFileMetaData(), new ArrayList<BlockMetaData>());
+        footers.put(fullPath, current);
+      }
+      current.getBlocks().add(block);
+    }
+    List<Footer> result = new ArrayList<Footer>();
+    for (Entry<Path, ParquetMetadata> entry : footers.entrySet()) {
+      result.add(new Footer(entry.getKey(), entry.getValue()));
+    }
+    return result;
+  }
+
+  /**
+   * Reads the meta data block in the footer of the file
+   * @param configuration
+   * @param file the parquet File
+   * @return the metadata blocks in the footer
+   * @throws IOException if an error occurs while reading the file
+   */
+  @Deprecated
+  public static final ParquetMetadata readFooter(Configuration configuration, Path file) throws IOException {
+    return readFooter(configuration, file, NO_FILTER);
+  }
+
+  /**
+   * Reads the meta data in the footer of the file.
+   * Skipping row groups (or not) based on the provided filter
+   * @param configuration
+   * @param file the Parquet File
+   * @param filter the filter to apply to row groups
+   * @return the metadata with row groups filtered.
+   * @throws IOException  if an error occurs while reading the file
+   */
+  public static ParquetMetadata readFooter(Configuration configuration, Path file, MetadataFilter filter) throws IOException {
+    FileSystem fileSystem = file.getFileSystem(configuration);
+    return readFooter(configuration, fileSystem.getFileStatus(file), filter);
+  }
+
+  /**
+   * @deprecated use {@link ParquetFileReader#readFooter(Configuration, FileStatus, MetadataFilter)}
+   */
+  @Deprecated
+  public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file) throws IOException {
+    return readFooter(configuration, file, NO_FILTER);
+  }
+
+  /**
+   * Reads the meta data block in the footer of the file
+   * @param configuration
+   * @param file the parquet File
+   * @param filter the filter to apply to row groups
+   * @return the metadata blocks in the footer
+   * @throws IOException if an error occurs while reading the file
+   */
+  public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file, MetadataFilter filter) throws IOException {
+    FileSystem fileSystem = file.getPath().getFileSystem(configuration);
+    FSDataInputStream f = fileSystem.open(file.getPath());
+    try {
+      long l = file.getLen();
+      if (Log.DEBUG) LOG.debug("File length " + l);
+      int FOOTER_LENGTH_SIZE = 4;
+      if (l < MAGIC.length + FOOTER_LENGTH_SIZE + MAGIC.length) { // MAGIC + data + footer + footerIndex + MAGIC
+        throw new RuntimeException(file.getPath() + " is not a Parquet file (too small)");
+      }
+      long footerLengthIndex = l - FOOTER_LENGTH_SIZE - MAGIC.length;
+      if (Log.DEBUG) LOG.debug("reading footer index at " + footerLengthIndex);
+
+      f.seek(footerLengthIndex);
+      int footerLength = readIntLittleEndian(f);
+      byte[] magic = new byte[MAGIC.length];
+      f.readFully(magic);
+      if (!Arrays.equals(MAGIC, magic)) {
+        throw new RuntimeException(file.getPath() + " is not a Parquet file. expected magic number at tail " + Arrays.toString(MAGIC) + " but found " + Arrays.toString(magic));
+      }
+      long footerIndex = footerLengthIndex - footerLength;
+      if (Log.DEBUG) LOG.debug("read footer length: " + footerLength + ", footer index: " + footerIndex);
+      if (footerIndex < MAGIC.length || footerIndex >= footerLengthIndex) {
+        throw new RuntimeException("corrupted file: the footer index is not within the file");
+      }
+      f.seek(footerIndex);
+      return converter.readParquetMetadata(f, filter);
+    } finally {
+      f.close();
+    }
+  }
+
+  private final CodecFactory codecFactory;
+  private final List<BlockMetaData> blocks;
+  private final FSDataInputStream f;
+  private final Path filePath;
+  private int currentBlock = 0;
+  private final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<ColumnPath, ColumnDescriptor>();
+
+  /**
+   * @param configuration the Hadoop configuration
+   * @param filePath the Parquet file (will be opened for read in this constructor)
+   * @param blocks the blocks to read
+   * @param columns the columns to read (their path)
+   * @throws IOException if the file can not be opened
+   */
+  public ParquetFileReader(Configuration configuration, Path filePath, List<BlockMetaData> blocks, List<ColumnDescriptor> columns) throws IOException {
+    this.filePath = filePath;
+    FileSystem fs = filePath.getFileSystem(configuration);
+    this.f = fs.open(filePath);
+    this.blocks = blocks;
+    for (ColumnDescriptor col : columns) {
+      paths.put(ColumnPath.get(col.getPath()), col);
+    }
+    this.codecFactory = new CodecFactory(configuration);
+  }
+
+  /**
+   * Reads all the columns requested from the row group at the current file position.
+   * @throws IOException if an error occurs while reading
+   * @return the PageReadStore which can provide PageReaders for each column.
+   */
+  public PageReadStore readNextRowGroup() throws IOException {
+    if (currentBlock == blocks.size()) {
+      return null;
+    }
+    BlockMetaData block = blocks.get(currentBlock);
+    if (block.getRowCount() == 0) {
+      throw new RuntimeException("Illegal row group of 0 rows");
+    }
+    ColumnChunkPageReadStore columnChunkPageReadStore = new ColumnChunkPageReadStore(block.getRowCount());
+    // prepare the list of consecutive chunks to read them in one scan
+    List<ConsecutiveChunkList> allChunks = new ArrayList<ConsecutiveChunkList>();
+    ConsecutiveChunkList currentChunks = null;
+    for (ColumnChunkMetaData mc : block.getColumns()) {
+      ColumnPath pathKey = mc.getPath();
+      BenchmarkCounter.incrementTotalBytes(mc.getTotalSize());
+      ColumnDescriptor columnDescriptor = paths.get(pathKey);
+      if (columnDescriptor != null) {
+        long startingPos = mc.getStartingPos();
+        // first chunk or not consecutive => new list
+        if (currentChunks == null || currentChunks.endPos() != startingPos) {
+          currentChunks = new ConsecutiveChunkList(startingPos);
+          allChunks.add(currentChunks);
+        }
+        currentChunks.addChunk(new ChunkDescriptor(columnDescriptor, mc, startingPos, (int)mc.getTotalSize()));
+      }
+    }
+    // actually read all the chunks
+    for (ConsecutiveChunkList consecutiveChunks : allChunks) {
+      final List<Chunk> chunks = consecutiveChunks.readAll(f);
+      for (Chunk chunk : chunks) {
+        columnChunkPageReadStore.addColumn(chunk.descriptor.col, chunk.readAllPages());
+      }
+    }
+    ++currentBlock;
+    return columnChunkPageReadStore;
+  }
+
+
+
+  @Override
+  public void close() throws IOException {
+    f.close();
+    this.codecFactory.release();
+  }
+
+  /**
+   * The data for a column chunk
+   *
+   * @author Julien Le Dem
+   *
+   */
+  private class Chunk extends ByteArrayInputStream {
+
+    private final ChunkDescriptor descriptor;
+
+    /**
+     *
+     * @param descriptor descriptor for the chunk
+     * @param data contains the chunk data at offset
+     * @param offset where the chunk starts in data
+     */
+    public Chunk(ChunkDescriptor descriptor, byte[] data, int offset) {
+      super(data);
+      this.descriptor = descriptor;
+      this.pos = offset;
+    }
+
+    protected PageHeader readPageHeader() throws IOException {
+      return Util.readPageHeader(this);
+    }
+
+    /**
+     * Read all of the pages in a given column chunk.
+     * @return the list of pages
+     */
+    public ColumnChunkPageReader readAllPages() throws IOException {
+      List<DataPage> pagesInChunk = new ArrayList<DataPage>();
+      DictionaryPage dictionaryPage = null;
+      long valuesCountReadSoFar = 0;
+      while (valuesCountReadSoFar < descriptor.metadata.getValueCount()) {
+        PageHeader pageHeader = readPageHeader();
+        int uncompressedPageSize = pageHeader.getUncompressed_page_size();
+        int compressedPageSize = pageHeader.getCompressed_page_size();
+        switch (pageHeader.type) {
+          case DICTIONARY_PAGE:
+            // there is only one dictionary page per column chunk
+            if (dictionaryPage != null) {
+              throw new ParquetDecodingException("more than one dictionary page in column " + descriptor.col);
+            }
+            DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
+            dictionaryPage =
+                new DictionaryPage(
+                    this.readAsBytesInput(compressedPageSize),
+                    uncompressedPageSize,
+                    dicHeader.getNum_values(),
+                    converter.getEncoding(dicHeader.getEncoding())
+                    );
+            break;
+          case DATA_PAGE:
+            DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
+            pagesInChunk.add(
+                new DataPageV1(
+                    this.readAsBytesInput(compressedPageSize),
+                    dataHeaderV1.getNum_values(),
+                    uncompressedPageSize,
+                    fromParquetStatistics(dataHeaderV1.getStatistics(), descriptor.col.getType()),
+                    converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
+                    converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
+                    converter.getEncoding(dataHeaderV1.getEncoding())
+                    ));
+            valuesCountReadSoFar += dataHeaderV1.getNum_values();
+            break;
+          case DATA_PAGE_V2:
+            DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
+            int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length();
+            pagesInChunk.add(
+                new DataPageV2(
+                    dataHeaderV2.getNum_rows(),
+                    dataHeaderV2.getNum_nulls(),
+                    dataHeaderV2.getNum_values(),
+                    this.readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length()),
+                    this.readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length()),
+                    converter.getEncoding(dataHeaderV2.getEncoding()),
+                    this.readAsBytesInput(dataSize),
+                    uncompressedPageSize,
+                    fromParquetStatistics(dataHeaderV2.getStatistics(), descriptor.col.getType()),
+                    dataHeaderV2.isIs_compressed()
+                    ));
+            valuesCountReadSoFar += dataHeaderV2.getNum_values();
+            break;
+          default:
+            if (DEBUG) LOG.debug("skipping page of type " + pageHeader.getType() + " of size " + compressedPageSize);
+            this.skip(compressedPageSize);
+            break;
+        }
+      }
+      if (valuesCountReadSoFar != descriptor.metadata.getValueCount()) {
+        // Would be nice to have a CorruptParquetFileException or something as a subclass?
+        throw new IOException(
+            "Expected " + descriptor.metadata.getValueCount() + " values in column chunk at " +
+            filePath + " offset " + descriptor.metadata.getFirstDataPageOffset() +
+            " but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size()
+            + " pages ending at file offset " + (descriptor.fileOffset + pos()));
+      }
+      BytesDecompressor decompressor = codecFactory.getDecompressor(descriptor.metadata.getCodec());
+      return new ColumnChunkPageReader(decompressor, pagesInChunk, dictionaryPage);
+    }
+
+    /**
+     * @return the current position in the chunk
+     */
+    public int pos() {
+      return this.pos;
+    }
+
+    /**
+     * @param size the size of the page
+     * @return the page
+     * @throws IOException
+     */
+    public BytesInput readAsBytesInput(int size) throws IOException {
+      final BytesInput r = BytesInput.from(this.buf, this.pos, size);
+      this.pos += size;
+      return r;
+    }
+
+  }
+
+  /**
+   * deals with a now fixed bug where compressedLength was missing a few bytes.
+   *
+   * @author Julien Le Dem
+   *
+   */
+  private class WorkaroundChunk extends Chunk {
+
+    private final FSDataInputStream f;
+
+    /**
+     * @param descriptor the descriptor of the chunk
+     * @param data contains the data of the chunk at offset
+     * @param offset where the chunk starts in data
+     * @param f the file stream positioned at the end of this chunk
+     */
+    private WorkaroundChunk(ChunkDescriptor descriptor, byte[] data, int offset, FSDataInputStream f) {
+      super(descriptor, data, offset);
+      this.f = f;
+    }
+
+    protected PageHeader readPageHeader() throws IOException {
+      PageHeader pageHeader;
+      int initialPos = this.pos;
+      try {
+        pageHeader = Util.readPageHeader(this);
+      } catch (IOException e) {
+        // this is to workaround a bug where the compressedLength
+        // of the chunk is missing the size of the header of the dictionary
+        // to allow reading older files (using dictionary) we need this.
+        // usually 13 to 19 bytes are missing
+        // if the last page is smaller than this, the page header itself is truncated in the buffer.
+        this.pos = initialPos; // resetting the buffer to the position before we got the error
+        LOG.info("completing the column chunk to read the page header");
+        pageHeader = Util.readPageHeader(new SequenceInputStream(this, f)); // trying again from the buffer + remainder of the stream.
+      }
+      return pageHeader;
+    }
+
+    public BytesInput readAsBytesInput(int size) throws IOException {
+      if (pos + size > count) {
+        // this is to workaround a bug where the compressedLength
+        // of the chunk is missing the size of the header of the dictionary
+        // to allow reading older files (using dictionary) we need this.
+        // usually 13 to 19 bytes are missing
+        int l1 = count - pos;
+        int l2 = size - l1;
+        LOG.info("completed the column chunk with " + l2 + " bytes");
+        return BytesInput.concat(super.readAsBytesInput(l1), BytesInput.copy(BytesInput.from(f, l2)));
+      }
+      return super.readAsBytesInput(size);
+    }
+
+  }
+
+
+  /**
+   * information needed to read a column chunk
+   */
+  private static class ChunkDescriptor {
+
+    private final ColumnDescriptor col;
+    private final ColumnChunkMetaData metadata;
+    private final long fileOffset;
+    private final int size;
+
+    /**
+     * @param col column this chunk is part of
+     * @param metadata metadata for the column
+     * @param fileOffset offset in the file where this chunk starts
+     * @param size size of the chunk
+     */
+    private ChunkDescriptor(
+        ColumnDescriptor col,
+        ColumnChunkMetaData metadata,
+        long fileOffset,
+        int size) {
+      super();
+      this.col = col;
+      this.metadata = metadata;
+      this.fileOffset = fileOffset;
+      this.size = size;
+    }
+  }
+
+  /**
+   * describes a list of consecutive column chunks to be read at once.
+   *
+   * @author Julien Le Dem
+   */
+  private class ConsecutiveChunkList {
+
+    private final long offset;
+    private int length;
+    private final List<ChunkDescriptor> chunks = new ArrayList<ChunkDescriptor>();
+
+    /**
+     * @param offset where the first chunk starts
+     */
+    ConsecutiveChunkList(long offset) {
+      this.offset = offset;
+    }
+
+    /**
+     * adds a chunk to the list.
+     * It must be consecutive to the previous chunk
+     * @param descriptor
+     */
+    public void addChunk(ChunkDescriptor descriptor) {
+      chunks.add(descriptor);
+      length += descriptor.size;
+    }
+
+    /**
+     * @param f file to read the chunks from
+     * @return the chunks
+     * @throws IOException
+     */
+    public List<Chunk> readAll(FSDataInputStream f) throws IOException {
+      List<Chunk> result = new ArrayList<Chunk>(chunks.size());
+      f.seek(offset);
+      byte[] chunksBytes = new byte[length];
+      f.readFully(chunksBytes);
+      // report in a counter the data we just scanned
+      BenchmarkCounter.incrementBytesRead(length);
+      int currentChunkOffset = 0;
+      for (int i = 0; i < chunks.size(); i++) {
+        ChunkDescriptor descriptor = chunks.get(i);
+        if (i < chunks.size() - 1) {
+          result.add(new Chunk(descriptor, chunksBytes, currentChunkOffset));
+        } else {
+          // because of a bug, the last chunk might be larger than descriptor.size
+          result.add(new WorkaroundChunk(descriptor, chunksBytes, currentChunkOffset, f));
+        }
+        currentChunkOffset += descriptor.size;
+      }
+      return result;
+    }
+
+    /**
+     * @return the position following the last byte of these chunks
+     */
+    public long endPos() {
+      return offset + length;
+    }
+
+  }
+
+}
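
To illustrate how the reader API above fits together, here is a hedged usage sketch
(not part of the commit): read the footer first, then iterate the row groups of the
file. The path and Configuration below are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;

public class ReadRowGroupsSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path file = new Path("/tmp/example.parquet"); // placeholder path

    // read only the footer metadata (no row group filtering applied)
    ParquetMetadata footer =
        ParquetFileReader.readFooter(conf, file, ParquetMetadataConverter.NO_FILTER);
    MessageType schema = footer.getFileMetaData().getSchema();

    // open the file and read every row group for all columns of the schema
    ParquetFileReader reader = new ParquetFileReader(
        conf, file, footer.getBlocks(), schema.getColumns());
    try {
      PageReadStore rowGroup;
      while ((rowGroup = reader.readNextRowGroup()) != null) {
        System.out.println("row group with " + rowGroup.getRowCount() + " rows");
      }
    } finally {
      reader.close();
    }
  }
}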

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
new file mode 100644
index 0000000..7c034b7
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -0,0 +1,553 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.apache.parquet.Log.DEBUG;
+import static org.apache.parquet.format.Util.writeFileMetaData;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.Version;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.metadata.GlobalMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+/**
+ * Internal implementation of the Parquet file writer as a block container
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class ParquetFileWriter {
+  private static final Log LOG = Log.getLog(ParquetFileWriter.class);
+
+  public static final String PARQUET_METADATA_FILE = "_metadata";
+  public static final String PARQUET_COMMON_METADATA_FILE = "_common_metadata";
+  public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII"));
+  public static final int CURRENT_VERSION = 1;
+
+  // File creation modes
+  public static enum Mode {
+    CREATE,
+    OVERWRITE
+  }
+
+  private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
+
+  private final MessageType schema;
+  private final FSDataOutputStream out;
+  private BlockMetaData currentBlock;
+  private ColumnChunkMetaData currentColumn;
+  private long currentRecordCount;
+  private List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
+  private long uncompressedLength;
+  private long compressedLength;
+  private Set<org.apache.parquet.column.Encoding> currentEncodings;
+
+  private CompressionCodecName currentChunkCodec;
+  private ColumnPath currentChunkPath;
+  private PrimitiveTypeName currentChunkType;
+  private long currentChunkFirstDataPage;
+  private long currentChunkDictionaryPageOffset;
+  private long currentChunkValueCount;
+
+  private Statistics currentStatistics;
+
+  /**
+   * Captures the order in which methods should be called
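+   *
+   * Transitions encoded by the overrides below:
+   * start: NOT_STARTED -> STARTED, startBlock: STARTED -> BLOCK,
+   * startColumn: BLOCK -> COLUMN, write: COLUMN -> COLUMN,
+   * endColumn: COLUMN -> BLOCK, endBlock: BLOCK -> STARTED,
+   * end: STARTED -> ENDED; any other call throws an IOException.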
+   *
+   * @author Julien Le Dem
+   *
+   */
+  private enum STATE {
+    NOT_STARTED {
+      STATE start() {
+        return STARTED;
+      }
+    },
+    STARTED {
+      STATE startBlock() {
+        return BLOCK;
+      }
+      STATE end() {
+        return ENDED;
+      }
+    },
+    BLOCK  {
+      STATE startColumn() {
+        return COLUMN;
+      }
+      STATE endBlock() {
+        return STARTED;
+      }
+    },
+    COLUMN {
+      STATE endColumn() {
+        return BLOCK;
+      }
+      STATE write() {
+        return this;
+      }
+    },
+    ENDED;
+
+    STATE start() throws IOException { return error(); }
+    STATE startBlock() throws IOException { return error(); }
+    STATE startColumn() throws IOException { return error(); }
+    STATE write() throws IOException { return error(); }
+    STATE endColumn() throws IOException { return error(); }
+    STATE endBlock() throws IOException { return error(); }
+    STATE end() throws IOException { return error(); }
+
+    private final STATE error() throws IOException {
+      throw new IOException("The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: " + this.name());
+    }
+  }
+
+  private STATE state = STATE.NOT_STARTED;
+
+  /**
+   * @param configuration Hadoop configuration
+   * @param schema the schema of the data
+   * @param file the file to write to
+   * @throws IOException if the file can not be created
+   */
+  public ParquetFileWriter(Configuration configuration, MessageType schema,
+      Path file) throws IOException {
+    this(configuration, schema, file, Mode.CREATE);
+  }
+
+  /**
+   * @param configuration Hadoop configuration
+   * @param schema the schema of the data
+   * @param file the file to write to
+   * @param mode file creation mode
+   * @throws IOException if the file can not be created
+   */
+  public ParquetFileWriter(Configuration configuration, MessageType schema,
+      Path file, Mode mode) throws IOException {
+    super();
+    this.schema = schema;
+    FileSystem fs = file.getFileSystem(configuration);
+    boolean overwriteFlag = (mode == Mode.OVERWRITE);
+    this.out = fs.create(file, overwriteFlag);
+  }
+
+  /**
+   * start the file
+   * @throws IOException
+   */
+  public void start() throws IOException {
+    state = state.start();
+    if (DEBUG) LOG.debug(out.getPos() + ": start");
+    out.write(MAGIC);
+  }
+
+  /**
+   * start a block
+   * @param recordCount the record count in this block
+   * @throws IOException
+   */
+  public void startBlock(long recordCount) throws IOException {
+    state = state.startBlock();
+    if (DEBUG) LOG.debug(out.getPos() + ": start block");
+//    out.write(MAGIC); // TODO: add a magic delimiter
+    currentBlock = new BlockMetaData();
+    currentRecordCount = recordCount;
+  }
+
+  /**
+   * start a column inside a block
+   * @param descriptor the column descriptor
+   * @param valueCount the value count in this column
+   * @param compressionCodecName the codec used to compress this column's pages
+   * @throws IOException
+   */
+  public void startColumn(ColumnDescriptor descriptor,
+                          long valueCount,
+                          CompressionCodecName compressionCodecName) throws IOException {
+    state = state.startColumn();
+    if (DEBUG) LOG.debug(out.getPos() + ": start column: " + descriptor + " count=" + valueCount);
+    currentEncodings = new HashSet<org.apache.parquet.column.Encoding>();
+    currentChunkPath = ColumnPath.get(descriptor.getPath());
+    currentChunkType = descriptor.getType();
+    currentChunkCodec = compressionCodecName;
+    currentChunkValueCount = valueCount;
+    currentChunkFirstDataPage = out.getPos();
+    compressedLength = 0;
+    uncompressedLength = 0;
+    // need to know what type of stats to initialize to
+    // better way to do this?
+    currentStatistics = Statistics.getStatsBasedOnType(currentChunkType);
+  }
+
+  /**
+   * writes a dictionary page
+   * @param dictionaryPage the dictionary page
+   */
+  public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
+    state = state.write();
+    if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page: " + dictionaryPage.getDictionarySize() + " values");
+    currentChunkDictionaryPageOffset = out.getPos();
+    int uncompressedSize = dictionaryPage.getUncompressedSize();
+    int compressedPageSize = (int)dictionaryPage.getBytes().size(); // TODO: fix casts
+    metadataConverter.writeDictionaryPageHeader(
+        uncompressedSize,
+        compressedPageSize,
+        dictionaryPage.getDictionarySize(),
+        dictionaryPage.getEncoding(),
+        out);
+    long headerSize = out.getPos() - currentChunkDictionaryPageOffset;
+    this.uncompressedLength += uncompressedSize + headerSize;
+    this.compressedLength += compressedPageSize + headerSize;
+    if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page content " + compressedPageSize);
+    dictionaryPage.getBytes().writeAllTo(out);
+    currentEncodings.add(dictionaryPage.getEncoding());
+  }
+
+
+  /**
+   * writes a single page
+   * @param valueCount count of values
+   * @param uncompressedPageSize the size of the data once uncompressed
+   * @param bytes the compressed data for the page without header
+   * @param rlEncoding encoding of the repetition level
+   * @param dlEncoding encoding of the definition level
+   * @param valuesEncoding encoding of values
+   */
+  @Deprecated
+  public void writeDataPage(
+      int valueCount, int uncompressedPageSize,
+      BytesInput bytes,
+      org.apache.parquet.column.Encoding rlEncoding,
+      org.apache.parquet.column.Encoding dlEncoding,
+      org.apache.parquet.column.Encoding valuesEncoding) throws IOException {
+    state = state.write();
+    long beforeHeader = out.getPos();
+    if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values");
+    int compressedPageSize = (int)bytes.size();
+    metadataConverter.writeDataPageHeader(
+        uncompressedPageSize, compressedPageSize,
+        valueCount,
+        rlEncoding,
+        dlEncoding,
+        valuesEncoding,
+        out);
+    long headerSize = out.getPos() - beforeHeader;
+    this.uncompressedLength += uncompressedPageSize + headerSize;
+    this.compressedLength += compressedPageSize + headerSize;
+    if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize);
+    bytes.writeAllTo(out);
+    currentEncodings.add(rlEncoding);
+    currentEncodings.add(dlEncoding);
+    currentEncodings.add(valuesEncoding);
+  }
+
+  /**
+   * writes a single page
+   * @param valueCount count of values
+   * @param uncompressedPageSize the size of the data once uncompressed
+   * @param bytes the compressed data for the page without header
+   * @param statistics the statistics of the page
+   * @param rlEncoding encoding of the repetition level
+   * @param dlEncoding encoding of the definition level
+   * @param valuesEncoding encoding of values
+   */
+  public void writeDataPage(
+      int valueCount, int uncompressedPageSize,
+      BytesInput bytes,
+      Statistics statistics,
+      org.apache.parquet.column.Encoding rlEncoding,
+      org.apache.parquet.column.Encoding dlEncoding,
+      org.apache.parquet.column.Encoding valuesEncoding) throws IOException {
+    state = state.write();
+    long beforeHeader = out.getPos();
+    if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values");
+    int compressedPageSize = (int)bytes.size();
+    metadataConverter.writeDataPageHeader(
+        uncompressedPageSize, compressedPageSize,
+        valueCount,
+        statistics,
+        rlEncoding,
+        dlEncoding,
+        valuesEncoding,
+        out);
+    long headerSize = out.getPos() - beforeHeader;
+    this.uncompressedLength += uncompressedPageSize + headerSize;
+    this.compressedLength += compressedPageSize + headerSize;
+    if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize);
+    bytes.writeAllTo(out);
+    currentStatistics.mergeStatistics(statistics);
+    currentEncodings.add(rlEncoding);
+    currentEncodings.add(dlEncoding);
+    currentEncodings.add(valuesEncoding);
+  }
+
+  /**
+   * writes a number of pages at once
+   * @param bytes bytes to be written including page headers
+   * @param uncompressedTotalPageSize total uncompressed size (without page headers)
+   * @param compressedTotalPageSize total compressed size (without page headers)
+   * @param totalStats the accumulated statistics for these pages
+   * @param encodings the encodings used in these pages
+   * @throws IOException
+   */
+   void writeDataPages(BytesInput bytes,
+                       long uncompressedTotalPageSize,
+                       long compressedTotalPageSize,
+                       Statistics totalStats,
+                       List<org.apache.parquet.column.Encoding> encodings) throws IOException {
+    state = state.write();
+    if (DEBUG) LOG.debug(out.getPos() + ": write data pages");
+    long headersSize = bytes.size() - compressedTotalPageSize;
+    this.uncompressedLength += uncompressedTotalPageSize + headersSize;
+    this.compressedLength += compressedTotalPageSize + headersSize;
+    if (DEBUG) LOG.debug(out.getPos() + ": write data pages content");
+    bytes.writeAllTo(out);
+    currentEncodings.addAll(encodings);
+    currentStatistics = totalStats;
+  }
+
+  /**
+   * end a column (once all rep, def and data have been written)
+   * @throws IOException
+   */
+  public void endColumn() throws IOException {
+    state = state.endColumn();
+    if (DEBUG) LOG.debug(out.getPos() + ": end column");
+    currentBlock.addColumn(ColumnChunkMetaData.get(
+        currentChunkPath,
+        currentChunkType,
+        currentChunkCodec,
+        currentEncodings,
+        currentStatistics,
+        currentChunkFirstDataPage,
+        currentChunkDictionaryPageOffset,
+        currentChunkValueCount,
+        compressedLength,
+        uncompressedLength));
+    if (DEBUG) LOG.info("ended Column chumk: " + currentColumn);
+    currentColumn = null;
+    this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength);
+    this.uncompressedLength = 0;
+    this.compressedLength = 0;
+  }
+
+  /**
+   * ends a block once all column chunks have been written
+   * @throws IOException
+   */
+  public void endBlock() throws IOException {
+    state = state.endBlock();
+    if (DEBUG) LOG.debug(out.getPos() + ": end block");
+    currentBlock.setRowCount(currentRecordCount);
+    blocks.add(currentBlock);
+    currentBlock = null;
+  }
+
+  /**
+   * ends a file once all blocks have been written.
+   * closes the file.
+   * @param extraMetaData the extra meta data to write in the footer
+   * @throws IOException
+   */
+  public void end(Map<String, String> extraMetaData) throws IOException {
+    state = state.end();
+    if (DEBUG) LOG.debug(out.getPos() + ": end");
+    ParquetMetadata footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
+    serializeFooter(footer, out);
+    out.close();
+  }
+
+  private static void serializeFooter(ParquetMetadata footer, FSDataOutputStream out) throws IOException {
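+    // footer layout: serialized file metadata, then its length as a
+    // 4-byte little-endian int, then the trailing magic bytes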
+    long footerIndex = out.getPos();
+    org.apache.parquet.format.FileMetaData parquetMetadata = new ParquetMetadataConverter().toParquetMetadata(CURRENT_VERSION, footer);
+    writeFileMetaData(parquetMetadata, out);
+    if (DEBUG) LOG.debug(out.getPos() + ": footer length = " + (out.getPos() - footerIndex));
+    BytesUtils.writeIntLittleEndian(out, (int)(out.getPos() - footerIndex));
+    out.write(MAGIC);
+  }
+
+  /**
+   * writes the _metadata and _common_metadata files
+   * @param configuration the configuration to use to get the FileSystem
+   * @param outputPath the directory to write the _metadata file to
+   * @param footers the list of footers to merge
+   * @throws IOException
+   */
+  public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers) throws IOException {
+    ParquetMetadata metadataFooter = mergeFooters(outputPath, footers);
+    FileSystem fs = outputPath.getFileSystem(configuration);
+    outputPath = outputPath.makeQualified(fs);
+    writeMetadataFile(outputPath, metadataFooter, fs, PARQUET_METADATA_FILE);
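+    // the _common_metadata variant carries only file-level metadata,
+    // so the row group (block) information is cleared before writing it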
+    metadataFooter.getBlocks().clear();
+    writeMetadataFile(outputPath, metadataFooter, fs, PARQUET_COMMON_METADATA_FILE);
+  }
+
+  private static void writeMetadataFile(Path outputPath, ParquetMetadata metadataFooter, FileSystem fs, String parquetMetadataFile)
+      throws IOException {
+    Path metaDataPath = new Path(outputPath, parquetMetadataFile);
+    FSDataOutputStream metadata = fs.create(metaDataPath);
+    metadata.write(MAGIC);
+    serializeFooter(metadataFooter, metadata);
+    metadata.close();
+  }
+
+  static ParquetMetadata mergeFooters(Path root, List<Footer> footers) {
+    String rootPath = root.toUri().getPath();
+    GlobalMetaData fileMetaData = null;
+    List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
+    for (Footer footer : footers) {
+      String footerPath = footer.getFile().toUri().getPath();
+      if (!footerPath.startsWith(rootPath)) {
+        throw new ParquetEncodingException(footerPath + " invalid: all the files must be contained in the root " + root);
+      }
+      footerPath = footerPath.substring(rootPath.length());
+      while (footerPath.startsWith("/")) {
+        footerPath = footerPath.substring(1);
+      }
+      fileMetaData = mergeInto(footer.getParquetMetadata().getFileMetaData(), fileMetaData);
+      for (BlockMetaData block : footer.getParquetMetadata().getBlocks()) {
+        block.setPath(footerPath);
+        blocks.add(block);
+      }
+    }
+    return new ParquetMetadata(fileMetaData.merge(), blocks);
+  }
+
+  /**
+   * @return the current position in the underlying file
+   * @throws IOException
+   */
+  public long getPos() throws IOException {
+    return out.getPos();
+  }
+
+  /**
+   * Will merge the metadata of all the footers together
+   * @param footers the list of file footers to merge
+   * @return the global meta data for all the footers
+   */
+  static GlobalMetaData getGlobalMetaData(List<Footer> footers) {
+    return getGlobalMetaData(footers, true);
+  }
+
+  static GlobalMetaData getGlobalMetaData(List<Footer> footers, boolean strict) {
+    GlobalMetaData fileMetaData = null;
+    for (Footer footer : footers) {
+      ParquetMetadata currentMetadata = footer.getParquetMetadata();
+      fileMetaData = mergeInto(currentMetadata.getFileMetaData(), fileMetaData, strict);
+    }
+    return fileMetaData;
+  }
+
+  /**
+   * Will return the result of merging toMerge into mergedMetadata
+   * @param toMerge the metadata to merge
+   * @param mergedMetadata the reference metadata to merge into
+   * @return the result of the merge
+   */
+  static GlobalMetaData mergeInto(
+      FileMetaData toMerge,
+      GlobalMetaData mergedMetadata) {
+    return mergeInto(toMerge, mergedMetadata, true);
+  }
+
+  static GlobalMetaData mergeInto(
+      FileMetaData toMerge,
+      GlobalMetaData mergedMetadata,
+      boolean strict) {
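+    // accumulates, across files: a merged schema, every value seen for each
+    // key/value metadata key, and the set of distinct createdBy strings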
+    MessageType schema = null;
+    Map<String, Set<String>> newKeyValues = new HashMap<String, Set<String>>();
+    Set<String> createdBy = new HashSet<String>();
+    if (mergedMetadata != null) {
+      schema = mergedMetadata.getSchema();
+      newKeyValues.putAll(mergedMetadata.getKeyValueMetaData());
+      createdBy.addAll(mergedMetadata.getCreatedBy());
+    }
+    if ((schema == null && toMerge.getSchema() != null)
+        || (schema != null && !schema.equals(toMerge.getSchema()))) {
+      schema = mergeInto(toMerge.getSchema(), schema, strict);
+    }
+    for (Entry<String, String> entry : toMerge.getKeyValueMetaData().entrySet()) {
+      Set<String> values = newKeyValues.get(entry.getKey());
+      if (values == null) {
+        values = new HashSet<String>();
+        newKeyValues.put(entry.getKey(), values);
+      }
+      values.add(entry.getValue());
+    }
+    createdBy.add(toMerge.getCreatedBy());
+    return new GlobalMetaData(
+        schema,
+        newKeyValues,
+        createdBy);
+  }
+
+  /**
+   * will return the result of merging toMerge into mergedSchema
+   * @param toMerge the schema to merge into mergedSchema
+   * @param mergedSchema the schema to append the fields to
+   * @return the resulting schema
+   */
+  static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema) {
+    return mergeInto(toMerge, mergedSchema, true);
+  }
+
+  /**
+   * will return the result of merging toMerge into mergedSchema
+   * @param toMerge the schema to merge into mergedSchema
+   * @param mergedSchema the schema to append the fields to
+   * @param strict whether primitive types are required to match
+   * @return the resulting schema
+   */
+  static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema, boolean strict) {
+    if (mergedSchema == null) {
+      return toMerge;
+    }
+
+    return mergedSchema.union(toMerge, strict);
+  }
+
+}
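
For reference, the STATE machine above enforces the call order start, startBlock, startColumn, write*, endColumn, endBlock (repeated per row group), end. Below is a minimal sketch of that sequence, assuming the renamed org.apache.parquet packages introduced by this commit; the output path and the single PLAIN-encoded int32 page payload are illustrative only and are not meant to produce a production-quality file.

import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ParquetFileWriterSketch {
  public static void main(String[] args) throws IOException {
    MessageType schema = MessageTypeParser.parseMessageType("message m { required int32 x; }");
    ColumnDescriptor col = schema.getColumns().get(0);

    ParquetFileWriter writer = new ParquetFileWriter(
        new Configuration(), schema, new Path("/tmp/example.parquet"), ParquetFileWriter.Mode.CREATE);

    writer.start();                                                // NOT_STARTED -> STARTED, writes the leading magic
    writer.startBlock(1);                                          // STARTED -> BLOCK
    writer.startColumn(col, 1, CompressionCodecName.UNCOMPRESSED); // BLOCK -> COLUMN

    // one PLAIN-encoded int32 value (1); rep/def levels are empty for a required top-level field
    BytesInput page = BytesInput.from(new byte[] { 1, 0, 0, 0 });
    Statistics stats = Statistics.getStatsBasedOnType(col.getType());
    writer.writeDataPage(1, (int) page.size(), page, stats,
        Encoding.BIT_PACKED, Encoding.BIT_PACKED, Encoding.PLAIN); // stays in COLUMN

    writer.endColumn();                                            // COLUMN -> BLOCK, records the chunk metadata
    writer.endBlock();                                             // BLOCK -> STARTED, records the row group
    writer.end(new HashMap<String, String>());                     // STARTED -> ENDED, writes the footer and closes the file
  }
}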

