parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject [4/4] parquet-mr git commit: PARQUET-777: Add Parquet CLI.
Date Fri, 28 Jul 2017 23:25:29 GMT
PARQUET-777: Add Parquet CLI.

This adds a new parquet-cli module with an improved command-line tool. The parquet-cli/README.md file has instructions for building and testing locally.

Author: Ryan Blue <blue@apache.org>
Author: Tom White <tom@cloudera.com>

Closes #384 from rdblue/PARQUET-777-add-parquet-cli and squashes the following commits:

de49eff [Ryan Blue] PARQUET-777: Move dynamic support classes, add tests.
affdfb9 [Ryan Blue] PARQUET-777: Update for review feedback.
f953fd4 [Ryan Blue] PARQUET-777: Update README.md with better instructions.
aed223d [Tom White] Replace source file headers with Apache header.
d718363 [Ryan Blue] PARQUET-777: Add Parquet CLI.


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/ddbeb4dd
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/ddbeb4dd
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/ddbeb4dd

Branch: refs/heads/master
Commit: ddbeb4dd17d9c219b99b1e66d8be28efe37e3aa6
Parents: df9f8d8
Author: Ryan Blue <blue@apache.org>
Authored: Fri Jul 28 16:25:21 2017 -0700
Committer: Ryan Blue <blue@apache.org>
Committed: Fri Jul 28 16:25:21 2017 -0700

----------------------------------------------------------------------
 NOTICE                                          |  38 ++
 parquet-cli/README.md                           | 107 ++++
 parquet-cli/pom.xml                             | 153 +++++
 .../org/apache/parquet/cli/BaseCommand.java     | 397 ++++++++++++
 .../java/org/apache/parquet/cli/Command.java    |  40 ++
 .../cli/HadoopFileSystemURLStreamHandler.java   |  79 +++
 .../main/java/org/apache/parquet/cli/Help.java  | 147 +++++
 .../main/java/org/apache/parquet/cli/Main.java  | 178 ++++++
 .../main/java/org/apache/parquet/cli/Util.java  | 335 ++++++++++
 .../parquet/cli/commands/CSVSchemaCommand.java  | 131 ++++
 .../apache/parquet/cli/commands/CatCommand.java | 106 ++++
 .../cli/commands/CheckParquet251Command.java    | 351 ++++++++++
 .../parquet/cli/commands/ConvertCSVCommand.java | 204 ++++++
 .../parquet/cli/commands/ConvertCommand.java    | 165 +++++
 .../cli/commands/ParquetMetadataCommand.java    | 180 ++++++
 .../parquet/cli/commands/SchemaCommand.java     | 138 ++++
 .../cli/commands/ShowDictionaryCommand.java     | 131 ++++
 .../parquet/cli/commands/ShowPagesCommand.java  | 217 +++++++
 .../parquet/cli/commands/ToAvroCommand.java     | 141 ++++
 .../org/apache/parquet/cli/csv/AvroCSV.java     | 258 ++++++++
 .../apache/parquet/cli/csv/AvroCSVReader.java   | 121 ++++
 .../apache/parquet/cli/csv/CSVProperties.java   | 111 ++++
 .../apache/parquet/cli/csv/RecordBuilder.java   | 200 ++++++
 .../org/apache/parquet/cli/json/AvroJson.java   | 636 +++++++++++++++++++
 .../apache/parquet/cli/json/AvroJsonReader.java |  85 +++
 .../org/apache/parquet/cli/util/Codecs.java     |  50 ++
 .../apache/parquet/cli/util/Expressions.java    | 391 ++++++++++++
 .../org/apache/parquet/cli/util/Formats.java    |  47 ++
 .../apache/parquet/cli/util/GetClassLoader.java |  39 ++
 .../parquet/cli/util/RecordException.java       |  53 ++
 .../parquet/cli/util/RuntimeIOException.java    |  31 +
 .../org/apache/parquet/cli/util/Schemas.java    | 498 +++++++++++++++
 .../cli/util/SeekableFSDataInputStream.java     |  76 +++
 parquet-cli/src/main/resources/META-INF/LICENSE | 348 ++++++++++
 parquet-cli/src/main/resources/META-INF/NOTICE  |  45 ++
 .../src/main/resources/cli-logging.properties   |  51 ++
 .../java/org/apache/parquet/Exceptions.java     |  34 +
 .../apache/parquet/util/DynConstructors.java    | 273 ++++++++
 .../org/apache/parquet/util/DynMethods.java     | 520 +++++++++++++++
 .../test/java/org/apache/parquet/TestUtils.java |  70 ++
 .../org/apache/parquet/util/Concatenator.java   |  82 +++
 .../parquet/util/TestDynConstructors.java       | 235 +++++++
 .../org/apache/parquet/util/TestDynMethods.java | 410 ++++++++++++
 pom.xml                                         |   7 +
 44 files changed, 7909 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index a9b6c56..289b092 100644
--- a/NOTICE
+++ b/NOTICE
@@ -54,3 +54,41 @@ its NOTICE file:
   This product includes software developed at
   The Apache Software Foundation (http://www.apache.org/).
 
+--------------------------------------------------------------------------------
+
+This project includes code from Kite, developed at Cloudera, Inc. with
+the following copyright notice:
+
+| Copyright 2013 Cloudera Inc.
+|
+| Licensed under the Apache License, Version 2.0 (the "License");
+| you may not use this file except in compliance with the License.
+| You may obtain a copy of the License at
+|
+|   http://www.apache.org/licenses/LICENSE-2.0
+|
+| Unless required by applicable law or agreed to in writing, software
+| distributed under the License is distributed on an "AS IS" BASIS,
+| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+| See the License for the specific language governing permissions and
+| limitations under the License.
+
+--------------------------------------------------------------------------------
+
+This project includes code from Netflix, Inc. with the following copyright
+notice:
+
+| Copyright 2016 Netflix, Inc.
+|
+| Licensed under the Apache License, Version 2.0 (the "License");
+| you may not use this file except in compliance with the License.
+| You may obtain a copy of the License at
+|
+|   http://www.apache.org/licenses/LICENSE-2.0
+|
+| Unless required by applicable law or agreed to in writing, software
+| distributed under the License is distributed on an "AS IS" BASIS,
+| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+| See the License for the specific language governing permissions and
+| limitations under the License.
+

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/README.md
----------------------------------------------------------------------
diff --git a/parquet-cli/README.md b/parquet-cli/README.md
new file mode 100644
index 0000000..d17d719
--- /dev/null
+++ b/parquet-cli/README.md
@@ -0,0 +1,107 @@
+<!--
+  - Licensed to the Apache Software Foundation (ASF) under one
+  - or more contributor license agreements.  See the NOTICE file
+  - distributed with this work for additional information
+  - regarding copyright ownership.  The ASF licenses this file
+  - to you under the Apache License, Version 2.0 (the
+  - "License"); you may not use this file except in compliance
+  - with the License.  You may obtain a copy of the License at
+  -
+  -   http://www.apache.org/licenses/LICENSE-2.0
+  -
+  - Unless required by applicable law or agreed to in writing,
+  - software distributed under the License is distributed on an
+  - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  - KIND, either express or implied.  See the License for the
+  - specific language governing permissions and limitations
+  - under the License.
+  -->
+
+## Building
+
+You can build this project using Maven:
+
+```
+mvn clean install -DskipTests
+```
+
+
+## Running
+
+The build produces a shaded JAR that can be run using the `hadoop` command:
+
+```
+hadoop jar parquet-cli-1.9.1-runtime.jar org.apache.parquet.cli.Main
+```
+
+For a shorter command-line invocation, add an alias to your shell like this:
+
+```
+alias parquet="hadoop jar /path/to/parquet-cli-1.9.1-runtime.jar org.apache.parquet.cli.Main --dollar-zero parquet"
+```
+
+### Running without Hadoop
+
+To run from the target directory instead of using the `hadoop` command, first copy the dependencies to a folder:
+
+```
+mvn dependency:copy-dependencies
+```
+
+Then, run the command-line tool with `target/dependency/*` added to the classpath:
+
+```
+java -cp 'target/*:target/dependency/*' org.apache.parquet.cli.Main
+```
+
+
+### Help
+
+The `parquet` tool includes help for the included commands:
+
+```
+parquet help
+```
+```
+Usage: parquet [options] [command] [command options]
+
+  Options:
+
+    -v, --verbose, --debug
+        Print extra debugging information
+
+  Commands:
+
+    help
+        Retrieves details on the functions of other commands
+    meta
+        Print a Parquet file's metadata
+    pages
+        Print page summaries for a Parquet file
+    dictionary
+        Print dictionaries for a Parquet column
+    check-stats
+        Check Parquet files for corrupt page and column stats (PARQUET-251)
+    schema
+        Print the Avro schema for a file
+    csv-schema
+        Build a schema from a CSV data sample
+    convert-csv
+        Create a file from CSV data
+    convert
+        Create a Parquet file from a data file
+    to-avro
+        Create an Avro file from a data file
+    cat
+        Print the first N records from a file
+    head
+        Print the first N records from a file
+
+  Examples:
+
+    # print information for create
+    parquet help create
+
+  See 'parquet help <command>' for more information on a specific command.
+```
+

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-cli/pom.xml b/parquet-cli/pom.xml
new file mode 100644
index 0000000..a9cd21b
--- /dev/null
+++ b/parquet-cli/pom.xml
@@ -0,0 +1,153 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <groupId>org.apache.parquet</groupId>
+    <artifactId>parquet</artifactId>
+    <relativePath>../pom.xml</relativePath>
+    <version>1.9.1-SNAPSHOT</version>
+  </parent>
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>parquet-cli</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache Parquet Command-line</name>
+  <url>https://parquet.apache.org</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-avro</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>${avro.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>net.sf.opencsv</groupId>
+      <artifactId>opencsv</artifactId>
+      <version>${opencsv.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>${jackson2.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.beust</groupId>
+      <artifactId>jcommander</artifactId>
+      <version>${jcommander.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>${slf4j.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${guava.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+      <version>${commons-codec.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <!-- This module disables semver checks because it is not a public API.
+      <plugin>
+        <artifactId>maven-enforcer-plugin</artifactId>
+      </plugin>
+      -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+							<shadedArtifactAttached>true</shadedArtifactAttached>
+							<shadedClassifierName>runtime</shadedClassifierName>
+              <minimizeJar>false</minimizeJar>
+              <filters>
+                <filter>
+                  <artifact>org.xerial.snappy:*</artifact>
+                  <excludes>
+                    <exclude>**/LICENSE</exclude>
+                  </excludes>
+                </filter>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/LICENSE.txt</exclude>
+                    <exclude>META-INF/NOTICE.txt</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+              <artifactSet>
+                <includes>
+                  <include>*</include>
+                </includes>
+              </artifactSet>
+              <relocations>
+                <relocation>
+									<!-- relocate Avro in the runtime jar to avoid conflicts with
+                       on-cluster Avro versions.
+											 -->
+                  <pattern>org.apache.avro</pattern>
+                  <shadedPattern>${shade.prefix}.org.apache.avro</shadedPattern>
+                </relocation>
+              </relocations>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
new file mode 100644
index 0000000..4b47164
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.io.CharStreams;
+import com.google.common.io.Resources;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumFileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.cli.json.AvroJsonReader;
+import org.apache.parquet.cli.util.Formats;
+import org.apache.parquet.cli.util.GetClassLoader;
+import org.apache.parquet.cli.util.Schemas;
+import org.apache.parquet.cli.util.SeekableFSDataInputStream;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.slf4j.Logger;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.security.AccessController;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public abstract class BaseCommand implements Command, Configurable {
+
+  @VisibleForTesting
+  static final Charset UTF8 = Charset.forName("utf8");
+
+  private static final String RESOURCE_URI_SCHEME = "resource";
+  private static final String STDIN_AS_SOURCE = "stdin";
+
+  protected final Logger console;
+
+  private Configuration conf = null;
+  private LocalFileSystem localFS = null;
+
+  public BaseCommand(Logger console) {
+    this.console = console;
+  }
+
+  /**
+   * @return FileSystem to use when no file system scheme is present in a path
+   * @throws IOException
+   */
+  public FileSystem defaultFS() throws IOException {
+    if (localFS == null) {
+      this.localFS = FileSystem.getLocal(getConf());
+    }
+    return localFS;
+  }
+
+  /**
+   * Output content to the console or a file.
+   *
+   * This will not produce checksum files.
+   *
+   * @param content String content to write
+   * @param console A {@link Logger} for writing to the console
+   * @param filename The destination {@link Path} as a String
+   * @throws IOException
+   */
+  public void output(String content, Logger console, String filename)
+      throws IOException {
+    if (filename == null || "-".equals(filename)) {
+      console.info(content);
+    } else {
+      FSDataOutputStream outgoing = create(filename);
+      try {
+        outgoing.write(content.getBytes(UTF8));
+      } finally {
+        outgoing.close();
+      }
+    }
+  }
+
+  /**
+   * Creates a file and returns an open {@link FSDataOutputStream}.
+   *
+   * If the file does not have a file system scheme, this uses the default FS.
+   *
+   * This will not produce checksum files and will overwrite a file that
+   * already exists.
+   *
+   * @param filename The filename to create
+   * @return An open FSDataOutputStream
+   * @throws IOException
+   */
+  public FSDataOutputStream create(String filename) throws IOException {
+    return create(filename, true);
+  }
+
+  /**
+   * Creates a file and returns an open {@link FSDataOutputStream}.
+   *
+   * If the file does not have a file system scheme, this uses the default FS.
+   *
+   * This will produce checksum files and will overwrite a file that already
+   * exists.
+   *
+   * @param filename The filename to create
+   * @return An open FSDataOutputStream
+   * @throws IOException
+   */
+  public FSDataOutputStream createWithChecksum(String filename)
+      throws IOException {
+    return create(filename, false);
+  }
+
+  private FSDataOutputStream create(String filename, boolean noChecksum)
+      throws IOException {
+    Path filePath = qualifiedPath(filename);
+    // even though it was qualified using the default FS, it may not be in it
+    FileSystem fs = filePath.getFileSystem(getConf());
+    if (noChecksum && fs instanceof ChecksumFileSystem) {
+      fs = ((ChecksumFileSystem) fs).getRawFileSystem();
+    }
+    return fs.create(filePath, true /* overwrite */);
+  }
+
+  /**
+   * Returns a qualified {@link Path} for the {@code filename}.
+   *
+   * If the file does not have a file system scheme, this uses the default FS.
+   *
+   * @param filename The filename to qualify
+   * @return A qualified Path for the filename
+   * @throws IOException
+   */
+  public Path qualifiedPath(String filename) throws IOException {
+    Path cwd = defaultFS().makeQualified(new Path("."));
+    return new Path(filename).makeQualified(defaultFS().getUri(), cwd);
+  }
+
+  /**
+   * Returns a {@link URI} for the {@code filename} that is a qualified Path or
+   * a resource URI.
+   *
+   * If the file does not have a file system scheme, this uses the default FS.
+   *
+   * @param filename The filename to qualify
+   * @return A qualified URI for the filename
+   * @throws IOException
+   */
+  public URI qualifiedURI(String filename) throws IOException {
+    URI fileURI = URI.create(filename);
+    if (RESOURCE_URI_SCHEME.equals(fileURI.getScheme())) {
+      return fileURI;
+    } else {
+      return qualifiedPath(filename).toUri();
+    }
+  }
+
+  /**
+   * Opens an existing file or resource.
+   *
+   * If the file does not have a file system scheme, this uses the default FS.
+   *
+   * @param filename The filename to open.
+   * @return An open InputStream with the file contents
+   * @throws IOException
+   * @throws IllegalArgumentException If the file does not exist
+   */
+  public InputStream open(String filename) throws IOException {
+    if (STDIN_AS_SOURCE.equals(filename)) {
+      return System.in;
+    }
+
+    URI uri = qualifiedURI(filename);
+    if (RESOURCE_URI_SCHEME.equals(uri.getScheme())) {
+      return Resources.getResource(uri.getRawSchemeSpecificPart()).openStream();
+    } else {
+      Path filePath = new Path(uri);
+      // even though it was qualified using the default FS, it may not be in it
+      FileSystem fs = filePath.getFileSystem(getConf());
+      return fs.open(filePath);
+    }
+  }
+
+  public SeekableInput openSeekable(String filename) throws IOException {
+    Path path = qualifiedPath(filename);
+    // even though it was qualified using the default FS, it may not be in it
+    FileSystem fs = path.getFileSystem(getConf());
+    return new SeekableFSDataInputStream(fs, path);
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    HadoopFileSystemURLStreamHandler.setDefaultConf(conf);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Returns a {@link ClassLoader} for a set of jars and directories.
+   *
+   * @param jars A list of jar paths
+   * @param paths A list of directories containing .class files
+   * @throws MalformedURLException
+   */
+  protected static ClassLoader loaderFor(List<String> jars, List<String> paths)
+      throws MalformedURLException {
+    return AccessController.doPrivileged(new GetClassLoader(urls(jars, paths)));
+  }
+
+  /**
+   * Returns a {@link ClassLoader} for a set of jars.
+   *
+   * @param jars A list of jar paths
+   * @throws MalformedURLException
+   */
+  protected static ClassLoader loaderForJars(List<String> jars)
+      throws MalformedURLException {
+    return AccessController.doPrivileged(new GetClassLoader(urls(jars, null)));
+  }
+
+  /**
+   * Returns a {@link ClassLoader} for a set of directories.
+   *
+   * @param paths A list of directories containing .class files
+   * @throws MalformedURLException
+   */
+  protected static ClassLoader loaderForPaths(List<String> paths)
+      throws MalformedURLException {
+    return AccessController.doPrivileged(new GetClassLoader(urls(null, paths)));
+  }
+
+  private static List<URL> urls(List<String> jars, List<String> dirs)
+      throws MalformedURLException {
+    // check the additional jars and lib directories in the local FS
+    final List<URL> urls = Lists.newArrayList();
+    if (dirs != null) {
+      for (String lib : dirs) {
+        // final URLs must end in '/' for URLClassLoader
+        File path = lib.endsWith("/") ? new File(lib) : new File(lib + "/");
+        Preconditions.checkArgument(path.exists(),
+            "Lib directory does not exist: " + lib);
+        Preconditions.checkArgument(path.isDirectory(),
+            "Not a directory: " + lib);
+        Preconditions.checkArgument(path.canRead() && path.canExecute(),
+            "Insufficient permissions to access lib directory: " + lib);
+        urls.add(path.toURI().toURL());
+      }
+    }
+    if (jars != null) {
+      for (String jar : jars) {
+        File path = new File(jar);
+        Preconditions.checkArgument(path.exists(),
+            "Jar files does not exist: " + jar);
+        Preconditions.checkArgument(path.isFile(),
+            "Not a file: " + jar);
+        Preconditions.checkArgument(path.canRead(),
+            "Cannot read jar file: " + jar);
+        urls.add(path.toURI().toURL());
+      }
+    }
+    return urls;
+  }
+
+  protected <D> Iterable<D> openDataFile(final String source, Schema projection)
+      throws IOException {
+    Formats.Format format = Formats.detectFormat(open(source));
+    switch (format) {
+      case PARQUET:
+        Configuration conf = new Configuration(getConf());
+        // TODO: add these to the reader builder
+        AvroReadSupport.setRequestedProjection(conf, projection);
+        AvroReadSupport.setAvroReadSchema(conf, projection);
+        final ParquetReader<D> parquet = AvroParquetReader.<D>builder(qualifiedPath(source))
+            .disableCompatibility()
+            .withDataModel(GenericData.get())
+            .withConf(conf)
+            .build();
+        return new Iterable<D>() {
+          @Override
+          public Iterator<D> iterator() {
+            return new Iterator<D>() {
+              private boolean hasNext = false;
+              private D next = advance();
+
+              @Override
+              public boolean hasNext() {
+                return hasNext;
+              }
+
+              @Override
+              public D next() {
+                if (!hasNext) {
+                  throw new NoSuchElementException();
+                }
+                D toReturn = next;
+                this.next = advance();
+                return toReturn;
+              }
+
+              private D advance() {
+                try {
+                  D next = parquet.read();
+                  this.hasNext = (next != null);
+                  return next;
+                } catch (IOException e) {
+                  throw new RuntimeException(
+                      "Failed while reading Parquet file: " + source, e);
+                }
+              }
+
+              @Override
+              public void remove() {
+                throw new UnsupportedOperationException("Remove is not supported");
+              }
+            };
+          }
+        };
+
+      case AVRO:
+        Iterable<D> avroReader = (Iterable<D>) DataFileReader.openReader(
+            openSeekable(source), new GenericDatumReader<>(projection));
+        return avroReader;
+
+      default:
+        if (source.endsWith("json")) {
+          return new AvroJsonReader<>(open(source), projection);
+        } else {
+          Preconditions.checkArgument(projection == null,
+              "Cannot select columns from text files");
+          Iterable text = CharStreams.readLines(new InputStreamReader(open(source)));
+          return text;
+        }
+    }
+  }
+
+  protected Schema getAvroSchema(String source) throws IOException {
+    Formats.Format format;
+    try (SeekableInput in = openSeekable(source)) {
+      format = Formats.detectFormat((InputStream) in);
+      in.seek(0);
+
+      switch (format) {
+        case PARQUET:
+          return Schemas.fromParquet(getConf(), qualifiedURI(source));
+        case AVRO:
+          return Schemas.fromAvro(open(source));
+        case TEXT:
+          if (source.endsWith("avsc")) {
+            return Schemas.fromAvsc(open(source));
+          } else if (source.endsWith("json")) {
+            return Schemas.fromJSON("json", open(source));
+          }
+        default:
+      }
+
+      throw new IllegalArgumentException(String.format(
+          "Could not determine file format of %s.", source));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/Command.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/Command.java b/parquet-cli/src/main/java/org/apache/parquet/cli/Command.java
new file mode 100644
index 0000000..9c19143
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/Command.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cli;
+
+import java.io.IOException;
+import java.util.List;
+
+public interface Command {
+  /**
+   * Runs this {@code Command}.
+   *
+   * @return a return code for the process, 0 indicates success.
+   * @throws IOException
+   */
+  int run() throws IOException;
+
+  /**
+   * Returns a list of example uses. Lines starting with '#' will not have the
+   * executable name added when formatting.
+   *
+   * @return a list of String examples
+   */
+  List<String> getExamples();
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/HadoopFileSystemURLStreamHandler.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/HadoopFileSystemURLStreamHandler.java b/parquet-cli/src/main/java/org/apache/parquet/cli/HadoopFileSystemURLStreamHandler.java
new file mode 100644
index 0000000..548544a
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/HadoopFileSystemURLStreamHandler.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cli;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLConnection;
+import java.net.URLStreamHandler;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * {@link URLStreamHandler} that opens URLs through the Hadoop
+ * {@link FileSystem} API, typically URLs using the <i>hdfs</i> scheme.
+ */
+public class HadoopFileSystemURLStreamHandler extends URLStreamHandler
+    implements Configurable {
+
+  // configuration handed to every handler instance at construction time
+  private static Configuration defaultConf = new Configuration();
+
+  // configuration used by this handler when resolving a file system
+  private Configuration conf = defaultConf;
+
+  /**
+   * Replaces the configuration that newly created handlers start with.
+   *
+   * @param defaultConf the configuration for handlers created afterwards
+   */
+  public static void setDefaultConf(Configuration defaultConf) {
+    HadoopFileSystemURLStreamHandler.defaultConf = defaultConf;
+  }
+
+  /**
+   * @return the configuration that newly created handlers start with
+   */
+  public static Configuration getDefaultConf() {
+    return HadoopFileSystemURLStreamHandler.defaultConf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  protected URLConnection openConnection(URL url) throws IOException {
+    URLConnection connection = new HadoopFileSystemURLConnection(url);
+    return connection;
+  }
+
+  /**
+   * Connection whose input stream is read from the Hadoop file system that
+   * owns the URL, resolved with the enclosing handler's configuration.
+   */
+  class HadoopFileSystemURLConnection extends URLConnection {
+    public HadoopFileSystemURLConnection(URL url) {
+      super(url);
+    }
+
+    @Override
+    public void connect() throws IOException {
+      // nothing to do: the file system is resolved when the stream is opened
+    }
+
+    @Override
+    public InputStream getInputStream() throws IOException {
+      final Path target = new Path(this.url.toExternalForm());
+      final FileSystem owner =
+          target.getFileSystem(HadoopFileSystemURLStreamHandler.this.conf);
+      return owner.open(target);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/Help.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/Help.java b/parquet-cli/src/main/java/org/apache/parquet/cli/Help.java
new file mode 100644
index 0000000..791d169
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/Help.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cli;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterDescription;
+import com.beust.jcommander.Parameters;
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import java.util.List;
+
+/**
+ * The {@code help} command: prints either the generic usage text or the
+ * detailed usage of each command named on the command line.
+ */
+@Parameters(commandDescription = "Retrieves details on the functions of other commands")
+public class Help implements Command {
+  @Parameter(description = "<commands>")
+  List<String> helpCommands = Lists.newArrayList();
+
+  private final JCommander jc;
+  private final Logger console;
+  private String programName;
+
+  /**
+   * @param jc the commander whose options and commands are described
+   * @param console the logger that help text is written to
+   */
+  public Help(JCommander jc, Logger console) {
+    this.jc = jc;
+    this.console = console;
+  }
+
+  /**
+   * Sets the executable name shown in usage lines and examples.
+   *
+   * @param programName the name to print in place of the executable
+   */
+  public void setProgramName(String programName) {
+    this.programName = programName;
+  }
+
+  /**
+   * Prints help for every command in {@code helpCommands}, or the generic
+   * help when none was requested.
+   *
+   * @return 0 on success, 1 when an unknown command was requested
+   */
+  @Override
+  public int run() {
+    if (helpCommands.isEmpty()) {
+      printGenericHelp();
+
+    } else {
+      for (String cmd : helpCommands) {
+        JCommander commander = jc.getCommands().get(cmd);
+        if (commander == null) {
+          console.error("\nUnknown command: {}\n", cmd);
+          printGenericHelp();
+          return 1;
+        }
+
+        boolean hasRequired = false;
+        console.info("\nUsage: {} [general options] {} {} [command options]",
+            new Object[] {
+                programName, cmd,
+                commander.getMainParameterDescription()});
+        console.info("\n  Description:");
+        console.info("\n    {}", jc.getCommandDescription(cmd));
+        if (!commander.getParameters().isEmpty()) {
+          console.info("\n  Command options:\n");
+          for (ParameterDescription param : commander.getParameters()) {
+            hasRequired = printOption(console, param) || hasRequired;
+          }
+          if (hasRequired) {
+            console.info("\n  * = required");
+          }
+        }
+        // a command may return null when it has no examples
+        List<String> examples = ((Command) commander.getObjects().get(0)).getExamples();
+        if (examples != null) {
+          console.info("\n  Examples:");
+          for (String example : examples) {
+            if (example.startsWith("#")) {
+              // comment
+              console.info("\n    {}", example);
+            } else {
+              console.info("    {} {} {}",
+                  new Object[] {programName, cmd, example});
+            }
+          }
+        }
+        // add an extra newline in case there are more commands
+        console.info("");
+      }
+    }
+    return 0;
+  }
+
+  /**
+   * Prints the top-level usage: general options, the list of commands and
+   * an example of asking for help on a single command.
+   */
+  public void printGenericHelp() {
+    boolean hasRequired = false;
+    console.info(
+        "\nUsage: {} [options] [command] [command options]",
+        programName);
+    console.info("\n  Options:\n");
+    for (ParameterDescription param : jc.getParameters()) {
+      hasRequired = printOption(console, param) || hasRequired;
+    }
+    if (hasRequired) {
+      console.info("\n  * = required");
+    }
+    console.info("\n  Commands:\n");
+    for (String command : jc.getCommands().keySet()) {
+      console.info("    {}\n\t{}",
+          command, jc.getCommandDescription(command));
+    }
+    console.info("\n  Examples:");
+    // the example must name a command this tool registers; "meta" is one
+    console.info("\n    # print information for meta\n    {} help meta",
+        programName);
+    console.info("\n  See '{} help <command>' for more information on a " +
+        "specific command.", programName);
+  }
+
+  /**
+   * Prints one option line unless the option is hidden.
+   *
+   * @return whether the option is required, even when it is hidden
+   */
+  private boolean printOption(Logger console, ParameterDescription param) {
+    boolean required = param.getParameter().required();
+    if (!param.getParameter().hidden()) {
+      console.info("  {} {}\n\t{}{}", new Object[]{
+          required ? "*" : " ",
+          param.getNames().trim(),
+          param.getDescription(),
+          formatDefault(param)});
+    }
+    return required;
+  }
+
+  /**
+   * Formats the default value of an option for display; empty when there is
+   * no default or the option has an arity below 1.
+   */
+  private String formatDefault(ParameterDescription param) {
+    Object defaultValue = param.getDefault();
+    if (defaultValue == null || param.getParameter().arity() < 1) {
+      return "";
+    }
+    return " (default: " + ((defaultValue instanceof String) ?
+        "\"" + defaultValue + "\"" :
+        defaultValue.toString()) + ")";
+  }
+
+  /**
+   * @return {@code null}; the help command has no examples of its own
+   */
+  @Override
+  public List<String> getExamples() {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java b/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
new file mode 100644
index 0000000..990193c
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cli;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.MissingCommandException;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.Parameters;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import org.apache.parquet.cli.commands.CSVSchemaCommand;
+import org.apache.parquet.cli.commands.CatCommand;
+import org.apache.parquet.cli.commands.CheckParquet251Command;
+import org.apache.parquet.cli.commands.ConvertCSVCommand;
+import org.apache.parquet.cli.commands.ConvertCommand;
+import org.apache.parquet.cli.commands.ParquetMetadataCommand;
+import org.apache.parquet.cli.commands.SchemaCommand;
+import org.apache.parquet.cli.commands.ShowDictionaryCommand;
+import org.apache.parquet.cli.commands.ShowPagesCommand;
+import org.apache.parquet.cli.commands.ToAvroCommand;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Level;
+import org.apache.log4j.PropertyConfigurator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Set;
+
+/**
+ * Entry point of the command-line tool: registers the available commands,
+ * parses the arguments and dispatches to the selected {@link Command}.
+ */
+@Parameters(commandDescription = "Parquet file utils")
+public class Main extends Configured implements Tool {
+
+  @Parameter(names = {"-v", "--verbose", "--debug"},
+      description = "Print extra debugging information")
+  private boolean debug = false;
+
+  @VisibleForTesting
+  @Parameter(names="--dollar-zero",
+      description="A way for the runtime path to be passed in", hidden=true)
+  String programName = DEFAULT_PROGRAM_NAME;
+
+  @VisibleForTesting
+  static final String DEFAULT_PROGRAM_NAME = "parquet";
+
+  // arguments that request help, wherever they appear on the command line
+  private static final Set<String> HELP_ARGS = ImmutableSet.of("-h", "-help", "--help", "help");
+
+  private final Logger console;
+  private final Help help;
+
+  @VisibleForTesting
+  final JCommander jc;
+
+  Main(Logger console) {
+    this.console = console;
+    this.jc = new JCommander(this);
+    this.help = new Help(jc, console);
+    jc.setProgramName(DEFAULT_PROGRAM_NAME);
+    jc.addCommand("help", help, "-h", "-help", "--help");
+    jc.addCommand("meta", new ParquetMetadataCommand(console));
+    jc.addCommand("pages", new ShowPagesCommand(console));
+    jc.addCommand("dictionary", new ShowDictionaryCommand(console));
+    jc.addCommand("check-stats", new CheckParquet251Command(console));
+    jc.addCommand("schema", new SchemaCommand(console));
+    jc.addCommand("csv-schema", new CSVSchemaCommand(console));
+    jc.addCommand("convert-csv", new ConvertCSVCommand(console));
+    jc.addCommand("convert", new ConvertCommand(console));
+    jc.addCommand("to-avro", new ToAvroCommand(console));
+    jc.addCommand("cat", new CatCommand(console, 0));
+    jc.addCommand("head", new CatCommand(console, 10));
+  }
+
+  /**
+   * Parses {@code args} and runs the selected command.
+   *
+   * @param args the command-line arguments
+   * @return the process return code, 0 indicates success
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    try {
+      jc.parse(args);
+    } catch (MissingCommandException e) {
+      console.error(e.getMessage());
+      return 1;
+    } catch (ParameterException e) {
+      help.setProgramName(programName);
+      // null when parsing failed before a command was recognized
+      String cmd = jc.getParsedCommand();
+      if (args.length == 1) { // i.e., just the command (missing required arguments)
+        if (cmd == null) {
+          // not a command at all: report the parser's message instead of
+          // asking Help to describe a null command
+          console.error(e.getMessage());
+        } else {
+          help.helpCommands.add(cmd);
+        }
+        help.run();
+        return 1;
+      } else { // check for variants like 'cmd --help' etc.
+        for (String arg : args) {
+          if (HELP_ARGS.contains(arg)) {
+            if (cmd != null) {
+              help.helpCommands.add(cmd);
+            }
+            help.run();
+            return 0;
+          }
+        }
+      }
+      console.error(e.getMessage());
+      return 1;
+    }
+
+    help.setProgramName(programName);
+
+    // configure log4j
+    if (debug) {
+      org.apache.log4j.Logger log4jConsole = org.apache.log4j.Logger.getLogger(Main.class);
+      log4jConsole.setLevel(Level.DEBUG);
+    }
+
+    String parsed = jc.getParsedCommand();
+    if (parsed == null) {
+      help.run();
+      return 1;
+    } else if ("help".equals(parsed)) {
+      return help.run();
+    }
+
+    Command command = (Command) jc.getCommands().get(parsed).getObjects().get(0);
+    if (command == null) {
+      help.run();
+      return 1;
+    }
+
+    try {
+      if (command instanceof Configurable) {
+        ((Configurable) command).setConf(getConf());
+      }
+      return command.run();
+    } catch (IllegalArgumentException e) {
+      // stack traces are only shown in debug mode for expected error types
+      if (debug) {
+        console.error("Argument error", e);
+      } else {
+        console.error("Argument error: {}", e.getMessage());
+      }
+      return 1;
+    } catch (IllegalStateException e) {
+      if (debug) {
+        console.error("State error", e);
+      } else {
+        console.error("State error: {}", e.getMessage());
+      }
+      return 1;
+    } catch (Exception e) {
+      console.error("Unknown error", e);
+      return 1;
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    // reconfigure logging with the CLI logging configuration
+    PropertyConfigurator.configure(
+        Main.class.getResource("/cli-logging.properties"));
+    Logger console = LoggerFactory.getLogger(Main.class);
+    // use Log4j for any libraries using commons-logging
+    LogFactory.getFactory().setAttribute(
+        "org.apache.commons.logging.Log",
+        "org.apache.commons.logging.impl.Log4JLogger");
+    int rc = ToolRunner.run(new Configuration(), new Main(console), args);
+    System.exit(rc);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java b/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java
new file mode 100644
index 0000000..860a218
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.EncodingStats;
+import org.apache.parquet.column.statistics.BinaryStatistics;
+import org.apache.parquet.column.statistics.BooleanStatistics;
+import org.apache.parquet.column.statistics.DoubleStatistics;
+import org.apache.parquet.column.statistics.FloatStatistics;
+import org.apache.parquet.column.statistics.IntStatistics;
+import org.apache.parquet.column.statistics.LongStatistics;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import java.nio.charset.StandardCharsets;
+import java.util.Locale;
+import java.util.Set;
+
+import static org.apache.parquet.column.Encoding.BIT_PACKED;
+import static org.apache.parquet.column.Encoding.DELTA_BINARY_PACKED;
+import static org.apache.parquet.column.Encoding.DELTA_BYTE_ARRAY;
+import static org.apache.parquet.column.Encoding.PLAIN;
+import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
+import static org.apache.parquet.column.Encoding.RLE;
+import static org.apache.parquet.column.Encoding.RLE_DICTIONARY;
+import static org.apache.parquet.format.Encoding.DELTA_LENGTH_BYTE_ARRAY;
+
+
+public class Util {
+
+  private static final long KB = 1024;
+  private static final long MB = 1024 * KB;
+  private static final long GB = 1024 * MB;
+  private static final long TB = 1024 * GB;
+
+  public static String humanReadable(float bytes) {
+    if (bytes > TB) {
+      return String.format("%.03f TB", bytes / TB);
+    } else if (bytes > GB) {
+      return String.format("%.03f GB", bytes / GB);
+    } else if (bytes > MB) {
+      return String.format("%.03f MB", bytes / MB);
+    } else if (bytes > KB) {
+      return String.format("%.03f kB", bytes / KB);
+    } else {
+      return String.format("%.02f B", bytes);
+    }
+  }
+
+  public static String humanReadable(long bytes) {
+    if (bytes > TB) {
+      return String.format("%.03f TB", ((float) bytes) / TB);
+    } else if (bytes > GB) {
+      return String.format("%.03f GB", ((float) bytes) / GB);
+    } else if (bytes > MB) {
+      return String.format("%.03f MB", ((float) bytes) / MB);
+    } else if (bytes > KB) {
+      return String.format("%.03f kB", ((float) bytes) / KB);
+    } else {
+      return String.format("%d B", bytes);
+    }
+  }
+
+  public static String minMaxAsString(Statistics stats, OriginalType annotation) {
+    if (stats == null) {
+      return "no stats";
+    }
+    if (!stats.hasNonNullValue()) {
+      return "";
+    }
+    // TODO: use original types when showing decimal, timestamp, etc.
+    if (stats instanceof BooleanStatistics) {
+      return String.format("%s / %s",
+          ((BooleanStatistics) stats).getMin(),
+          ((BooleanStatistics) stats).getMax());
+    } else if (stats instanceof IntStatistics) {
+      return String.format("%d / %d",
+          ((IntStatistics) stats).getMin(),
+          ((IntStatistics) stats).getMax());
+    } else if (stats instanceof LongStatistics) {
+      return String.format("%d / %d",
+          ((LongStatistics) stats).getMin(),
+          ((LongStatistics) stats).getMax());
+    } else if (stats instanceof FloatStatistics) {
+      return String.format("%f / %f",
+          ((FloatStatistics) stats).getMin(),
+          ((FloatStatistics) stats).getMax());
+    } else if (stats instanceof DoubleStatistics) {
+      return String.format("%f / %f",
+          ((DoubleStatistics) stats).getMin(),
+          ((DoubleStatistics) stats).getMax());
+    } else if (stats instanceof BinaryStatistics) {
+      byte[] minBytes = stats.getMinBytes();
+      byte[] maxBytes = stats.getMaxBytes();
+      return String.format("%s / %s",
+          printable(minBytes, annotation == OriginalType.UTF8, 30),
+          printable(maxBytes, annotation == OriginalType.UTF8, 30));
+    } else {
+      throw new RuntimeException("Unknown stats type: " + stats);
+    }
+  }
+
+  public static String toString(Statistics stats, long count, OriginalType annotation) {
+    if (stats == null) {
+      return "no stats";
+    }
+    // TODO: use original types when showing decimal, timestamp, etc.
+    if (stats instanceof BooleanStatistics) {
+      return String.format("nulls: %d/%d", stats.getNumNulls(), count);
+    } else if (stats instanceof IntStatistics) {
+      return String.format("min: %d max: %d nulls: %d/%d",
+          ((IntStatistics) stats).getMin(), ((IntStatistics) stats).getMax(),
+          stats.getNumNulls(), count);
+    } else if (stats instanceof LongStatistics) {
+      return String.format("min: %d max: %d nulls: %d/%d",
+          ((LongStatistics) stats).getMin(), ((LongStatistics) stats).getMax(),
+          stats.getNumNulls(), count);
+    } else if (stats instanceof FloatStatistics) {
+      return String.format("min: %f max: %f nulls: %d/%d",
+          ((FloatStatistics) stats).getMin(),
+          ((FloatStatistics) stats).getMax(),
+          stats.getNumNulls(), count);
+    } else if (stats instanceof DoubleStatistics) {
+      return String.format("min: %f max: %f nulls: %d/%d",
+          ((DoubleStatistics) stats).getMin(),
+          ((DoubleStatistics) stats).getMax(),
+          stats.getNumNulls(), count);
+    } else if (stats instanceof BinaryStatistics) {
+      byte[] minBytes = stats.getMinBytes();
+      byte[] maxBytes = stats.getMaxBytes();
+      return String.format("min: %s max: %s nulls: %d/%d",
+          printable(minBytes, annotation == OriginalType.UTF8, 30),
+          printable(maxBytes, annotation == OriginalType.UTF8, 30),
+          stats.getNumNulls(), count);
+    } else {
+      throw new RuntimeException("Unknown stats type: " + stats);
+    }
+  }
+
+  private static String printable(byte[] bytes, boolean isUtf8, int len) {
+    if (bytes == null) {
+      return "null";
+    } else if (isUtf8) {
+      return humanReadable(new String(bytes, StandardCharsets.UTF_8), len);
+    } else {
+      return humanReadable(bytes, len);
+    }
+  }
+
+  public static String humanReadable(String str, int len) {
+    if (str == null) {
+      return "null";
+    }
+
+    StringBuilder sb = new StringBuilder();
+    sb.append("\"");
+    if (str.length() > len - 2) {
+      sb.append(str.substring(0, len - 5)).append("...");
+    } else {
+      sb.append(str);
+    }
+    sb.append("\"");
+
+    return sb.toString();
+  }
+
+  public static String humanReadable(byte[] bytes, int len) {
+    if (bytes == null || bytes.length == 0) {
+      return "null";
+    }
+
+    StringBuilder sb = new StringBuilder();
+    String asString = Hex.encodeHexString(bytes);
+    sb.append("0x");
+    if (asString.length() > len - 2) {
+      sb.append(asString.substring(0, (len - 5) / 2)).append("...");
+    } else {
+      sb.append(asString);
+    }
+
+    return sb.toString();
+  }
+
+  public static String shortCodec(CompressionCodecName codec) {
+    switch (codec) {
+      case UNCOMPRESSED:
+        return "_";
+      case SNAPPY:
+        return "S";
+      case GZIP:
+        return "G";
+      case LZO:
+        return "L";
+      default:
+        return "?";
+    }
+  }
+
+  public static String encodingAsString(Encoding encoding, boolean isDict) {
+    switch (encoding) {
+      case PLAIN:
+        return "_";
+      case PLAIN_DICTIONARY:
+        // data pages use RLE, dictionary pages use plain
+        return isDict ? "_" : "R";
+      case RLE_DICTIONARY:
+        return "R";
+      case DELTA_BINARY_PACKED:
+      case DELTA_LENGTH_BYTE_ARRAY:
+      case DELTA_BYTE_ARRAY:
+        return "D";
+      default:
+        return "?";
+    }
+  }
+
+  public static String encodingStatsAsString(EncodingStats encodingStats) {
+    StringBuilder sb = new StringBuilder();
+    if (encodingStats.hasDictionaryPages()) {
+      for (Encoding encoding: encodingStats.getDictionaryEncodings()) {
+        sb.append(encodingAsString(encoding, true));
+      }
+      sb.append(" ");
+    } else {
+      sb.append("  ");
+    }
+
+    Set<Encoding> encodings = encodingStats.getDataEncodings();
+    if (encodings.contains(RLE_DICTIONARY) || encodings.contains(PLAIN_DICTIONARY)) {
+      sb.append("R");
+    }
+    if (encodings.contains(PLAIN)) {
+      sb.append("_");
+    }
+    if (encodings.contains(DELTA_BYTE_ARRAY) ||
+        encodings.contains(DELTA_BINARY_PACKED) ||
+        encodings.contains(DELTA_LENGTH_BYTE_ARRAY)) {
+      sb.append("D");
+    }
+
+    // Check for fallback and add a flag
+    if (encodingStats.hasDictionaryEncodedPages() && encodingStats.hasNonDictionaryEncodedPages()) {
+      sb.append(" F");
+    }
+
+    return sb.toString();
+  }
+
+  public static String encodingsAsString(Set<Encoding> encodings, ColumnDescriptor desc) {
+    StringBuilder sb = new StringBuilder();
+    if (encodings.contains(RLE) || encodings.contains(BIT_PACKED)) {
+      sb.append(desc.getMaxDefinitionLevel() == 0 ? "B" : "R");
+      sb.append(desc.getMaxRepetitionLevel() == 0 ? "B" : "R");
+      if (encodings.contains(PLAIN_DICTIONARY)) {
+        sb.append("R");
+      }
+      if (encodings.contains(PLAIN)) {
+        sb.append("_");
+      }
+    } else {
+      sb.append("RR");
+      if (encodings.contains(RLE_DICTIONARY)) {
+        sb.append("R");
+      }
+      if (encodings.contains(PLAIN)) {
+        sb.append("_");
+      }
+      if (encodings.contains(DELTA_BYTE_ARRAY) ||
+          encodings.contains(DELTA_BINARY_PACKED) ||
+          encodings.contains(DELTA_LENGTH_BYTE_ARRAY)) {
+        sb.append("D");
+      }
+    }
+    return sb.toString();
+  }
+
+  private static final Splitter DOT = Splitter.on('.');
+
+  public static ColumnDescriptor descriptor(String column, MessageType schema) {
+    String[] path = Iterables.toArray(DOT.split(column), String.class);
+    Preconditions.checkArgument(schema.containsPath(path),
+        "Schema doesn't have column: " + column);
+    return schema.getColumnDescription(path);
+  }
+
+  public static String columnName(ColumnDescriptor desc) {
+    return Joiner.on('.').join(desc.getPath());
+  }
+
+  public static PrimitiveType primitive(MessageType schema, String[] path) {
+    Type current = schema;
+    for (String part : path) {
+      current = current.asGroupType().getType(part);
+      if (current.isPrimitive()) {
+        return current.asPrimitiveType();
+      }
+    }
+    return null;
+  }
+
+  public static PrimitiveType primitive(String column, MessageType schema) {
+    String[] path = Iterables.toArray(DOT.split(column), String.class);
+    Preconditions.checkArgument(schema.containsPath(path),
+        "Schema doesn't have column: " + column);
+    return primitive(schema, path);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CSVSchemaCommand.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CSVSchemaCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CSVSchemaCommand.java
new file mode 100644
index 0000000..4fbfb9b
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CSVSchemaCommand.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.beust.jcommander.internal.Lists;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.parquet.cli.csv.CSVProperties;
+import org.apache.parquet.cli.csv.AvroCSV;
+import org.slf4j.Logger;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Command that infers an Avro schema from a sample CSV file and prints it or
+ * saves it to the path given with {@code --output}.
+ */
+@Parameters(commandDescription="Build a schema from a CSV data sample")
+public class CSVSchemaCommand extends BaseCommand {
+
+  public CSVSchemaCommand(Logger console) {
+    super(console);
+  }
+
+  @Parameter(description="<sample csv path>")
+  List<String> samplePaths;
+
+  @Parameter(names={"-o", "--output"}, description="Save schema avsc to path")
+  String outputPath = null;
+
+  @Parameter(names={"--class", "--record-name"}, required = true,
+      description="A name or class for the result schema")
+  String recordName = null;
+
+  @Parameter(names="--minimize",
+      description="Minimize schema file size by eliminating white space")
+  boolean minimize=false;
+
+  @Parameter(names="--delimiter", description="Delimiter character")
+  String delimiter = ",";
+
+  @Parameter(names="--escape", description="Escape character")
+  String escape = "\\";
+
+  @Parameter(names="--quote", description="Quote character")
+  String quote = "\"";
+
+  @Parameter(names="--no-header", description="Don't use first line as CSV header")
+  boolean noHeader = false;
+
+  @Parameter(names="--skip-lines", description="Lines to skip before CSV start")
+  int linesToSkip = 0;
+
+  @Parameter(names="--charset", description="Character set name", hidden = true)
+  String charsetName = Charset.defaultCharset().displayName();
+
+  @Parameter(names="--header",
+      description="Line to use as a header. Must match the CSV settings.")
+  String header;
+
+  @Parameter(names="--require",
+      description="Do not allow null values for the given field")
+  List<String> requiredFields;
+
+  /**
+   * Infers a schema from the single sample CSV and writes it out.
+   *
+   * @return 0 on success
+   * @throws IOException if the sample cannot be read or the schema cannot
+   *                     be written
+   * @throws IllegalArgumentException if the number of sample paths is not
+   *                                  exactly one
+   */
+  @Override
+  public int run() throws IOException {
+    Preconditions.checkArgument(samplePaths != null && !samplePaths.isEmpty(),
+        "Sample CSV path is required");
+    Preconditions.checkArgument(samplePaths.size() == 1,
+        "Only one CSV sample can be given");
+
+    if (header != null) {
+      // if a header is given on the command line, don't assume one is in the file
+      noHeader = true;
+    }
+
+    CSVProperties props = new CSVProperties.Builder()
+        .delimiter(delimiter)
+        .escape(escape)
+        .quote(quote)
+        .header(header)
+        .hasHeader(!noHeader)
+        .linesToSkip(linesToSkip)
+        .charset(charsetName)
+        .build();
+
+    // fields named with --require are passed on as the required set
+    Set<String> required = ImmutableSet.of();
+    if (requiredFields != null) {
+      required = ImmutableSet.copyOf(requiredFields);
+    }
+
+    // assume fields are nullable by default, users can easily change this
+    String sampleSchema = AvroCSV
+        .inferNullableSchema(
+            recordName, open(samplePaths.get(0)), props, required)
+        .toString(!minimize);
+
+    output(sampleSchema, console, outputPath);
+
+    return 0;
+  }
+
+  @Override
+  public List<String> getExamples() {
+    return Lists.newArrayList(
+        "# Print the schema for samples.csv to standard out:",
+        "samples.csv --record-name Sample",
+        "# Write schema to sample.avsc:",
+        "samples.csv -o sample.avsc --record-name Sample"
+    );
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java
new file mode 100644
index 0000000..7703e88
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+import org.apache.avro.Schema;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.parquet.cli.util.Expressions;
+import org.slf4j.Logger;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.parquet.cli.util.Expressions.select;
+
+/**
+ * Command that prints records from a single data file to the console.
+ * <p>
+ * The file is read with an Avro projection built from the requested columns.
+ * When exactly one column is requested only that column's value is printed
+ * for each record; otherwise the whole (projected) record is printed.
+ */
+@Parameters(commandDescription = "Print the first N records from a file")
+public class CatCommand extends BaseCommand {
+
+  @Parameter(description = "<file>")
+  List<String> sourceFiles;
+
+  @Parameter(names={"-n", "--num-records"},
+      description="The number of records to print")
+  long numRecords;
+
+  @Parameter(
+      names = {"-c", "--column", "--columns"},
+      description = "List of columns")
+  List<String> columns;
+
+  /**
+   * @param console logger that receives the printed records
+   * @param defaultNumRecords record limit used when {@code -n} is not given;
+   *                          a value of zero or less disables the limit
+   */
+  public CatCommand(Logger console, long defaultNumRecords) {
+    super(console);
+    this.numRecords = defaultNumRecords;
+  }
+
+  /**
+   * Prints up to {@code numRecords} records from the single source file.
+   *
+   * @return 0 when the records were printed
+   * @throws IOException if closing the reader fails after a clean read
+   * @throws IllegalArgumentException if no file, or more than one, is given
+   * @throws RuntimeException wrapping any runtime failure, with the index
+   *         of the record being processed in the message
+   */
+  @Override
+  public int run() throws IOException {
+    boolean hasSource = sourceFiles != null && !sourceFiles.isEmpty();
+    Preconditions.checkArgument(hasSource, "Missing file name");
+    Preconditions.checkArgument(sourceFiles.size() == 1,
+        "Only one file can be given");
+
+    final String source = sourceFiles.get(0);
+    final Schema projection =
+        Expressions.filterSchema(getAvroSchema(source), columns);
+
+    // with exactly one requested column, print its value instead of the record
+    final boolean singleColumn = columns != null && columns.size() == 1;
+
+    Iterable<Object> reader = openDataFile(source, projection);
+    boolean threw = true;
+    long count = 0;
+    try {
+      for (Object record : reader) {
+        boolean limitReached = numRecords > 0 && count >= numRecords;
+        if (limitReached) {
+          break;
+        }
+        String line;
+        if (singleColumn) {
+          line = String.valueOf(select(projection, record, columns.get(0)));
+        } else {
+          line = String.valueOf(record);
+        }
+        console.info(line);
+        count++;
+      }
+      threw = false;
+    } catch (RuntimeException e) {
+      throw new RuntimeException("Failed on record " + count, e);
+    } finally {
+      // swallow close failures only when another exception is in flight
+      if (reader instanceof Closeable) {
+        Closeables.close((Closeable) reader, threw);
+      }
+    }
+
+    return 0;
+  }
+
+  /**
+   * Returns usage examples: comment lines start with {@code #} and each is
+   * followed by the arguments it describes.
+   */
+  @Override
+  public List<String> getExamples() {
+    List<String> examples = Lists.newArrayList();
+    examples.add("# Show the first 10 records in file \"data.avro\":");
+    examples.add("data.avro");
+    examples.add("# Show the first 50 records in file \"data.parquet\":");
+    examples.add("data.parquet -n 50");
+    return examples;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java
new file mode 100644
index 0000000..8f60821
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.beust.jcommander.internal.Lists;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.CorruptStatistics;
+import org.apache.parquet.Version;
+import org.apache.parquet.VersionParser;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.util.DynConstructors;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeNameConverter;
+import org.slf4j.Logger;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+
+/**
+ * Command that checks Parquet files for corrupt page and column statistics
+ * (PARQUET-251).
+ * <p>
+ * For files whose {@code created_by} string makes
+ * {@link CorruptStatistics#shouldIgnoreStatistics} return true for binary
+ * columns, every page of every binary column is re-read and each value is
+ * compared against the min/max/null-count recorded in the page header.
+ */
+@Parameters(commandDescription = "Check Parquet files for corrupt page and column stats (PARQUET-251)")
+public class CheckParquet251Command extends BaseCommand {
+
+  public CheckParquet251Command(Logger console) {
+    super(console);
+  }
+
+  @Parameter(description = "<files>", required = true)
+  List<String> files;
+
+  /**
+   * Checks every given file and reports the result for each on the console.
+   *
+   * @return 1 if at least one file has corrupt stats, otherwise 0
+   * @throws IOException if a file cannot be read
+   */
+  @Override
+  public int run() throws IOException {
+    boolean badFiles = false;
+    for (String file : files) {
+      String problem = check(file);
+      if (problem != null) {
+        badFiles = true;
+        console.info("{} has corrupt stats: {}", file, problem);
+      } else {
+        console.info("{} has no corrupt stats", file);
+      }
+    }
+
+    return badFiles ? 1 : 0;
+  }
+
+  /**
+   * Validates the page statistics of the binary columns in one file.
+   *
+   * @param file path of the Parquet file to check
+   * @return a description of the first problem found, or null if the file's
+   *         stats would not be ignored or no problem was detected
+   * @throws IOException if the footer or row groups cannot be read, or the
+   *         reader cannot be closed
+   */
+  private String check(String file) throws IOException {
+    Path path = qualifiedPath(file);
+    ParquetMetadata footer = ParquetFileReader.readFooter(
+        getConf(), path, ParquetMetadataConverter.NO_FILTER);
+
+    FileMetaData meta = footer.getFileMetaData();
+    String createdBy = meta.getCreatedBy();
+    if (CorruptStatistics.shouldIgnoreStatistics(createdBy, BINARY)) {
+      // create fake metadata that will read corrupt stats and return them
+      FileMetaData fakeMeta = new FileMetaData(
+          meta.getSchema(), meta.getKeyValueMetaData(), Version.FULL_VERSION);
+
+      // get just the binary columns
+      List<ColumnDescriptor> columns = Lists.newArrayList();
+      Iterables.addAll(columns, Iterables.filter(
+          meta.getSchema().getColumns(),
+          new Predicate<ColumnDescriptor>() {
+            @Override
+            public boolean apply(@Nullable ColumnDescriptor input) {
+              return input != null && input.getType() == BINARY;
+            }
+          }));
+
+      // now check to see if the data is actually corrupt; the reader holds an
+      // open file, so it is closed on every exit path, including early return
+      try (ParquetFileReader reader = new ParquetFileReader(getConf(),
+          fakeMeta, path, footer.getBlocks(), columns)) {
+        PageStatsValidator validator = new PageStatsValidator();
+        for (PageReadStore pages = reader.readNextRowGroup(); pages != null;
+             pages = reader.readNextRowGroup()) {
+          validator.validate(columns, pages);
+        }
+      } catch (BadStatsException e) {
+        return e.getMessage();
+      }
+    }
+
+    return null;
+  }
+
+  @Override
+  public List<String> getExamples() {
+    return Arrays.asList(
+        "# Check file1.parquet for corrupt page and column stats",
+        "file1.parquet");
+  }
+
+  /**
+   * Thrown when a value read from a page contradicts that page's statistics.
+   */
+  public static class BadStatsException extends RuntimeException {
+    public BadStatsException(String message) {
+      super(message);
+    }
+  }
+
+  /**
+   * A {@link PageReader} that serves exactly one data page plus an optional
+   * dictionary page, so a single page can be decoded on its own.
+   */
+  public class SingletonPageReader implements PageReader {
+    private final DictionaryPage dict;
+    private final DataPage data;
+
+    public SingletonPageReader(DictionaryPage dict, DataPage data) {
+      this.dict = dict;
+      this.data = data;
+    }
+
+    @Override
+    public DictionaryPage readDictionaryPage() {
+      return dict;
+    }
+
+    @Override
+    public long getTotalValueCount() {
+      return data.getValueCount();
+    }
+
+    @Override
+    public DataPage readPage() {
+      return data;
+    }
+  }
+
+  /**
+   * Extracts the statistics stored in the header of a v1 or v2 data page.
+   * The cast to {@code Statistics<T>} is unchecked; callers pick {@code T}.
+   */
+  private static <T extends Comparable<T>>
+  Statistics<T> getStatisticsFromPageHeader(DataPage page) {
+    return page.accept(new DataPage.Visitor<Statistics<T>>() {
+      @Override
+      @SuppressWarnings("unchecked")
+      public Statistics<T> visit(DataPageV1 dataPageV1) {
+        return (Statistics<T>) dataPageV1.getStatistics();
+      }
+
+      @Override
+      @SuppressWarnings("unchecked")
+      public Statistics<T> visit(DataPageV2 dataPageV2) {
+        return (Statistics<T>) dataPageV2.getStatistics();
+      }
+    });
+  }
+
+  /**
+   * Checks individual values against the min and max of one page's stats.
+   * When the stats report no non-null value, every value is accepted.
+   */
+  private class StatsValidator<T extends Comparable<T>> {
+    private final boolean hasNonNull;
+    private final T min;
+    private final T max;
+
+    public StatsValidator(DataPage page) {
+      Statistics<T> stats = getStatisticsFromPageHeader(page);
+      this.hasNonNull = stats.hasNonNullValue();
+      if (hasNonNull) {
+        this.min = stats.genericGetMin();
+        this.max = stats.genericGetMax();
+      } else {
+        this.min = null;
+        this.max = null;
+      }
+    }
+
+    /**
+     * @throws BadStatsException if the value is below min or above max
+     */
+    public void validate(T value) {
+      if (hasNonNull) {
+        if (min.compareTo(value) > 0) {
+          throw new BadStatsException("Min should be <= all values.");
+        }
+        if (max.compareTo(value) < 0) {
+          throw new BadStatsException("Max should be >= all values.");
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds a converter for the given primitive type that validates each
+   * value it receives against the statistics of {@code page}. INT96 and
+   * FIXED_LEN_BYTE_ARRAY share the BINARY converter.
+   */
+  private PrimitiveConverter getValidatingConverter(
+      final DataPage page, PrimitiveTypeName type) {
+    return type.convert(new PrimitiveTypeNameConverter<PrimitiveConverter, RuntimeException>() {
+      @Override
+      public PrimitiveConverter convertFLOAT(PrimitiveTypeName primitiveTypeName) {
+        final StatsValidator<Float> validator = new StatsValidator<Float>(page);
+        return new PrimitiveConverter() {
+          @Override
+          public void addFloat(float value) {
+            validator.validate(value);
+          }
+        };
+      }
+
+      @Override
+      public PrimitiveConverter convertDOUBLE(PrimitiveTypeName primitiveTypeName) {
+        final StatsValidator<Double> validator = new StatsValidator<Double>(page);
+        return new PrimitiveConverter() {
+          @Override
+          public void addDouble(double value) {
+            validator.validate(value);
+          }
+        };
+      }
+
+      @Override
+      public PrimitiveConverter convertINT32(PrimitiveTypeName primitiveTypeName) {
+        final StatsValidator<Integer> validator = new StatsValidator<Integer>(page);
+        return new PrimitiveConverter() {
+          @Override
+          public void addInt(int value) {
+            validator.validate(value);
+          }
+        };
+      }
+
+      @Override
+      public PrimitiveConverter convertINT64(PrimitiveTypeName primitiveTypeName) {
+        final StatsValidator<Long> validator = new StatsValidator<Long>(page);
+        return new PrimitiveConverter() {
+          @Override
+          public void addLong(long value) {
+            validator.validate(value);
+          }
+        };
+      }
+
+      @Override
+      public PrimitiveConverter convertBOOLEAN(PrimitiveTypeName primitiveTypeName) {
+        final StatsValidator<Boolean> validator = new StatsValidator<Boolean>(page);
+        return new PrimitiveConverter() {
+          @Override
+          public void addBoolean(boolean value) {
+            validator.validate(value);
+          }
+        };
+      }
+
+      @Override
+      public PrimitiveConverter convertINT96(PrimitiveTypeName primitiveTypeName) {
+        return convertBINARY(primitiveTypeName);
+      }
+
+      @Override
+      public PrimitiveConverter convertFIXED_LEN_BYTE_ARRAY(PrimitiveTypeName primitiveTypeName) {
+        return convertBINARY(primitiveTypeName);
+      }
+
+      @Override
+      public PrimitiveConverter convertBINARY(PrimitiveTypeName primitiveTypeName) {
+        final StatsValidator<Binary> validator = new StatsValidator<Binary>(page);
+        return new PrimitiveConverter() {
+          @Override
+          public void addBinary(Binary value) {
+            validator.validate(value);
+          }
+        };
+      }
+    });
+  }
+
+  // ColumnReaderImpl is looked up by name through DynConstructors.hiddenImpl
+  private static final DynConstructors.Ctor<ColumnReader> COL_READER_CTOR =
+      new DynConstructors.Builder(ColumnReader.class)
+          .hiddenImpl("org.apache.parquet.column.impl.ColumnReaderImpl",
+              ColumnDescriptor.class, PageReader.class,
+              PrimitiveConverter.class, VersionParser.ParsedVersion.class)
+          .build();
+
+  /**
+   * Validates the statistics of every page of the given columns in a row
+   * group.
+   */
+  public class PageStatsValidator {
+    /**
+     * @param columns columns whose pages should be validated
+     * @param store page store of the row group being checked
+     * @throws BadStatsException if a page's values contradict its stats
+     * @throws ParquetDecodingException if a dictionary page cannot be read
+     */
+    public void validate(List<ColumnDescriptor> columns, PageReadStore store) {
+      for (ColumnDescriptor desc : columns) {
+        PageReader reader = store.getPageReader(desc);
+        DictionaryPage dict = reader.readDictionaryPage();
+        DictionaryPage reusableDict = null;
+        if (dict != null) {
+          try {
+            // copy the dictionary bytes so the page can be reused per data page
+            reusableDict = new DictionaryPage(
+                BytesInput.from(dict.getBytes().toByteArray()),
+                dict.getDictionarySize(), dict.getEncoding());
+          } catch (IOException e) {
+            throw new ParquetDecodingException("Cannot read dictionary", e);
+          }
+        }
+        DataPage page;
+        while ((page = reader.readPage()) != null) {
+          validateStatsForPage(page, reusableDict, desc);
+        }
+      }
+    }
+
+    /**
+     * Decodes one page, validating each non-null value through the
+     * validating converter and counting nulls to compare with the stats.
+     */
+    private void validateStatsForPage(DataPage page, DictionaryPage dict,
+                                      ColumnDescriptor desc) {
+      SingletonPageReader reader = new SingletonPageReader(dict, page);
+      PrimitiveConverter converter = getValidatingConverter(page, desc.getType());
+      Statistics stats = getStatisticsFromPageHeader(page);
+
+      long numNulls = 0;
+
+      ColumnReader column = COL_READER_CTOR.newInstance(desc, reader, converter, null);
+      for (int i = 0; i < reader.getTotalValueCount(); i += 1) {
+        // a definition level below the max is counted as a null here
+        if (column.getCurrentDefinitionLevel() >= desc.getMaxDefinitionLevel()) {
+          column.writeCurrentValueToConverter();
+        } else {
+          numNulls += 1;
+        }
+        column.consume();
+      }
+
+      if (numNulls != stats.getNumNulls()) {
+        throw new BadStatsException("Number of nulls doesn't match.");
+      }
+
+      // only build the message when it will actually be logged
+      if (console.isDebugEnabled()) {
+        console.debug(String.format(
+            "Validated stats min=%s max=%s nulls=%d for page=%s col=%s",
+            String.valueOf(stats.genericGetMin()),
+            String.valueOf(stats.genericGetMax()), stats.getNumNulls(), page,
+            Arrays.toString(desc.getPath())));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCSVCommand.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCSVCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCSVCommand.java
new file mode 100644
index 0000000..624ba91
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCSVCommand.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.parquet.cli.csv.AvroCSVReader;
+import org.apache.parquet.cli.csv.CSVProperties;
+import org.apache.parquet.cli.csv.AvroCSV;
+import org.apache.parquet.cli.util.Schemas;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.cli.util.Codecs;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.slf4j.Logger;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.avro.generic.GenericData.Record;
+import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
+import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0;
+
+@Parameters(commandDescription="Create a file from CSV data")
+public class ConvertCSVCommand extends BaseCommand {
+
+  public ConvertCSVCommand(Logger console) {
+    super(console);
+  }
+
+  @Parameter(description="<csv path>")
+  List<String> targets;
+
+  @Parameter(
+      names={"-o", "--output"},
+      description="Output file path",
+      required=true)
+  String outputPath = null;
+
+  @Parameter(
+      names={"-2", "--format-version-2", "--writer-version-2"},
+      description="Use Parquet format version 2",
+      hidden = true)
+  boolean v2 = false;
+
+  @Parameter(names="--delimiter", description="Delimiter character")
+  String delimiter = ",";
+
+  @Parameter(names="--escape", description="Escape character")
+  String escape = "\\";
+
+  @Parameter(names="--quote", description="Quote character")
+  String quote = "\"";
+
+  @Parameter(names="--no-header", description="Don't use first line as CSV header")
+  boolean noHeader = false;
+
+  @Parameter(names="--skip-lines", description="Lines to skip before CSV start")
+  int linesToSkip = 0;
+
+  @Parameter(names="--charset", description="Character set name", hidden = true)
+  String charsetName = Charset.defaultCharset().displayName();
+
+  @Parameter(names="--header",
+      description="Line to use as a header. Must match the CSV settings.")
+  String header;
+
+  @Parameter(names="--require",
+      description="Do not allow null values for the given field")
+  List<String> requiredFields;
+
+  @Parameter(names = {"-s", "--schema"},
+      description = "The file containing the Avro schema.")
+  String avroSchemaFile;
+
+  @Parameter(names = {"--compression-codec"},
+      description = "A compression codec name.")
+  String compressionCodecName = "GZIP";
+
+  @Parameter(names="--row-group-size", description="Target row group size")
+  int rowGroupSize = ParquetWriter.DEFAULT_BLOCK_SIZE;
+
+  @Parameter(names="--page-size", description="Target page size")
+  int pageSize = ParquetWriter.DEFAULT_PAGE_SIZE;
+
+  @Parameter(names="--dictionary-size", description="Max dictionary page size")
+  int dictionaryPageSize = ParquetWriter.DEFAULT_PAGE_SIZE;
+
+  @Parameter(
+      names={"--overwrite"},
+      description="Remove any data already in the target view or dataset")
+  boolean overwrite = false;
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public int run() throws IOException {
+    Preconditions.checkArgument(targets != null && targets.size() == 1,
+        "CSV path is required.");
+
+    if (header != null) {
+      // if a header is given on the command line, don't assume one is in the file
+      noHeader = true;
+    }
+
+    CSVProperties props = new CSVProperties.Builder()
+        .delimiter(delimiter)
+        .escape(escape)
+        .quote(quote)
+        .header(header)
+        .hasHeader(!noHeader)
+        .linesToSkip(linesToSkip)
+        .charset(charsetName)
+        .build();
+
+    String source = targets.get(0);
+
+    Schema csvSchema;
+    if (avroSchemaFile != null) {
+      csvSchema = Schemas.fromAvsc(open(avroSchemaFile));
+    } else {
+      Set<String> required = ImmutableSet.of();
+      if (requiredFields != null) {
+        required = ImmutableSet.copyOf(requiredFields);
+      }
+
+      String filename = new File(source).getName();
+      String recordName;
+      if (filename.contains(".")) {
+        recordName = filename.substring(0, filename.indexOf("."));
+      } else {
+        recordName = filename;
+      }
+
+      csvSchema = AvroCSV.inferNullableSchema(
+          recordName, open(source), props, required);
+    }
+
+    long count = 0;
+    try (AvroCSVReader<Record> reader = new AvroCSVReader<>(
+        open(source), props, csvSchema, Record.class, true)) {
+        CompressionCodecName codec = Codecs.parquetCodec(compressionCodecName);
+      try (ParquetWriter<Record> writer = AvroParquetWriter
+          .<Record>builder(qualifiedPath(outputPath))
+          .withWriterVersion(v2 ? PARQUET_2_0 : PARQUET_1_0)
+          .withWriteMode(overwrite ?
+              ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE)
+          .withCompressionCodec(codec)
+          .withDictionaryEncoding(true)
+          .withDictionaryPageSize(dictionaryPageSize)
+          .withPageSize(pageSize)
+          .withRowGroupSize(rowGroupSize)
+          .withDataModel(GenericData.get())
+          .withConf(getConf())
+          .withSchema(csvSchema)
+          .build()) {
+        for (Record record : reader) {
+          writer.write(record);
+        }
+      } catch (RuntimeException e) {
+        throw new RuntimeException("Failed on record " + count, e);
+      }
+    }
+
+    return 0;
+  }
+
+  @Override
+  public List<String> getExamples() {
+    return Lists.newArrayList(
+        "# Create a Parquet file from a CSV file",
+        "sample.csv sample.parquet --schema schema.avsc",
+        "# Create a Parquet file in HDFS from local CSV",
+        "path/to/sample.csv hdfs:/user/me/sample.parquet --schema schema.avsc",
+        "# Create an Avro file from CSV data in S3",
+        "s3:/data/path/sample.csv sample.avro --format avro --schema s3:/schemas/schema.avsc"
+    );
+  }
+}


Mime
View raw message