parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject [09/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.
Date Mon, 27 Apr 2015 23:12:06 GMT
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
deleted file mode 100644
index 144515c..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import static java.lang.Math.max;
-import static java.lang.Math.min;
-import static java.lang.String.format;
-import static parquet.Log.DEBUG;
-import static parquet.Preconditions.checkNotNull;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import parquet.Ints;
-import parquet.Log;
-import parquet.column.ColumnWriteStore;
-import parquet.column.ParquetProperties;
-import parquet.column.ParquetProperties.WriterVersion;
-import parquet.column.impl.ColumnWriteStoreV1;
-import parquet.column.impl.ColumnWriteStoreV2;
-import parquet.hadoop.CodecFactory.BytesCompressor;
-import parquet.hadoop.api.WriteSupport;
-import parquet.hadoop.api.WriteSupport.FinalizedWriteContext;
-import parquet.io.ColumnIOFactory;
-import parquet.io.MessageColumnIO;
-import parquet.schema.MessageType;
-
-class InternalParquetRecordWriter<T> {
-  private static final Log LOG = Log.getLog(InternalParquetRecordWriter.class);
-
-  private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
-  private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
-
-  private final ParquetFileWriter parquetFileWriter;
-  private final WriteSupport<T> writeSupport;
-  private final MessageType schema;
-  private final Map<String, String> extraMetaData;
-  private final long rowGroupSize;
-  private long rowGroupSizeThreshold;
-  private final int pageSize;
-  private final BytesCompressor compressor;
-  private final boolean validating;
-  private final ParquetProperties parquetProperties;
-
-  private long recordCount = 0;
-  private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
-
-  private ColumnWriteStore columnStore;
-  private ColumnChunkPageWriteStore pageStore;
-
-
-  /**
-   * @param parquetFileWriter the file to write to
-   * @param writeSupport the class to convert incoming records
-   * @param schema the schema of the records
-   * @param extraMetaData extra meta data to write in the footer of the file
-   * @param rowGroupSize the size of a block in the file (this will be approximate)
-   * @param compressor the codec used to compress
-   */
-  public InternalParquetRecordWriter(
-      ParquetFileWriter parquetFileWriter,
-      WriteSupport<T> writeSupport,
-      MessageType schema,
-      Map<String, String> extraMetaData,
-      long rowGroupSize,
-      int pageSize,
-      BytesCompressor compressor,
-      int dictionaryPageSize,
-      boolean enableDictionary,
-      boolean validating,
-      WriterVersion writerVersion) {
-    this.parquetFileWriter = parquetFileWriter;
-    this.writeSupport = checkNotNull(writeSupport, "writeSupport");
-    this.schema = schema;
-    this.extraMetaData = extraMetaData;
-    this.rowGroupSize = rowGroupSize;
-    this.rowGroupSizeThreshold = rowGroupSize;
-    this.pageSize = pageSize;
-    this.compressor = compressor;
-    this.validating = validating;
-    this.parquetProperties = new ParquetProperties(dictionaryPageSize, writerVersion, enableDictionary);
-    initStore();
-  }
-
-  private void initStore() {
-    pageStore = new ColumnChunkPageWriteStore(compressor, schema, pageSize);
-    columnStore = parquetProperties.newColumnWriteStore(
-        schema,
-        pageStore,
-        pageSize);
-    MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
-    writeSupport.prepareForWrite(columnIO.getRecordWriter(columnStore));
-  }
-
-  public void close() throws IOException, InterruptedException {
-    flushRowGroupToStore();
-    FinalizedWriteContext finalWriteContext = writeSupport.finalizeWrite();
-    Map<String, String> finalMetadata = new HashMap<String, String>(extraMetaData);
-    finalMetadata.putAll(finalWriteContext.getExtraMetaData());
-    parquetFileWriter.end(finalMetadata);
-  }
-
-  public void write(T value) throws IOException, InterruptedException {
-    writeSupport.write(value);
-    ++ recordCount;
-    checkBlockSizeReached();
-  }
-
-  private void checkBlockSizeReached() throws IOException {
-    if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
-      long memSize = columnStore.getBufferedSize();
-      if (memSize > rowGroupSizeThreshold) {
-        LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, rowGroupSizeThreshold, recordCount));
-        flushRowGroupToStore();
-        initStore();
-        recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
-      } else {
-        float recordSize = (float) memSize / recordCount;
-        recordCountForNextMemCheck = min(
-            max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(rowGroupSizeThreshold / recordSize)) / 2), // will check halfway
-            recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead
-            );
-        if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck));
-      }
-    }
-  }
-
-  private void flushRowGroupToStore()
-      throws IOException {
-    LOG.info(format("Flushing mem columnStore to file. allocated memory: %,d", columnStore.getAllocatedSize()));
-    if (columnStore.getAllocatedSize() > 3 * (long)rowGroupSizeThreshold) {
-      LOG.warn("Too much memory used: " + columnStore.memUsageString());
-    }
-
-    if (recordCount > 0) {
-      parquetFileWriter.startBlock(recordCount);
-      columnStore.flush();
-      pageStore.flushToFileWriter(parquetFileWriter);
-      recordCount = 0;
-      parquetFileWriter.endBlock();
-    }
-
-    columnStore = null;
-    pageStore = null;
-  }
-
-  long getRowGroupSizeThreshold() {
-    return rowGroupSizeThreshold;
-  }
-
-  void setRowGroupSizeThreshold(long rowGroupSizeThreshold) {
-    this.rowGroupSizeThreshold = rowGroupSizeThreshold;
-  }
-
-  MessageType getSchema() {
-    return this.schema;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/LruCache.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/LruCache.java b/parquet-hadoop/src/main/java/parquet/hadoop/LruCache.java
deleted file mode 100644
index 2d6070f..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/LruCache.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import parquet.Log;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-/**
- * A basic implementation of an LRU cache.  Besides evicting the least recently
- * used entries (either based on insertion or access order), this class also
- * checks for "stale" entries as entries are inserted or retrieved (note
- * "staleness" is defined by the entries themselves (see
- * {@link parquet.hadoop.LruCache.Value}).
- *
- * @param <K> The key type. Acts as the key in a {@link java.util.LinkedHashMap}
- * @param <V> The value type.  Must extend {@link parquet.hadoop.LruCache.Value}
- *           so that the "staleness" of the value can be easily determined.
- */
-final class LruCache<K, V extends LruCache.Value<K, V>> {
-  private static final Log LOG = Log.getLog(LruCache.class);
-
-  private static final float DEFAULT_LOAD_FACTOR = 0.75f;
-
-  private final LinkedHashMap<K, V> cacheMap;
-
-  /**
-   * Constructs an access-order based LRU cache with {@code maxSize} entries.
-   * @param maxSize The maximum number of entries to store in the cache.
-   */
-  public LruCache(final int maxSize) {
-    this(maxSize, DEFAULT_LOAD_FACTOR, true);
-  }
-
-  /**
-   * Constructs an LRU cache.
-   *
-   * @param maxSize The maximum number of entries to store in the cache.
-   * @param loadFactor Used to determine the initial capacity.
-   * @param accessOrder the ordering mode - {@code true} for access-order,
-   * {@code false} for insertion-order
-   */
-  public LruCache(final int maxSize, final float loadFactor, final boolean accessOrder) {
-    int initialCapacity = Math.round(maxSize / loadFactor);
-    cacheMap =
-            new LinkedHashMap<K, V>(initialCapacity, loadFactor, accessOrder) {
-              @Override
-              public boolean removeEldestEntry(final Map.Entry<K, V> eldest) {
-                boolean result = size() > maxSize;
-                if (result) {
-                  if (Log.DEBUG) {
-                    LOG.debug("Removing eldest entry in cache: "
-                            + eldest.getKey());
-                  }
-                }
-                return result;
-              }
-            };
-  }
-
-  /**
-   * Removes the mapping for the specified key from this cache if present.
-   * @param key key whose mapping is to be removed from the cache
-   * @return the previous value associated with key, or null if there was no
-   * mapping for key.
-   */
-  public V remove(final K key) {
-    V oldValue = cacheMap.remove(key);
-    if (oldValue != null) {
-      if (Log.DEBUG) {
-        LOG.debug("Removed cache entry for '" + key + "'");
-      }
-    }
-    return oldValue;
-  }
-
-  /**
-   * Associates the specified value with the specified key in this cache. The
-   * value is only inserted if it is not null and it is considered current. If
-   * the cache previously contained a mapping for the key, the old value is
-   * replaced only if the new value is "newer" than the old one.
-   * @param key key with which the specified value is to be associated
-   * @param newValue value to be associated with the specified key
-   */
-  public void put(final K key, final V newValue) {
-    if (newValue == null || !newValue.isCurrent(key)) {
-      if (Log.WARN) {
-        LOG.warn("Ignoring new cache entry for '" + key + "' because it is "
-                + (newValue == null ? "null" : "not current"));
-      }
-      return;
-    }
-
-    V oldValue = cacheMap.get(key);
-    if (oldValue != null && oldValue.isNewerThan(newValue)) {
-      if (Log.WARN) {
-        LOG.warn("Ignoring new cache entry for '" + key + "' because "
-                + "existing cache entry is newer");
-      }
-      return;
-    }
-
-    // no existing value or new value is newer than old value
-    oldValue = cacheMap.put(key, newValue);
-    if (Log.DEBUG) {
-      if (oldValue == null) {
-        LOG.debug("Added new cache entry for '" + key + "'");
-      } else {
-        LOG.debug("Overwrote existing cache entry for '" + key + "'");
-      }
-    }
-  }
-
-  /**
-   * Removes all of the mappings from this cache. The cache will be empty
-   * after this call returns.
-   */
-  public void clear() {
-    cacheMap.clear();
-  }
-
-  /**
-   * Returns the value to which the specified key is mapped, or null if 1) the
-   * value is not current or 2) this cache contains no mapping for the key.
-   * @param key the key whose associated value is to be returned
-   * @return the value to which the specified key is mapped, or null if 1) the
-   * value is not current or 2) this cache contains no mapping for the key
-   */
-  public V getCurrentValue(final K key) {
-    V value = cacheMap.get(key);
-    if (Log.DEBUG) {
-      LOG.debug("Value for '" + key + "' " + (value == null ? "not " : "")
-              + "in cache");
-    }
-    if (value != null && !value.isCurrent(key)) {
-      // value is not current; remove it and return null
-      remove(key);
-      return null;
-    }
-
-    return value;
-  }
-
-  /**
-   * Returns the number of key-value mappings in this cache.
-   * @return the number of key-value mappings in this cache.
-   */
-  public int size() {
-    return cacheMap.size();
-  }
-
-  /**
-   * {@link parquet.hadoop.LruCache} expects all values to follow this
-   * interface so the cache can determine 1) whether values are current (e.g.
-   * the referenced data has not been modified/updated in such a way that the
-   * value is no longer useful) and 2) whether a value is strictly "newer"
-   * than another value.
-   *
-   * @param <K> The key type.
-   * @param <V> Provides a bound for the {@link #isNewerThan(V)} method
-   */
-  interface Value<K, V> {
-    /**
-     * Is the value still current (e.g. has the referenced data been
-     * modified/updated in such a way that the value is no longer useful)
-     * @param key the key associated with this value
-     * @return {@code true} the value is still current, {@code false} the value
-     * is no longer useful
-     */
-    boolean isCurrent(K key);
-
-    /**
-     * Compares this value with the specified value to check for relative age.
-     * @param otherValue the value to be compared.
-     * @return {@code true} the value is strictly newer than the other value,
-     * {@code false} the value is older or just
-     * as new as the other value.
-     */
-    boolean isNewerThan(V otherValue);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/MemoryManager.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/MemoryManager.java b/parquet-hadoop/src/main/java/parquet/hadoop/MemoryManager.java
deleted file mode 100644
index 9724868..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/MemoryManager.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import parquet.Log;
-import parquet.ParquetRuntimeException;
-
-import java.lang.management.ManagementFactory;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Implements a memory manager that keeps a global context of how many Parquet
- * writers there are and manages the memory between them. For use cases with
- * dynamic partitions, it is easy to end up with many writers in the same task.
- * By managing the size of each allocation, we try to cut down the size of each
- * allocation and keep the task from running out of memory.
- *
- * This class balances the allocation size of each writer by resize them averagely.
- * When the sum of each writer's allocation size  is less than total memory pool,
- * keep them original value.
- * When the sum exceeds, decrease each writer's allocation size by a ratio.
- */
-public class MemoryManager {
-  private static final Log LOG = Log.getLog(MemoryManager.class);
-  static final float DEFAULT_MEMORY_POOL_RATIO = 0.95f;
-  static final long DEFAULT_MIN_MEMORY_ALLOCATION = 1 * 1024 * 1024; // 1MB
-  private final float memoryPoolRatio;
-
-  private final long totalMemoryPool;
-  private final long minMemoryAllocation;
-  private final Map<InternalParquetRecordWriter, Long> writerList = new
-      HashMap<InternalParquetRecordWriter, Long>();
-
-  public MemoryManager(float ratio, long minAllocation) {
-    checkRatio(ratio);
-
-    memoryPoolRatio = ratio;
-    minMemoryAllocation = minAllocation;
-    totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax
-        () * ratio);
-    LOG.debug(String.format("Allocated total memory pool is: %,d", totalMemoryPool));
-  }
-
-  private void checkRatio(float ratio) {
-    if (ratio <= 0 || ratio > 1) {
-      throw new IllegalArgumentException("The configured memory pool ratio " + ratio + " is " +
-          "not between 0 and 1.");
-    }
-  }
-
-  /**
-   * Add a new writer and its memory allocation to the memory manager.
-   * @param writer the new created writer
-   * @param allocation the requested buffer size
-   */
-  synchronized void addWriter(InternalParquetRecordWriter writer, Long allocation) {
-    Long oldValue = writerList.get(writer);
-    if (oldValue == null) {
-      writerList.put(writer, allocation);
-    } else {
-      throw new IllegalArgumentException("[BUG] The Parquet Memory Manager should not add an " +
-          "instance of InternalParquetRecordWriter more than once. The Manager already contains " +
-          "the writer: " + writer);
-    }
-    updateAllocation();
-  }
-
-  /**
-   * Remove the given writer from the memory manager.
-   * @param writer the writer that has been closed
-   */
-  synchronized void removeWriter(InternalParquetRecordWriter writer) {
-    if (writerList.containsKey(writer)) {
-      writerList.remove(writer);
-    }
-    if (!writerList.isEmpty()) {
-      updateAllocation();
-    }
-  }
-
-  /**
-   * Update the allocated size of each writer based on the current allocations and pool size.
-   */
-  private void updateAllocation() {
-    long totalAllocations = 0;
-    double scale;
-    for (Long allocation : writerList.values()) {
-      totalAllocations += allocation;
-    }
-    if (totalAllocations <= totalMemoryPool) {
-      scale = 1.0;
-    } else {
-      scale = (double) totalMemoryPool / totalAllocations;
-      LOG.warn(String.format(
-          "Total allocation exceeds %.2f%% (%,d bytes) of heap memory\n" +
-          "Scaling row group sizes to %.2f%% for %d writers",
-          100*memoryPoolRatio, totalMemoryPool, 100*scale, writerList.size()));
-    }
-
-    int maxColCount = 0;
-    for (InternalParquetRecordWriter w : writerList.keySet()) {
-      maxColCount = Math.max(w.getSchema().getColumns().size(), maxColCount);
-    }
-
-    for (Map.Entry<InternalParquetRecordWriter, Long> entry : writerList.entrySet()) {
-      long newSize = (long) Math.floor(entry.getValue() * scale);
-      if(scale < 1.0 && minMemoryAllocation > 0 && newSize < minMemoryAllocation) {
-          throw new ParquetRuntimeException(String.format("New Memory allocation %d bytes" +
-          " is smaller than the minimum allocation size of %d bytes.",
-              newSize, minMemoryAllocation)){};
-      }
-      entry.getKey().setRowGroupSizeThreshold(newSize);
-      LOG.debug(String.format("Adjust block size from %,d to %,d for writer: %s",
-            entry.getValue(), newSize, entry.getKey()));
-    }
-  }
-
-  /**
-   * Get the total memory pool size that is available for writers.
-   * @return the number of bytes in the memory pool
-   */
-  long getTotalMemoryPool() {
-    return totalMemoryPool;
-  }
-
-  /**
-   * Get the writers list
-   * @return the writers in this memory manager
-   */
-  Map<InternalParquetRecordWriter, Long> getWriterList() {
-    return writerList;
-  }
-
-  /**
-   * Get the ratio of memory allocated for all the writers.
-   * @return the memory pool ratio
-   */
-  float getMemoryPoolRatio() {
-    return memoryPoolRatio;
-  }
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
deleted file mode 100644
index 5b0b341..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
+++ /dev/null
@@ -1,782 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import static parquet.Log.DEBUG;
-import static parquet.bytes.BytesUtils.readIntLittleEndian;
-import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
-import static parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS;
-import static parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics;
-import static parquet.hadoop.ParquetFileWriter.MAGIC;
-import static parquet.hadoop.ParquetFileWriter.PARQUET_COMMON_METADATA_FILE;
-import static parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
-
-import java.io.ByteArrayInputStream;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.SequenceInputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import parquet.Log;
-import parquet.bytes.BytesInput;
-import parquet.column.ColumnDescriptor;
-import parquet.column.page.DataPage;
-import parquet.column.page.DataPageV1;
-import parquet.column.page.DataPageV2;
-import parquet.column.page.DictionaryPage;
-import parquet.column.page.PageReadStore;
-import parquet.hadoop.metadata.ColumnPath;
-import parquet.format.DataPageHeader;
-import parquet.format.DataPageHeaderV2;
-import parquet.format.DictionaryPageHeader;
-import parquet.format.PageHeader;
-import parquet.format.Util;
-import parquet.format.converter.ParquetMetadataConverter;
-import parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
-import parquet.hadoop.CodecFactory.BytesDecompressor;
-import parquet.hadoop.ColumnChunkPageReadStore.ColumnChunkPageReader;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.ParquetMetadata;
-import parquet.hadoop.util.HiddenFileFilter;
-import parquet.hadoop.util.counters.BenchmarkCounter;
-import parquet.io.ParquetDecodingException;
-
-/**
- * Internal implementation of the Parquet file reader as a block container
- *
- * @author Julien Le Dem
- *
- */
-public class ParquetFileReader implements Closeable {
-
-  private static final Log LOG = Log.getLog(ParquetFileReader.class);
-
-  public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism";
-
-  private static ParquetMetadataConverter converter = new ParquetMetadataConverter();
-
-  /**
-   * for files provided, check if there's a summary file.
-   * If a summary file is found it is used otherwise the file footer is used.
-   * @param configuration the hadoop conf to connect to the file system;
-   * @param partFiles the part files to read
-   * @return the footers for those files using the summary file if possible.
-   * @throws IOException
-   */
-  @Deprecated
-  public static List<Footer> readAllFootersInParallelUsingSummaryFiles(Configuration configuration, List<FileStatus> partFiles) throws IOException {
-    return readAllFootersInParallelUsingSummaryFiles(configuration, partFiles, false);
-  }
-
-  private static MetadataFilter filter(boolean skipRowGroups) {
-    return skipRowGroups ? SKIP_ROW_GROUPS : NO_FILTER;
-  }
-
-  /**
-   * for files provided, check if there's a summary file.
-   * If a summary file is found it is used otherwise the file footer is used.
-   * @param configuration the hadoop conf to connect to the file system;
-   * @param partFiles the part files to read
-   * @param skipRowGroups to skipRowGroups in the footers
-   * @return the footers for those files using the summary file if possible.
-   * @throws IOException
-   */
-  public static List<Footer> readAllFootersInParallelUsingSummaryFiles(
-      final Configuration configuration,
-      final Collection<FileStatus> partFiles,
-      final boolean skipRowGroups) throws IOException {
-
-    // figure out list of all parents to part files
-    Set<Path> parents = new HashSet<Path>();
-    for (FileStatus part : partFiles) {
-      parents.add(part.getPath().getParent());
-    }
-
-    // read corresponding summary files if they exist
-    List<Callable<Map<Path, Footer>>> summaries = new ArrayList<Callable<Map<Path, Footer>>>();
-    for (final Path path : parents) {
-      summaries.add(new Callable<Map<Path, Footer>>() {
-        @Override
-        public Map<Path, Footer> call() throws Exception {
-          ParquetMetadata mergedMetadata = readSummaryMetadata(configuration, path, skipRowGroups);
-          if (mergedMetadata != null) {
-            final List<Footer> footers;
-            if (skipRowGroups) {
-              footers = new ArrayList<Footer>();
-              for (FileStatus f : partFiles) {
-                footers.add(new Footer(f.getPath(), mergedMetadata));
-              }
-            } else {
-              footers = footersFromSummaryFile(path, mergedMetadata);
-            }
-            Map<Path, Footer> map = new HashMap<Path, Footer>();
-            for (Footer footer : footers) {
-              // the folder may have been moved
-              footer = new Footer(new Path(path, footer.getFile().getName()), footer.getParquetMetadata());
-              map.put(footer.getFile(), footer);
-            }
-            return map;
-          } else {
-            return Collections.emptyMap();
-          }
-        }
-      });
-    }
-
-    Map<Path, Footer> cache = new HashMap<Path, Footer>();
-    try {
-      List<Map<Path, Footer>> footersFromSummaries = runAllInParallel(configuration.getInt(PARQUET_READ_PARALLELISM, 5), summaries);
-      for (Map<Path, Footer> footers : footersFromSummaries) {
-        cache.putAll(footers);
-      }
-    } catch (ExecutionException e) {
-      throw new IOException("Error reading summaries", e);
-    }
-
-    // keep only footers for files actually requested and read file footer if not found in summaries
-    List<Footer> result = new ArrayList<Footer>(partFiles.size());
-    List<FileStatus> toRead = new ArrayList<FileStatus>();
-    for (FileStatus part : partFiles) {
-      Footer f = cache.get(part.getPath());
-      if (f != null) {
-        result.add(f);
-      } else {
-        toRead.add(part);
-      }
-    }
-
-    if (toRead.size() > 0) {
-      // read the footers of the files that did not have a summary file
-      if (Log.INFO) LOG.info("reading another " + toRead.size() + " footers");
-      result.addAll(readAllFootersInParallel(configuration, toRead, skipRowGroups));
-    }
-
-    return result;
-  }
-
-  private static <T> List<T> runAllInParallel(int parallelism, List<Callable<T>> toRun) throws ExecutionException {
-    LOG.info("Initiating action with parallelism: " + parallelism);
-    ExecutorService threadPool = Executors.newFixedThreadPool(parallelism);
-    try {
-      List<Future<T>> futures = new ArrayList<Future<T>>();
-      for (Callable<T> callable : toRun) {
-        futures.add(threadPool.submit(callable));
-      }
-      List<T> result = new ArrayList<T>(toRun.size());
-      for (Future<T> future : futures) {
-        try {
-          result.add(future.get());
-        } catch (InterruptedException e) {
-          throw new RuntimeException("The thread was interrupted", e);
-        }
-      }
-      return result;
-    } finally {
-      threadPool.shutdownNow();
-    }
-  }
-
-  @Deprecated
-  public static List<Footer> readAllFootersInParallel(final Configuration configuration, List<FileStatus> partFiles) throws IOException {
-    return readAllFootersInParallel(configuration, partFiles, false);
-  }
-
-  /**
-   * read all the footers of the files provided
-   * (not using summary files)
-   * @param configuration the conf to access the File System
-   * @param partFiles the files to read
-   * @param skipRowGroups to skip the rowGroup info
-   * @return the footers
-   * @throws IOException
-   */
-  public static List<Footer> readAllFootersInParallel(final Configuration configuration, List<FileStatus> partFiles, final boolean skipRowGroups) throws IOException {
-    List<Callable<Footer>> footers = new ArrayList<Callable<Footer>>();
-    for (final FileStatus currentFile : partFiles) {
-      footers.add(new Callable<Footer>() {
-        @Override
-        public Footer call() throws Exception {
-          try {
-            return new Footer(currentFile.getPath(), readFooter(configuration, currentFile, filter(skipRowGroups)));
-          } catch (IOException e) {
-            throw new IOException("Could not read footer for file " + currentFile, e);
-          }
-        }
-      });
-    }
-    try {
-      return runAllInParallel(configuration.getInt(PARQUET_READ_PARALLELISM, 5), footers);
-    } catch (ExecutionException e) {
-      throw new IOException("Could not read footer: " + e.getMessage(), e.getCause());
-    }
-  }
-
-  /**
-   * Read the footers of all the files under that path (recursively)
-   * not using summary files.
-   * rowGroups are not skipped
-   * @param configuration the configuration to access the FS
-   * @param fileStatus the root dir
-   * @return all the footers
-   * @throws IOException
-   */
-  public static List<Footer> readAllFootersInParallel(Configuration configuration, FileStatus fileStatus) throws IOException {
-    List<FileStatus> statuses = listFiles(configuration, fileStatus);
-    return readAllFootersInParallel(configuration, statuses, false);
-  }
-
-  @Deprecated
-  public static List<Footer> readFooters(Configuration configuration, Path path) throws IOException {
-    return readFooters(configuration, status(configuration, path));
-  }
-
-  private static FileStatus status(Configuration configuration, Path path) throws IOException {
-    return path.getFileSystem(configuration).getFileStatus(path);
-  }
-
-  /**
-   * this always returns the row groups
-   * @param configuration
-   * @param pathStatus
-   * @return
-   * @throws IOException
-   */
-  @Deprecated
-  public static List<Footer> readFooters(Configuration configuration, FileStatus pathStatus) throws IOException {
-    return readFooters(configuration, pathStatus, false);
-  }
-
-  /**
-   * Read the footers of all the files under that path (recursively)
-   * using summary files if possible
-   * @param configuration the configuration to access the FS
-   * @param fileStatus the root dir
-   * @return all the footers
-   * @throws IOException
-   */
-  public static List<Footer> readFooters(Configuration configuration, FileStatus pathStatus, boolean skipRowGroups) throws IOException {
-    List<FileStatus> files = listFiles(configuration, pathStatus);
-    return readAllFootersInParallelUsingSummaryFiles(configuration, files, skipRowGroups);
-  }
-
-  private static List<FileStatus> listFiles(Configuration conf, FileStatus fileStatus) throws IOException {
-    if (fileStatus.isDir()) {
-      FileSystem fs = fileStatus.getPath().getFileSystem(conf);
-      FileStatus[] list = fs.listStatus(fileStatus.getPath(), HiddenFileFilter.INSTANCE);
-      List<FileStatus> result = new ArrayList<FileStatus>();
-      for (FileStatus sub : list) {
-        result.addAll(listFiles(conf, sub));
-      }
-      return result;
-    } else {
-      return Arrays.asList(fileStatus);
-    }
-  }
-
-  /**
-   * Specifically reads a given summary file
-   * @param configuration
-   * @param summaryStatus
-   * @return the metadata translated for each file
-   * @throws IOException
-   */
-  public static List<Footer> readSummaryFile(Configuration configuration, FileStatus summaryStatus) throws IOException {
-    final Path parent = summaryStatus.getPath().getParent();
-    ParquetMetadata mergedFooters = readFooter(configuration, summaryStatus, filter(false));
-    return footersFromSummaryFile(parent, mergedFooters);
-  }
-
-  static ParquetMetadata readSummaryMetadata(Configuration configuration, Path basePath, boolean skipRowGroups) throws IOException {
-    Path metadataFile = new Path(basePath, PARQUET_METADATA_FILE);
-    Path commonMetaDataFile = new Path(basePath, PARQUET_COMMON_METADATA_FILE);
-    FileSystem fileSystem = basePath.getFileSystem(configuration);
-    if (skipRowGroups && fileSystem.exists(commonMetaDataFile)) {
-      // reading the summary file that does not contain the row groups
-      if (Log.INFO) LOG.info("reading summary file: " + commonMetaDataFile);
-      return readFooter(configuration, commonMetaDataFile, filter(skipRowGroups));
-    } else if (fileSystem.exists(metadataFile)) {
-      if (Log.INFO) LOG.info("reading summary file: " + metadataFile);
-      return readFooter(configuration, metadataFile, filter(skipRowGroups));
-    } else {
-      return null;
-    }
-  }
-
-  static List<Footer> footersFromSummaryFile(final Path parent, ParquetMetadata mergedFooters) {
-    Map<Path, ParquetMetadata> footers = new HashMap<Path, ParquetMetadata>();
-    List<BlockMetaData> blocks = mergedFooters.getBlocks();
-    for (BlockMetaData block : blocks) {
-      String path = block.getPath();
-      Path fullPath = new Path(parent, path);
-      ParquetMetadata current = footers.get(fullPath);
-      if (current == null) {
-        current = new ParquetMetadata(mergedFooters.getFileMetaData(), new ArrayList<BlockMetaData>());
-        footers.put(fullPath, current);
-      }
-      current.getBlocks().add(block);
-    }
-    List<Footer> result = new ArrayList<Footer>();
-    for (Entry<Path, ParquetMetadata> entry : footers.entrySet()) {
-      result.add(new Footer(entry.getKey(), entry.getValue()));
-    }
-    return result;
-  }
-
-  /**
-   * Reads the meta data block in the footer of the file
-   * @param configuration
-   * @param file the parquet File
-   * @return the metadata blocks in the footer
-   * @throws IOException if an error occurs while reading the file
-   */
-  @Deprecated
-  public static final ParquetMetadata readFooter(Configuration configuration, Path file) throws IOException {
-    return readFooter(configuration, file, NO_FILTER);
-  }
-
-  /**
-   * Reads the meta data in the footer of the file.
-   * Skipping row groups (or not) based on the provided filter
-   * @param configuration
-   * @param file the Parquet File
-   * @param filter the filter to apply to row groups
-   * @return the metadata with row groups filtered.
-   * @throws IOException  if an error occurs while reading the file
-   */
-  public static ParquetMetadata readFooter(Configuration configuration, Path file, MetadataFilter filter) throws IOException {
-    FileSystem fileSystem = file.getFileSystem(configuration);
-    return readFooter(configuration, fileSystem.getFileStatus(file), filter);
-  }
-
-  /**
-   * @deprecated use {@link ParquetFileReader#readFooter(Configuration, FileStatus, MetadataFilter)}
-   */
-  @Deprecated
-  public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file) throws IOException {
-    return readFooter(configuration, file, NO_FILTER);
-  }
-
-  /**
-   * Reads the meta data block in the footer of the file
-   * @param configuration
-   * @param file the parquet File
-   * @param filter the filter to apply to row groups
-   * @return the metadata blocks in the footer
-   * @throws IOException if an error occurs while reading the file
-   */
-  public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file, MetadataFilter filter) throws IOException {
-    FileSystem fileSystem = file.getPath().getFileSystem(configuration);
-    FSDataInputStream f = fileSystem.open(file.getPath());
-    try {
-      long l = file.getLen();
-      if (Log.DEBUG) LOG.debug("File length " + l);
-      int FOOTER_LENGTH_SIZE = 4;
-      if (l < MAGIC.length + FOOTER_LENGTH_SIZE + MAGIC.length) { // MAGIC + data + footer + footerIndex + MAGIC
-        throw new RuntimeException(file.getPath() + " is not a Parquet file (too small)");
-      }
-      long footerLengthIndex = l - FOOTER_LENGTH_SIZE - MAGIC.length;
-      if (Log.DEBUG) LOG.debug("reading footer index at " + footerLengthIndex);
-
-      f.seek(footerLengthIndex);
-      int footerLength = readIntLittleEndian(f);
-      byte[] magic = new byte[MAGIC.length];
-      f.readFully(magic);
-      if (!Arrays.equals(MAGIC, magic)) {
-        throw new RuntimeException(file.getPath() + " is not a Parquet file. expected magic number at tail " + Arrays.toString(MAGIC) + " but found " + Arrays.toString(magic));
-      }
-      long footerIndex = footerLengthIndex - footerLength;
-      if (Log.DEBUG) LOG.debug("read footer length: " + footerLength + ", footer index: " + footerIndex);
-      if (footerIndex < MAGIC.length || footerIndex >= footerLengthIndex) {
-        throw new RuntimeException("corrupted file: the footer index is not within the file");
-      }
-      f.seek(footerIndex);
-      return converter.readParquetMetadata(f, filter);
-    } finally {
-      f.close();
-    }
-  }
-
-  private final CodecFactory codecFactory;
-  private final List<BlockMetaData> blocks;
-  private final FSDataInputStream f;
-  private final Path filePath;
-  private int currentBlock = 0;
-  private final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<ColumnPath, ColumnDescriptor>();
-
-  /**
-   * @param f the Parquet file (will be opened for read in this constructor)
-   * @param blocks the blocks to read
-   * @param colums the columns to read (their path)
-   * @param codecClassName the codec used to compress the blocks
-   * @throws IOException if the file can not be opened
-   */
-  public ParquetFileReader(Configuration configuration, Path filePath, List<BlockMetaData> blocks, List<ColumnDescriptor> columns) throws IOException {
-    this.filePath = filePath;
-    FileSystem fs = filePath.getFileSystem(configuration);
-    this.f = fs.open(filePath);
-    this.blocks = blocks;
-    for (ColumnDescriptor col : columns) {
-      paths.put(ColumnPath.get(col.getPath()), col);
-    }
-    this.codecFactory = new CodecFactory(configuration);
-  }
-
-  /**
-   * Reads all the columns requested from the row group at the current file position.
-   * @throws IOException if an error occurs while reading
-   * @return the PageReadStore which can provide PageReaders for each column.
-   */
-  public PageReadStore readNextRowGroup() throws IOException {
-    if (currentBlock == blocks.size()) {
-      return null;
-    }
-    BlockMetaData block = blocks.get(currentBlock);
-    if (block.getRowCount() == 0) {
-      throw new RuntimeException("Illegal row group of 0 rows");
-    }
-    ColumnChunkPageReadStore columnChunkPageReadStore = new ColumnChunkPageReadStore(block.getRowCount());
-    // prepare the list of consecutive chunks to read them in one scan
-    List<ConsecutiveChunkList> allChunks = new ArrayList<ConsecutiveChunkList>();
-    ConsecutiveChunkList currentChunks = null;
-    for (ColumnChunkMetaData mc : block.getColumns()) {
-      ColumnPath pathKey = mc.getPath();
-      BenchmarkCounter.incrementTotalBytes(mc.getTotalSize());
-      ColumnDescriptor columnDescriptor = paths.get(pathKey);
-      if (columnDescriptor != null) {
-        long startingPos = mc.getStartingPos();
-        // first chunk or not consecutive => new list
-        if (currentChunks == null || currentChunks.endPos() != startingPos) {
-          currentChunks = new ConsecutiveChunkList(startingPos);
-          allChunks.add(currentChunks);
-        }
-        currentChunks.addChunk(new ChunkDescriptor(columnDescriptor, mc, startingPos, (int)mc.getTotalSize()));
-      }
-    }
-    // actually read all the chunks
-    for (ConsecutiveChunkList consecutiveChunks : allChunks) {
-      final List<Chunk> chunks = consecutiveChunks.readAll(f);
-      for (Chunk chunk : chunks) {
-        columnChunkPageReadStore.addColumn(chunk.descriptor.col, chunk.readAllPages());
-      }
-    }
-    ++currentBlock;
-    return columnChunkPageReadStore;
-  }
-
-
-
-  @Override
-  public void close() throws IOException {
-    f.close();
-    this.codecFactory.release();
-  }
-
-  /**
-   * The data for a column chunk
-   *
-   * @author Julien Le Dem
-   *
-   */
-  private class Chunk extends ByteArrayInputStream {
-
-    private final ChunkDescriptor descriptor;
-
-    /**
-     *
-     * @param descriptor descriptor for the chunk
-     * @param data contains the chunk data at offset
-     * @param offset where the chunk starts in offset
-     */
-    public Chunk(ChunkDescriptor descriptor, byte[] data, int offset) {
-      super(data);
-      this.descriptor = descriptor;
-      this.pos = offset;
-    }
-
-    protected PageHeader readPageHeader() throws IOException {
-      return Util.readPageHeader(this);
-    }
-
-    /**
-     * Read all of the pages in a given column chunk.
-     * @return the list of pages
-     */
-    public ColumnChunkPageReader readAllPages() throws IOException {
-      List<DataPage> pagesInChunk = new ArrayList<DataPage>();
-      DictionaryPage dictionaryPage = null;
-      long valuesCountReadSoFar = 0;
-      while (valuesCountReadSoFar < descriptor.metadata.getValueCount()) {
-        PageHeader pageHeader = readPageHeader();
-        int uncompressedPageSize = pageHeader.getUncompressed_page_size();
-        int compressedPageSize = pageHeader.getCompressed_page_size();
-        switch (pageHeader.type) {
-          case DICTIONARY_PAGE:
-            // there is only one dictionary page per column chunk
-            if (dictionaryPage != null) {
-              throw new ParquetDecodingException("more than one dictionary page in column " + descriptor.col);
-            }
-          DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
-          dictionaryPage =
-                new DictionaryPage(
-                    this.readAsBytesInput(compressedPageSize),
-                    uncompressedPageSize,
-                    dicHeader.getNum_values(),
-                    converter.getEncoding(dicHeader.getEncoding())
-                    );
-            break;
-          case DATA_PAGE:
-            DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
-            pagesInChunk.add(
-                new DataPageV1(
-                    this.readAsBytesInput(compressedPageSize),
-                    dataHeaderV1.getNum_values(),
-                    uncompressedPageSize,
-                    fromParquetStatistics(dataHeaderV1.getStatistics(), descriptor.col.getType()),
-                    converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
-                    converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
-                    converter.getEncoding(dataHeaderV1.getEncoding())
-                    ));
-            valuesCountReadSoFar += dataHeaderV1.getNum_values();
-            break;
-          case DATA_PAGE_V2:
-            DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
-            int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length();
-            pagesInChunk.add(
-                new DataPageV2(
-                    dataHeaderV2.getNum_rows(),
-                    dataHeaderV2.getNum_nulls(),
-                    dataHeaderV2.getNum_values(),
-                    this.readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length()),
-                    this.readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length()),
-                    converter.getEncoding(dataHeaderV2.getEncoding()),
-                    this.readAsBytesInput(dataSize),
-                    uncompressedPageSize,
-                    fromParquetStatistics(dataHeaderV2.getStatistics(), descriptor.col.getType()),
-                    dataHeaderV2.isIs_compressed()
-                    ));
-            valuesCountReadSoFar += dataHeaderV2.getNum_values();
-            break;
-          default:
-            if (DEBUG) LOG.debug("skipping page of type " + pageHeader.getType() + " of size " + compressedPageSize);
-            this.skip(compressedPageSize);
-            break;
-        }
-      }
-      if (valuesCountReadSoFar != descriptor.metadata.getValueCount()) {
-        // Would be nice to have a CorruptParquetFileException or something as a subclass?
-        throw new IOException(
-            "Expected " + descriptor.metadata.getValueCount() + " values in column chunk at " +
-            filePath + " offset " + descriptor.metadata.getFirstDataPageOffset() +
-            " but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size()
-            + " pages ending at file offset " + (descriptor.fileOffset + pos()));
-      }
-      BytesDecompressor decompressor = codecFactory.getDecompressor(descriptor.metadata.getCodec());
-      return new ColumnChunkPageReader(decompressor, pagesInChunk, dictionaryPage);
-    }
-
-    /**
-     * @return the current position in the chunk
-     */
-    public int pos() {
-      return this.pos;
-    }
-
-    /**
-     * @param size the size of the page
-     * @return the page
-     * @throws IOException
-     */
-    public BytesInput readAsBytesInput(int size) throws IOException {
-      final BytesInput r = BytesInput.from(this.buf, this.pos, size);
-      this.pos += size;
-      return r;
-    }
-
-  }
-
-  /**
-   * deals with a now fixed bug where compressedLength was missing a few bytes.
-   *
-   * @author Julien Le Dem
-   *
-   */
-  private class WorkaroundChunk extends Chunk {
-
-    private final FSDataInputStream f;
-
-    /**
-     * @param descriptor the descriptor of the chunk
-     * @param data contains the data of the chunk at offset
-     * @param offset where the chunk starts in data
-     * @param f the file stream positioned at the end of this chunk
-     */
-    private WorkaroundChunk(ChunkDescriptor descriptor, byte[] data, int offset, FSDataInputStream f) {
-      super(descriptor, data, offset);
-      this.f = f;
-    }
-
-    protected PageHeader readPageHeader() throws IOException {
-      PageHeader pageHeader;
-      int initialPos = this.pos;
-      try {
-        pageHeader = Util.readPageHeader(this);
-      } catch (IOException e) {
-        // this is to workaround a bug where the compressedLength
-        // of the chunk is missing the size of the header of the dictionary
-        // to allow reading older files (using dictionary) we need this.
-        // usually 13 to 19 bytes are missing
-        // if the last page is smaller than this, the page header itself is truncated in the buffer.
-        this.pos = initialPos; // resetting the buffer to the position before we got the error
-        LOG.info("completing the column chunk to read the page header");
-        pageHeader = Util.readPageHeader(new SequenceInputStream(this, f)); // trying again from the buffer + remainder of the stream.
-      }
-      return pageHeader;
-    }
-
-    public BytesInput readAsBytesInput(int size) throws IOException {
-      if (pos + size > count) {
-        // this is to workaround a bug where the compressedLength
-        // of the chunk is missing the size of the header of the dictionary
-        // to allow reading older files (using dictionary) we need this.
-        // usually 13 to 19 bytes are missing
-        int l1 = count - pos;
-        int l2 = size - l1;
-        LOG.info("completed the column chunk with " + l2 + " bytes");
-        return BytesInput.concat(super.readAsBytesInput(l1), BytesInput.copy(BytesInput.from(f, l2)));
-      }
-      return super.readAsBytesInput(size);
-    }
-
-  }
-
-
-  /**
-   * information needed to read a column chunk
-   */
-  private static class ChunkDescriptor {
-
-    private final ColumnDescriptor col;
-    private final ColumnChunkMetaData metadata;
-    private final long fileOffset;
-    private final int size;
-
-    /**
-     * @param col column this chunk is part of
-     * @param metadata metadata for the column
-     * @param fileOffset offset in the file where this chunk starts
-     * @param size size of the chunk
-     */
-    private ChunkDescriptor(
-        ColumnDescriptor col,
-        ColumnChunkMetaData metadata,
-        long fileOffset,
-        int size) {
-      super();
-      this.col = col;
-      this.metadata = metadata;
-      this.fileOffset = fileOffset;
-      this.size = size;
-    }
-  }
-
-  /**
-   * describes a list of consecutive column chunks to be read at once.
-   *
-   * @author Julien Le Dem
-   */
-  private class ConsecutiveChunkList {
-
-    private final long offset;
-    private int length;
-    private final List<ChunkDescriptor> chunks = new ArrayList<ChunkDescriptor>();
-
-    /**
-     * @param offset where the first chunk starts
-     */
-    ConsecutiveChunkList(long offset) {
-      this.offset = offset;
-    }
-
-    /**
-     * adds a chunk to the list.
-     * It must be consecutive to the previous chunk
-     * @param descriptor
-     */
-    public void addChunk(ChunkDescriptor descriptor) {
-      chunks.add(descriptor);
-      length += descriptor.size;
-    }
-
-    /**
-     * @param f file to read the chunks from
-     * @return the chunks
-     * @throws IOException
-     */
-    public List<Chunk> readAll(FSDataInputStream f) throws IOException {
-      List<Chunk> result = new ArrayList<Chunk>(chunks.size());
-      f.seek(offset);
-      byte[] chunksBytes = new byte[length];
-      f.readFully(chunksBytes);
-      // report in a counter the data we just scanned
-      BenchmarkCounter.incrementBytesRead(length);
-      int currentChunkOffset = 0;
-      for (int i = 0; i < chunks.size(); i++) {
-        ChunkDescriptor descriptor = chunks.get(i);
-        if (i < chunks.size() - 1) {
-          result.add(new Chunk(descriptor, chunksBytes, currentChunkOffset));
-        } else {
-          // because of a bug, the last chunk might be larger than descriptor.size
-          result.add(new WorkaroundChunk(descriptor, chunksBytes, currentChunkOffset, f));
-        }
-        currentChunkOffset += descriptor.size;
-      }
-      return result ;
-    }
-
-    /**
-     * @return the position following the last byte of these chunks
-     */
-    public long endPos() {
-      return offset + length;
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
deleted file mode 100644
index 6868717..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
+++ /dev/null
@@ -1,553 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import static parquet.Log.DEBUG;
-import static parquet.format.Util.writeFileMetaData;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import parquet.Log;
-import parquet.Version;
-import parquet.bytes.BytesInput;
-import parquet.bytes.BytesUtils;
-import parquet.column.ColumnDescriptor;
-import parquet.column.page.DictionaryPage;
-import parquet.column.statistics.Statistics;
-import parquet.hadoop.metadata.ColumnPath;
-import parquet.format.converter.ParquetMetadataConverter;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.hadoop.metadata.FileMetaData;
-import parquet.hadoop.metadata.GlobalMetaData;
-import parquet.hadoop.metadata.ParquetMetadata;
-import parquet.io.ParquetEncodingException;
-import parquet.schema.MessageType;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-
-/**
- * Internal implementation of the Parquet file writer as a block container
- *
- * @author Julien Le Dem
- *
- */
-public class ParquetFileWriter {
-  private static final Log LOG = Log.getLog(ParquetFileWriter.class);
-
-  public static final String PARQUET_METADATA_FILE = "_metadata";
-  public static final String PARQUET_COMMON_METADATA_FILE = "_common_metadata";
-  public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII"));
-  public static final int CURRENT_VERSION = 1;
-
-  // File creation modes
-  public static enum Mode {
-    CREATE,
-    OVERWRITE
-  }
-
-  private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
-
-  private final MessageType schema;
-  private final FSDataOutputStream out;
-  private BlockMetaData currentBlock;
-  private ColumnChunkMetaData currentColumn;
-  private long currentRecordCount;
-  private List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
-  private long uncompressedLength;
-  private long compressedLength;
-  private Set<parquet.column.Encoding> currentEncodings;
-
-  private CompressionCodecName currentChunkCodec;
-  private ColumnPath currentChunkPath;
-  private PrimitiveTypeName currentChunkType;
-  private long currentChunkFirstDataPage;
-  private long currentChunkDictionaryPageOffset;
-  private long currentChunkValueCount;
-
-  private Statistics currentStatistics;
-
-  /**
-   * Captures the order in which methods should be called
-   *
-   * @author Julien Le Dem
-   *
-   */
-  private enum STATE {
-    NOT_STARTED {
-      STATE start() {
-        return STARTED;
-      }
-    },
-    STARTED {
-      STATE startBlock() {
-        return BLOCK;
-      }
-      STATE end() {
-        return ENDED;
-      }
-    },
-    BLOCK  {
-      STATE startColumn() {
-        return COLUMN;
-      }
-      STATE endBlock() {
-        return STARTED;
-      }
-    },
-    COLUMN {
-      STATE endColumn() {
-        return BLOCK;
-      };
-      STATE write() {
-        return this;
-      }
-    },
-    ENDED;
-
-    STATE start() throws IOException { return error(); }
-    STATE startBlock() throws IOException { return error(); }
-    STATE startColumn() throws IOException { return error(); }
-    STATE write() throws IOException { return error(); }
-    STATE endColumn() throws IOException { return error(); }
-    STATE endBlock() throws IOException { return error(); }
-    STATE end() throws IOException { return error(); }
-
-    private final STATE error() throws IOException {
-      throw new IOException("The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: " + this.name());
-    }
-  }
-
-  private STATE state = STATE.NOT_STARTED;
-
-  /**
-   * @param configuration Hadoop configuration
-   * @param schema the schema of the data
-   * @param file the file to write to
-   * @throws IOException if the file can not be created
-   */
-  public ParquetFileWriter(Configuration configuration, MessageType schema,
-      Path file) throws IOException {
-    this(configuration, schema, file, Mode.CREATE);
-  }
-
-  /**
-   * @param configuration Hadoop configuration
-   * @param schema the schema of the data
-   * @param file the file to write to
-   * @param mode file creation mode
-   * @throws IOException if the file can not be created
-   */
-  public ParquetFileWriter(Configuration configuration, MessageType schema,
-      Path file, Mode mode) throws IOException {
-    super();
-    this.schema = schema;
-    FileSystem fs = file.getFileSystem(configuration);
-    boolean overwriteFlag = (mode == Mode.OVERWRITE);
-    this.out = fs.create(file, overwriteFlag);
-  }
-
-  /**
-   * start the file
-   * @throws IOException
-   */
-  public void start() throws IOException {
-    state = state.start();
-    if (DEBUG) LOG.debug(out.getPos() + ": start");
-    out.write(MAGIC);
-  }
-
-  /**
-   * start a block
-   * @param recordCount the record count in this block
-   * @throws IOException
-   */
-  public void startBlock(long recordCount) throws IOException {
-    state = state.startBlock();
-    if (DEBUG) LOG.debug(out.getPos() + ": start block");
-//    out.write(MAGIC); // TODO: add a magic delimiter
-    currentBlock = new BlockMetaData();
-    currentRecordCount = recordCount;
-  }
-
-  /**
-   * start a column inside a block
-   * @param descriptor the column descriptor
-   * @param valueCount the value count in this column
-   * @param statistics the statistics in this column
-   * @param compressionCodecName
-   * @throws IOException
-   */
-  public void startColumn(ColumnDescriptor descriptor,
-                          long valueCount,
-                          CompressionCodecName compressionCodecName) throws IOException {
-    state = state.startColumn();
-    if (DEBUG) LOG.debug(out.getPos() + ": start column: " + descriptor + " count=" + valueCount);
-    currentEncodings = new HashSet<parquet.column.Encoding>();
-    currentChunkPath = ColumnPath.get(descriptor.getPath());
-    currentChunkType = descriptor.getType();
-    currentChunkCodec = compressionCodecName;
-    currentChunkValueCount = valueCount;
-    currentChunkFirstDataPage = out.getPos();
-    compressedLength = 0;
-    uncompressedLength = 0;
-    // need to know what type of stats to initialize to
-    // better way to do this?
-    currentStatistics = Statistics.getStatsBasedOnType(currentChunkType);
-  }
-
-  /**
-   * writes a dictionary page page
-   * @param dictionaryPage the dictionary page
-   */
-  public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
-    state = state.write();
-    if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page: " + dictionaryPage.getDictionarySize() + " values");
-    currentChunkDictionaryPageOffset = out.getPos();
-    int uncompressedSize = dictionaryPage.getUncompressedSize();
-    int compressedPageSize = (int)dictionaryPage.getBytes().size(); // TODO: fix casts
-    metadataConverter.writeDictionaryPageHeader(
-        uncompressedSize,
-        compressedPageSize,
-        dictionaryPage.getDictionarySize(),
-        dictionaryPage.getEncoding(),
-        out);
-    long headerSize = out.getPos() - currentChunkDictionaryPageOffset;
-    this.uncompressedLength += uncompressedSize + headerSize;
-    this.compressedLength += compressedPageSize + headerSize;
-    if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page content " + compressedPageSize);
-    dictionaryPage.getBytes().writeAllTo(out);
-    currentEncodings.add(dictionaryPage.getEncoding());
-  }
-
-
-  /**
-   * writes a single page
-   * @param valueCount count of values
-   * @param uncompressedPageSize the size of the data once uncompressed
-   * @param bytes the compressed data for the page without header
-   * @param rlEncoding encoding of the repetition level
-   * @param dlEncoding encoding of the definition level
-   * @param valuesEncoding encoding of values
-   */
-  @Deprecated
-  public void writeDataPage(
-      int valueCount, int uncompressedPageSize,
-      BytesInput bytes,
-      parquet.column.Encoding rlEncoding,
-      parquet.column.Encoding dlEncoding,
-      parquet.column.Encoding valuesEncoding) throws IOException {
-    state = state.write();
-    long beforeHeader = out.getPos();
-    if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values");
-    int compressedPageSize = (int)bytes.size();
-    metadataConverter.writeDataPageHeader(
-        uncompressedPageSize, compressedPageSize,
-        valueCount,
-        rlEncoding,
-        dlEncoding,
-        valuesEncoding,
-        out);
-    long headerSize = out.getPos() - beforeHeader;
-    this.uncompressedLength += uncompressedPageSize + headerSize;
-    this.compressedLength += compressedPageSize + headerSize;
-    if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize);
-    bytes.writeAllTo(out);
-    currentEncodings.add(rlEncoding);
-    currentEncodings.add(dlEncoding);
-    currentEncodings.add(valuesEncoding);
-  }
-
-  /**
-   * writes a single page
-   * @param valueCount count of values
-   * @param uncompressedPageSize the size of the data once uncompressed
-   * @param bytes the compressed data for the page without header
-   * @param rlEncoding encoding of the repetition level
-   * @param dlEncoding encoding of the definition level
-   * @param valuesEncoding encoding of values
-   */
-  public void writeDataPage(
-      int valueCount, int uncompressedPageSize,
-      BytesInput bytes,
-      Statistics statistics,
-      parquet.column.Encoding rlEncoding,
-      parquet.column.Encoding dlEncoding,
-      parquet.column.Encoding valuesEncoding) throws IOException {
-    state = state.write();
-    long beforeHeader = out.getPos();
-    if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values");
-    int compressedPageSize = (int)bytes.size();
-    metadataConverter.writeDataPageHeader(
-        uncompressedPageSize, compressedPageSize,
-        valueCount,
-        statistics,
-        rlEncoding,
-        dlEncoding,
-        valuesEncoding,
-        out);
-    long headerSize = out.getPos() - beforeHeader;
-    this.uncompressedLength += uncompressedPageSize + headerSize;
-    this.compressedLength += compressedPageSize + headerSize;
-    if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize);
-    bytes.writeAllTo(out);
-    currentStatistics.mergeStatistics(statistics);
-    currentEncodings.add(rlEncoding);
-    currentEncodings.add(dlEncoding);
-    currentEncodings.add(valuesEncoding);
-  }
-
-  /**
-   * writes a number of pages at once
-   * @param bytes bytes to be written including page headers
-   * @param uncompressedTotalPageSize total uncompressed size (without page headers)
-   * @param compressedTotalPageSize total compressed size (without page headers)
-   * @throws IOException
-   */
-   void writeDataPages(BytesInput bytes,
-                       long uncompressedTotalPageSize,
-                       long compressedTotalPageSize,
-                       Statistics totalStats,
-                       List<parquet.column.Encoding> encodings) throws IOException {
-    state = state.write();
-    if (DEBUG) LOG.debug(out.getPos() + ": write data pages");
-    long headersSize = bytes.size() - compressedTotalPageSize;
-    this.uncompressedLength += uncompressedTotalPageSize + headersSize;
-    this.compressedLength += compressedTotalPageSize + headersSize;
-    if (DEBUG) LOG.debug(out.getPos() + ": write data pages content");
-    bytes.writeAllTo(out);
-    currentEncodings.addAll(encodings);
-    currentStatistics = totalStats;
-  }
-
-  /**
-   * end a column (once all rep, def and data have been written)
-   * @throws IOException
-   */
-  public void endColumn() throws IOException {
-    state = state.endColumn();
-    if (DEBUG) LOG.debug(out.getPos() + ": end column");
-    currentBlock.addColumn(ColumnChunkMetaData.get(
-        currentChunkPath,
-        currentChunkType,
-        currentChunkCodec,
-        currentEncodings,
-        currentStatistics,
-        currentChunkFirstDataPage,
-        currentChunkDictionaryPageOffset,
-        currentChunkValueCount,
-        compressedLength,
-        uncompressedLength));
-    if (DEBUG) LOG.info("ended Column chumk: " + currentColumn);
-    currentColumn = null;
-    this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength);
-    this.uncompressedLength = 0;
-    this.compressedLength = 0;
-  }
-
-  /**
-   * ends a block once all column chunks have been written
-   * @throws IOException
-   */
-  public void endBlock() throws IOException {
-    state = state.endBlock();
-    if (DEBUG) LOG.debug(out.getPos() + ": end block");
-    currentBlock.setRowCount(currentRecordCount);
-    blocks.add(currentBlock);
-    currentBlock = null;
-  }
-
-  /**
-   * ends a file once all blocks have been written.
-   * closes the file.
-   * @param extraMetaData the extra meta data to write in the footer
-   * @throws IOException
-   */
-  public void end(Map<String, String> extraMetaData) throws IOException {
-    state = state.end();
-    if (DEBUG) LOG.debug(out.getPos() + ": end");
-    ParquetMetadata footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
-    serializeFooter(footer, out);
-    out.close();
-  }
-
-  private static void serializeFooter(ParquetMetadata footer, FSDataOutputStream out) throws IOException {
-    long footerIndex = out.getPos();
-    parquet.format.FileMetaData parquetMetadata = new ParquetMetadataConverter().toParquetMetadata(CURRENT_VERSION, footer);
-    writeFileMetaData(parquetMetadata, out);
-    if (DEBUG) LOG.debug(out.getPos() + ": footer length = " + (out.getPos() - footerIndex));
-    BytesUtils.writeIntLittleEndian(out, (int)(out.getPos() - footerIndex));
-    out.write(MAGIC);
-  }
-
-  /**
-   * writes a _metadata and _common_metadata file
-   * @param configuration the configuration to use to get the FileSystem
-   * @param outputPath the directory to write the _metadata file to
-   * @param footers the list of footers to merge
-   * @throws IOException
-   */
-  public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers) throws IOException {
-    ParquetMetadata metadataFooter = mergeFooters(outputPath, footers);
-    FileSystem fs = outputPath.getFileSystem(configuration);
-    outputPath = outputPath.makeQualified(fs);
-    writeMetadataFile(outputPath, metadataFooter, fs, PARQUET_METADATA_FILE);
-    metadataFooter.getBlocks().clear();
-    writeMetadataFile(outputPath, metadataFooter, fs, PARQUET_COMMON_METADATA_FILE);
-  }
-
-  private static void writeMetadataFile(Path outputPath, ParquetMetadata metadataFooter, FileSystem fs, String parquetMetadataFile)
-      throws IOException {
-    Path metaDataPath = new Path(outputPath, parquetMetadataFile);
-    FSDataOutputStream metadata = fs.create(metaDataPath);
-    metadata.write(MAGIC);
-    serializeFooter(metadataFooter, metadata);
-    metadata.close();
-  }
-
-  static ParquetMetadata mergeFooters(Path root, List<Footer> footers) {
-    String rootPath = root.toUri().getPath();
-    GlobalMetaData fileMetaData = null;
-    List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
-    for (Footer footer : footers) {
-        String footerPath = footer.getFile().toUri().getPath();
-      if (!footerPath.startsWith(rootPath)) {
-        throw new ParquetEncodingException(footerPath + " invalid: all the files must be contained in the root " + root);
-      }
-      footerPath = footerPath.substring(rootPath.length());
-      while (footerPath.startsWith("/")) {
-        footerPath = footerPath.substring(1);
-      }
-      fileMetaData = mergeInto(footer.getParquetMetadata().getFileMetaData(), fileMetaData);
-      for (BlockMetaData block : footer.getParquetMetadata().getBlocks()) {
-        block.setPath(footerPath);
-        blocks.add(block);
-      }
-    }
-    return new ParquetMetadata(fileMetaData.merge(), blocks);
-  }
-
-  /**
-   * @return the current position in the underlying file
-   * @throws IOException
-   */
-  public long getPos() throws IOException {
-    return out.getPos();
-  }
-
-  /**
-   * Will merge the metadata of all the footers together
-   * @param footers the list files footers to merge
-   * @return the global meta data for all the footers
-   */
-  static GlobalMetaData getGlobalMetaData(List<Footer> footers) {
-    return getGlobalMetaData(footers, true);
-  }
-
-  static GlobalMetaData getGlobalMetaData(List<Footer> footers, boolean strict) {
-    GlobalMetaData fileMetaData = null;
-    for (Footer footer : footers) {
-      ParquetMetadata currentMetadata = footer.getParquetMetadata();
-      fileMetaData = mergeInto(currentMetadata.getFileMetaData(), fileMetaData, strict);
-    }
-    return fileMetaData;
-  }
-
-  /**
-   * Will return the result of merging toMerge into mergedMetadata
-   * @param toMerge the metadata toMerge
-   * @param mergedMetadata the reference metadata to merge into
-   * @return the result of the merge
-   */
-  static GlobalMetaData mergeInto(
-      FileMetaData toMerge,
-      GlobalMetaData mergedMetadata) {
-    return mergeInto(toMerge, mergedMetadata, true);
-  }
-
-  static GlobalMetaData mergeInto(
-      FileMetaData toMerge,
-      GlobalMetaData mergedMetadata,
-      boolean strict) {
-    MessageType schema = null;
-    Map<String, Set<String>> newKeyValues = new HashMap<String, Set<String>>();
-    Set<String> createdBy = new HashSet<String>();
-    if (mergedMetadata != null) {
-      schema = mergedMetadata.getSchema();
-      newKeyValues.putAll(mergedMetadata.getKeyValueMetaData());
-      createdBy.addAll(mergedMetadata.getCreatedBy());
-    }
-    if ((schema == null && toMerge.getSchema() != null)
-        || (schema != null && !schema.equals(toMerge.getSchema()))) {
-      schema = mergeInto(toMerge.getSchema(), schema, strict);
-    }
-    for (Entry<String, String> entry : toMerge.getKeyValueMetaData().entrySet()) {
-      Set<String> values = newKeyValues.get(entry.getKey());
-      if (values == null) {
-        values = new HashSet<String>();
-        newKeyValues.put(entry.getKey(), values);
-      }
-      values.add(entry.getValue());
-    }
-    createdBy.add(toMerge.getCreatedBy());
-    return new GlobalMetaData(
-        schema,
-        newKeyValues,
-        createdBy);
-  }
-
-  /**
-   * will return the result of merging toMerge into mergedSchema
-   * @param toMerge the schema to merge into mergedSchema
-   * @param mergedSchema the schema to append the fields to
-   * @return the resulting schema
-   */
-  static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema) {
-    return mergeInto(toMerge, mergedSchema, true);
-  }
-
-  /**
-   * will return the result of merging toMerge into mergedSchema
-   * @param toMerge the schema to merge into mergedSchema
-   * @param mergedSchema the schema to append the fields to
-   * @param strict should schema primitive types match
-   * @return the resulting schema
-   */
-  static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema, boolean strict) {
-    if (mergedSchema == null) {
-      return toMerge;
-    }
-
-    return mergedSchema.union(toMerge, strict);
-  }
-
-}


Mime
View raw message