parquet-commits mailing list archives

From b...@apache.org
Subject [45/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.
Date Mon, 27 Apr 2015 23:12:42 GMT
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-benchmarks/src/main/java/parquet/benchmarks/WriteBenchmarks.java
----------------------------------------------------------------------
diff --git a/parquet-benchmarks/src/main/java/parquet/benchmarks/WriteBenchmarks.java b/parquet-benchmarks/src/main/java/parquet/benchmarks/WriteBenchmarks.java
deleted file mode 100644
index ca6e43b..0000000
--- a/parquet-benchmarks/src/main/java/parquet/benchmarks/WriteBenchmarks.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.benchmarks;
-
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.Level;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.State;
-
-import static org.openjdk.jmh.annotations.Scope.Thread;
-import static parquet.benchmarks.BenchmarkConstants.*;
-import static parquet.benchmarks.BenchmarkFiles.*;
-
-import java.io.IOException;
-
-import static parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0;
-import static parquet.hadoop.metadata.CompressionCodecName.GZIP;
-import static parquet.hadoop.metadata.CompressionCodecName.SNAPPY;
-import static parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
-
-@State(Thread)
-public class WriteBenchmarks {
-  private DataGenerator dataGenerator = new DataGenerator();
-
-  @Setup(Level.Iteration)
-  public void cleanup() {
-    //clean existing test data at the beginning of each iteration
-    dataGenerator.cleanup();
-  }
-
-  @Benchmark
-  public void write1MRowsDefaultBlockAndPageSizeUncompressed()
-          throws IOException
-  {
-    dataGenerator.generateData(file_1M,
-                               configuration,
-                               PARQUET_2_0,
-                               BLOCK_SIZE_DEFAULT,
-                               PAGE_SIZE_DEFAULT,
-                               FIXED_LEN_BYTEARRAY_SIZE,
-                               UNCOMPRESSED,
-                               ONE_MILLION);
-  }
-
-  @Benchmark
-  public void write1MRowsBS256MPS4MUncompressed()
-          throws IOException
-  {
-    dataGenerator.generateData(file_1M_BS256M_PS4M,
-                               configuration,
-                               PARQUET_2_0,
-                               BLOCK_SIZE_256M,
-                               PAGE_SIZE_4M,
-                               FIXED_LEN_BYTEARRAY_SIZE,
-                               UNCOMPRESSED,
-                               ONE_MILLION);
-  }
-
-  @Benchmark
-  public void write1MRowsBS256MPS8MUncompressed()
-          throws IOException
-  {
-    dataGenerator.generateData(file_1M_BS256M_PS8M,
-                               configuration,
-                               PARQUET_2_0,
-                               BLOCK_SIZE_256M,
-                               PAGE_SIZE_8M,
-                               FIXED_LEN_BYTEARRAY_SIZE,
-                               UNCOMPRESSED,
-                               ONE_MILLION);
-  }
-
-  @Benchmark
-  public void write1MRowsBS512MPS4MUncompressed()
-          throws IOException
-  {
-    dataGenerator.generateData(file_1M_BS512M_PS4M,
-                               configuration,
-                               PARQUET_2_0,
-                               BLOCK_SIZE_512M,
-                               PAGE_SIZE_4M,
-                               FIXED_LEN_BYTEARRAY_SIZE,
-                               UNCOMPRESSED,
-                               ONE_MILLION);
-  }
-
-  @Benchmark
-  public void write1MRowsBS512MPS8MUncompressed()
-          throws IOException
-  {
-    dataGenerator.generateData(file_1M_BS512M_PS8M,
-                               configuration,
-                               PARQUET_2_0,
-                               BLOCK_SIZE_512M,
-                               PAGE_SIZE_8M,
-                               FIXED_LEN_BYTEARRAY_SIZE,
-                               UNCOMPRESSED,
-                               ONE_MILLION);
-  }
-
-  //TODO how to handle lzo jar?
-//  @Benchmark
-//  public void write1MRowsDefaultBlockAndPageSizeLZO()
-//          throws IOException
-//  {
-//    dataGenerator.generateData(parquetFile_1M_LZO,
-//            configuration,
-//            WriterVersion.PARQUET_2_0,
-//            BLOCK_SIZE_DEFAULT,
-//            PAGE_SIZE_DEFAULT,
-//            FIXED_LEN_BYTEARRAY_SIZE,
-//            LZO,
-//            ONE_MILLION);
-//  }
-
-  @Benchmark
-  public void write1MRowsDefaultBlockAndPageSizeSNAPPY()
-          throws IOException
-  {
-    dataGenerator.generateData(file_1M_SNAPPY,
-                               configuration,
-                               PARQUET_2_0,
-                               BLOCK_SIZE_DEFAULT,
-                               PAGE_SIZE_DEFAULT,
-                               FIXED_LEN_BYTEARRAY_SIZE,
-                               SNAPPY,
-                               ONE_MILLION);
-  }
-
-  @Benchmark
-  public void write1MRowsDefaultBlockAndPageSizeGZIP()
-          throws IOException
-  {
-    dataGenerator.generateData(file_1M_GZIP,
-                               configuration,
-                               PARQUET_2_0,
-                               BLOCK_SIZE_DEFAULT,
-                               PAGE_SIZE_DEFAULT,
-                               FIXED_LEN_BYTEARRAY_SIZE,
-                               GZIP,
-                               ONE_MILLION);
-  }
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-cascading/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-cascading/pom.xml b/parquet-cascading/pom.xml
index a28faf2..99fbc2a 100644
--- a/parquet-cascading/pom.xml
+++ b/parquet-cascading/pom.xml
@@ -18,7 +18,7 @@
   -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <parent>
-    <groupId>com.twitter</groupId>
+    <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
     <version>1.7.0-incubating-SNAPSHOT</version>
@@ -41,17 +41,17 @@
 
   <dependencies>
     <dependency>
-      <groupId>com.twitter</groupId>
+      <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-column</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>com.twitter</groupId>
+      <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-hadoop</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
-        <groupId>com.twitter</groupId>
+        <groupId>org.apache.parquet</groupId>
         <artifactId>parquet-thrift</artifactId>
         <version>${project.version}</version>
     </dependency>
@@ -68,7 +68,7 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
-      <groupId>com.twitter</groupId>
+      <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-column</artifactId>
       <version>${project.version}</version>
       <type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java
new file mode 100644
index 0000000..ea70d43
--- /dev/null
+++ b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java
@@ -0,0 +1,80 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cascading;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.thrift.TBase;
+
+import cascading.flow.FlowProcess;
+import cascading.tap.Tap;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat;
+import org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat;
+import org.apache.parquet.hadoop.thrift.ThriftReadSupport;
+import org.apache.parquet.hadoop.thrift.TBaseWriteSupport;
+import org.apache.parquet.thrift.TBaseRecordConverter;
+
+public class ParquetTBaseScheme<T extends TBase<?,?>> extends ParquetValueScheme<T> {
+
+  // In the case of reads, we can read the thrift class from the file metadata
+  public ParquetTBaseScheme() {
+    this(new Config<T>());
+  }
+
+  public ParquetTBaseScheme(Class<T> thriftClass) {
+    this(new Config<T>().withRecordClass(thriftClass));
+  }
+
+  public ParquetTBaseScheme(FilterPredicate filterPredicate) {
+    this(new Config<T>().withFilterPredicate(filterPredicate));
+  }
+
+  public ParquetTBaseScheme(FilterPredicate filterPredicate, Class<T> thriftClass) {
+    this(new Config<T>().withRecordClass(thriftClass).withFilterPredicate(filterPredicate));
+  }
+
+  public ParquetTBaseScheme(Config<T> config) {
+    super(config);
+  }
+
+  @Override
+  public void sourceConfInit(FlowProcess<JobConf> fp,
+      Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
+    super.sourceConfInit(fp, tap, jobConf);
+    jobConf.setInputFormat(DeprecatedParquetInputFormat.class);
+    ParquetInputFormat.setReadSupportClass(jobConf, ThriftReadSupport.class);
+    ThriftReadSupport.setRecordConverterClass(jobConf, TBaseRecordConverter.class);
+  }
+
+  @Override
+  public void sinkConfInit(FlowProcess<JobConf> fp,
+      Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
+
+    if (this.config.getKlass() == null) {
+      throw new IllegalArgumentException("To use ParquetTBaseScheme as a sink, you must specify a thrift class in the constructor");
+    }
+
+    DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf);
+    DeprecatedParquetOutputFormat.setWriteSupportClass(jobConf, TBaseWriteSupport.class);
+    TBaseWriteSupport.<T>setThriftClass(jobConf, this.config.getKlass());
+  }
+}
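
For reference, a minimal sketch of wiring this scheme into a Cascading 2.x
flow (Person stands in for any thrift-generated class; paths and the pipe
name are placeholders):

    import cascading.flow.hadoop.HadoopFlowConnector;
    import cascading.pipe.Pipe;
    import cascading.tap.hadoop.Hfs;

    // As a source, the thrift class can be read from the file metadata,
    // so the no-arg constructor is sufficient.
    Hfs source = new Hfs(new ParquetTBaseScheme<Person>(), "in/people");

    // As a sink, the thrift class must be given explicitly; otherwise
    // sinkConfInit throws IllegalArgumentException.
    Hfs sink = new Hfs(new ParquetTBaseScheme<Person>(Person.class), "out/people");

    Pipe pipe = new Pipe("copy");
    new HadoopFlowConnector().connect(source, sink, pipe).complete();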

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java
new file mode 100644
index 0000000..41b56d0
--- /dev/null
+++ b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java
@@ -0,0 +1,191 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cascading;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+
+import cascading.flow.FlowProcess;
+import cascading.scheme.Scheme;
+import cascading.scheme.SinkCall;
+import cascading.scheme.SourceCall;
+import cascading.tap.CompositeTap;
+import cascading.tap.Tap;
+import cascading.tap.TapException;
+import cascading.tap.hadoop.Hfs;
+import cascading.tuple.Fields;
+import cascading.tuple.Tuple;
+import cascading.tuple.TupleEntry;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.Footer;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.mapred.Container;
+import org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat;
+import org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat;
+import org.apache.parquet.schema.MessageType;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+  * A Cascading Scheme that converts Parquet groups into Cascading tuples.
+  * If you provide it with sourceFields, it will selectively materialize only the columns for those fields.
+  * The names must match the names in the Parquet schema.
+  * If you do not provide sourceFields, or pass Fields.ALL or Fields.UNKNOWN, the fields are derived from the
+  * Parquet schema.
+  * Currently, only primitive types are supported. TODO: allow nested fields in the Parquet schema to be
+  * flattened to a top-level field in the Cascading tuple.
+  *
+  * @author Avi Bryant
+  */
+
+public class ParquetTupleScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>{
+
+  private static final long serialVersionUID = 0L;
+  private String parquetSchema;
+  private final FilterPredicate filterPredicate;
+
+  public ParquetTupleScheme() {
+    super();
+    this.filterPredicate = null;
+  }
+
+  public ParquetTupleScheme(Fields sourceFields) {
+    super(sourceFields);
+    this.filterPredicate = null;
+  }
+
+  public ParquetTupleScheme(FilterPredicate filterPredicate) {
+    this.filterPredicate = checkNotNull(filterPredicate, "filterPredicate");
+  }
+
+  public ParquetTupleScheme(FilterPredicate filterPredicate, Fields sourceFields) {
+    super(sourceFields);
+    this.filterPredicate = checkNotNull(filterPredicate, "filterPredicate");
+  }
+
+  /**
+   * Constructor used when the scheme must also act as a sink.
+   *
+   * @param sourceFields the fields used for the reading step
+   * @param sinkFields the fields used for the writing step
+   * @param schema mandatory when sinkFields are set; must be the
+   * toString() of a MessageType. It is parsed when the Parquet file
+   * is created.
+   */
+  public ParquetTupleScheme(Fields sourceFields, Fields sinkFields, final String schema) {
+    super(sourceFields, sinkFields);
+    parquetSchema = schema;
+    this.filterPredicate = null;
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public void sourceConfInit(FlowProcess<JobConf> fp,
+      Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
+
+    if (filterPredicate != null) {
+      ParquetInputFormat.setFilterPredicate(jobConf, filterPredicate);
+    }
+
+    jobConf.setInputFormat(DeprecatedParquetInputFormat.class);
+    ParquetInputFormat.setReadSupportClass(jobConf, TupleReadSupport.class);
+    TupleReadSupport.setRequestedFields(jobConf, getSourceFields());
+  }
+
+  @Override
+  public Fields retrieveSourceFields(FlowProcess<JobConf> flowProcess, Tap tap) {
+    MessageType schema = readSchema(flowProcess, tap);
+    SchemaIntersection intersection = new SchemaIntersection(schema, getSourceFields());
+
+    setSourceFields(intersection.getSourceFields());
+
+    return getSourceFields();
+  }
+
+  private MessageType readSchema(FlowProcess<JobConf> flowProcess, Tap tap) {
+    try {
+      Hfs hfs;
+
+      if( tap instanceof CompositeTap )
+        hfs = (Hfs) ( (CompositeTap) tap ).getChildTaps().next();
+      else
+        hfs = (Hfs) tap;
+
+      List<Footer> footers = getFooters(flowProcess, hfs);
+
+      if(footers.isEmpty()) {
+        throw new TapException("Could not read Parquet metadata at " + hfs.getPath());
+      } else {
+        return footers.get(0).getParquetMetadata().getFileMetaData().getSchema();
+      }
+    } catch (IOException e) {
+      throw new TapException(e);
+    }
+  }
+
+  private List<Footer> getFooters(FlowProcess<JobConf> flowProcess, Hfs hfs) throws IOException {
+    JobConf jobConf = flowProcess.getConfigCopy();
+    DeprecatedParquetInputFormat format = new DeprecatedParquetInputFormat();
+    format.addInputPath(jobConf, hfs.getPath());
+    return format.getFooters(jobConf);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public boolean source(FlowProcess<JobConf> fp, SourceCall<Object[], RecordReader> sc)
+      throws IOException {
+    Container<Tuple> value = (Container<Tuple>) sc.getInput().createValue();
+    boolean hasNext = sc.getInput().next(null, value);
+    if (!hasNext) { return false; }
+
+    // Skip nulls
+    if (value == null) { return true; }
+
+    sc.getIncomingEntry().setTuple(value.get());
+    return true;
+  }
+
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public void sinkConfInit(FlowProcess<JobConf> fp,
+          Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
+    DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf);
+    jobConf.set(TupleWriteSupport.PARQUET_CASCADING_SCHEMA, parquetSchema);
+    ParquetOutputFormat.setWriteSupportClass(jobConf, TupleWriteSupport.class);
+  }
+
+  @Override
+  public boolean isSink() {
+    return parquetSchema != null;
+  }
+
+  @Override
+  public void sink(FlowProcess<JobConf> fp, SinkCall<Object[], OutputCollector> sink)
+          throws IOException {
+    TupleEntry tuple = sink.getOutgoingEntry();
+    OutputCollector outputCollector = sink.getOutput();
+    outputCollector.collect(null, tuple);
+  }
+}
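
A minimal sketch of the two modes described above, assuming a file whose
schema contains a binary "name" and an int32 "age" column (paths are
placeholders):

    import cascading.tap.hadoop.Hfs;
    import cascading.tuple.Fields;

    // Source: materialize only the "name" and "age" columns; the names
    // must match the Parquet schema.
    Hfs in = new Hfs(new ParquetTupleScheme(new Fields("name", "age")), "in/data");

    // Sink: the schema string must parse as a Parquet MessageType
    // (it is handed to MessageTypeParser via TupleWriteSupport).
    String schema = "message example { required binary name; required int32 age; }";
    Hfs out = new Hfs(
        new ParquetTupleScheme(new Fields("name", "age"), new Fields("name", "age"), schema),
        "out/data");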

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java
new file mode 100644
index 0000000..33d1bb7
--- /dev/null
+++ b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java
@@ -0,0 +1,162 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cascading;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+
+import cascading.flow.FlowProcess;
+import cascading.scheme.Scheme;
+import cascading.scheme.SinkCall;
+import cascading.scheme.SourceCall;
+import cascading.tap.Tap;
+import cascading.tuple.Tuple;
+import cascading.tuple.TupleEntry;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.mapred.Container;
+import org.apache.parquet.hadoop.thrift.ParquetThriftInputFormat;
+import org.apache.parquet.hadoop.thrift.ThriftReadSupport;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * A Cascading Scheme that returns a simple Tuple with a single value, the "value" object
+ * coming out of the underlying InputFormat.
+ *
+ * This is an abstract class; implementations are expected to set up their Input/Output Formats
+ * correctly in the respective Init methods.
+ */
+public abstract class ParquetValueScheme<T> extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>{
+
+  public static final class Config<T> implements Serializable {
+    private final FilterPredicate filterPredicate;
+    private final String projectionString;
+    private final Class<T> klass;
+    private Config(Class<T> klass, FilterPredicate filterPredicate, String projectionString) {
+      this.filterPredicate = filterPredicate;
+      this.projectionString = projectionString;
+      this.klass = klass;
+    }
+
+    public Config() {
+      filterPredicate = null;
+      projectionString = null;
+      klass = null;
+    }
+
+    public FilterPredicate getFilterPredicate() {
+      return filterPredicate;
+    }
+
+    public String getProjectionString() {
+      return projectionString;
+    }
+
+    public Class<T> getKlass() {
+      return klass;
+    }
+
+    public Config<T> withFilterPredicate(FilterPredicate f) {
+      return new Config<T>(this.klass, checkNotNull(f, "filterPredicate"), this.projectionString);
+    }
+
+    public Config<T> withProjectionString(String p) {
+      return new Config<T>(this.klass, this.filterPredicate, checkNotNull(p, "projectionFilter"));
+    }
+
+    public Config<T> withRecordClass(Class<T> klass) {
+      return new Config<T>(checkNotNull(klass, "recordClass"), this.filterPredicate, this.projectionString);
+    }
+  }
+
+  private static final long serialVersionUID = 157560846420730043L;
+  protected final Config<T> config;
+
+  public ParquetValueScheme() {
+    this(new Config<T>());
+  }
+
+  public ParquetValueScheme(FilterPredicate filterPredicate) {
+    this(new Config<T>().withFilterPredicate(filterPredicate));
+  }
+
+  public ParquetValueScheme(Config<T> config) {
+    this.config = config;
+  }
+
+  private void setProjectionPushdown(JobConf jobConf) {
+    if (this.config.projectionString != null) {
+      ThriftReadSupport.setProjectionPushdown(jobConf, this.config.projectionString);
+    }
+  }
+
+  private void setPredicatePushdown(JobConf jobConf) {
+    if (this.config.filterPredicate != null) {
+      ParquetInputFormat.setFilterPredicate(jobConf, this.config.filterPredicate);
+    }
+  }
+  @Override
+  public void sourceConfInit(FlowProcess<JobConf> jobConfFlowProcess, Tap<JobConf, RecordReader, OutputCollector> jobConfRecordReaderOutputCollectorTap, final JobConf jobConf) {
+    setPredicatePushdown(jobConf);
+    setProjectionPushdown(jobConf);
+    setRecordClass(jobConf);
+  }
+
+  private void setRecordClass(JobConf jobConf) {
+    if (config.klass != null) {
+      ParquetThriftInputFormat.setThriftClass(jobConf, config.klass);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public boolean source(FlowProcess<JobConf> fp, SourceCall<Object[], RecordReader> sc)
+      throws IOException {
+    Container<T> value = (Container<T>) sc.getInput().createValue();
+    boolean hasNext = sc.getInput().next(null, value);
+    if (!hasNext) { return false; }
+
+    // Skip nulls
+    if (value == null) { return true; }
+
+    sc.getIncomingEntry().setTuple(new Tuple(value.get()));
+    return true;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void sink(FlowProcess<JobConf> fp, SinkCall<Object[], OutputCollector> sc)
+      throws IOException {
+    TupleEntry tuple = sc.getOutgoingEntry();
+
+    if (tuple.size() != 1) {
+      throw new RuntimeException("ParquetValueScheme expects tuples with an arity of exactly 1, but found " + tuple.getFields());
+    }
+
+    T value = (T) tuple.getObject(0);
+    OutputCollector output = sc.getOutput();
+    output.collect(null, value);
+  }
+
+}
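
Config is immutable: each with* method returns a new instance, so a fully
specified configuration can be built fluently. A sketch, assuming a
thrift-generated Person class and a predicate built with parquet's
FilterApi:

    import org.apache.parquet.filter2.predicate.FilterApi;
    import org.apache.parquet.filter2.predicate.FilterPredicate;

    FilterPredicate adults = FilterApi.gt(FilterApi.intColumn("age"), 18);
    ParquetValueScheme.Config<Person> config = new ParquetValueScheme.Config<Person>()
        .withRecordClass(Person.class)     // checked non-null
        .withFilterPredicate(adults);      // pushed down via ParquetInputFormat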

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-cascading/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java
new file mode 100644
index 0000000..e3fc3f7
--- /dev/null
+++ b/parquet-cascading/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java
@@ -0,0 +1,63 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cascading;
+
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+
+import cascading.tuple.Fields;
+
+import java.util.List;
+import java.util.ArrayList;
+
+public class SchemaIntersection {
+
+  private final MessageType requestedSchema;
+  private final Fields sourceFields;
+
+  public SchemaIntersection(MessageType fileSchema, Fields requestedFields) {
+    if(requestedFields == Fields.UNKNOWN)
+      requestedFields = Fields.ALL;
+
+    Fields newFields = Fields.NONE;
+    List<Type> newSchemaFields = new ArrayList<Type>();
+    int schemaSize = fileSchema.getFieldCount();
+
+    for (int i = 0; i < schemaSize; i++) {
+      Type type = fileSchema.getType(i);
+      Fields name = new Fields(type.getName());
+
+      if(requestedFields.contains(name)) {
+        newFields = newFields.append(name);
+        newSchemaFields.add(type);
+      }
+    }
+
+    this.sourceFields = newFields;
+    this.requestedSchema = new MessageType(fileSchema.getName(), newSchemaFields);
+  }
+
+  public MessageType getRequestedSchema() {
+    return requestedSchema;
+  }
+
+  public Fields getSourceFields() {
+    return sourceFields;
+  }
+}
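
A small sketch of the intersection logic, assuming a schema string in
Parquet's message syntax:

    import cascading.tuple.Fields;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    MessageType fileSchema = MessageTypeParser.parseMessageType(
        "message m { required binary name; required int32 age; required double score; }");
    SchemaIntersection si = new SchemaIntersection(fileSchema, new Fields("name", "age"));

    // getRequestedSchema() keeps only name and age; getSourceFields()
    // lists the surviving fields in file-schema order.
    MessageType projected = si.getRequestedSchema();
    Fields source = si.getSourceFields();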

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java
new file mode 100644
index 0000000..42a5926
--- /dev/null
+++ b/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java
@@ -0,0 +1,80 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cascading;
+
+import java.util.Map;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.commons.lang.StringUtils;
+
+import cascading.tuple.Tuple;
+import cascading.tuple.Fields;
+import cascading.flow.hadoop.util.HadoopUtil;
+
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.cascading.convert.TupleRecordMaterializer;
+
+
+public class TupleReadSupport extends ReadSupport<Tuple> {
+  static final String PARQUET_CASCADING_REQUESTED_FIELDS = "parquet.cascading.requested.fields";
+
+  static protected Fields getRequestedFields(Configuration configuration) {
+    String fieldsString = configuration.get(PARQUET_CASCADING_REQUESTED_FIELDS);
+
+    if(fieldsString == null)
+      return Fields.ALL;
+
+    String[] parts = StringUtils.split(fieldsString, ":");
+    if(parts.length == 0)
+      return Fields.ALL;
+    else
+      return new Fields(parts);
+  }
+
+  static protected void setRequestedFields(JobConf configuration, Fields fields) {
+    String fieldsString = StringUtils.join(fields.iterator(), ":");
+    configuration.set(PARQUET_CASCADING_REQUESTED_FIELDS, fieldsString);
+  }
+
+  @Override
+  public ReadContext init(Configuration configuration, Map<String, String> keyValueMetaData, MessageType fileSchema) {
+    Fields requestedFields = getRequestedFields(configuration);
+    if (requestedFields == null) {
+      return new ReadContext(fileSchema);
+    } else {
+      SchemaIntersection intersection = new SchemaIntersection(fileSchema, requestedFields);
+      return new ReadContext(intersection.getRequestedSchema());
+    }
+  }
+
+  @Override
+  public RecordMaterializer<Tuple> prepareForRead(
+      Configuration configuration,
+      Map<String, String> keyValueMetaData,
+      MessageType fileSchema,
+      ReadContext readContext) {
+    MessageType requestedSchema = readContext.getRequestedSchema();
+    return new TupleRecordMaterializer(requestedSchema);
+  }
+
+}
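
The requested fields travel through the job configuration as a
colon-separated string. A sketch of the round trip (callable only from
within the org.apache.parquet.cascading package, since the helpers are
protected):

    import cascading.tuple.Fields;
    import org.apache.hadoop.mapred.JobConf;

    JobConf conf = new JobConf();
    TupleReadSupport.setRequestedFields(conf, new Fields("name", "age"));
    // parquet.cascading.requested.fields is now "name:age";
    // getRequestedFields parses it back, or returns Fields.ALL when
    // the key is absent.
    Fields fields = TupleReadSupport.getRequestedFields(conf);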

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java
new file mode 100644
index 0000000..2489b2e
--- /dev/null
+++ b/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java
@@ -0,0 +1,106 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cascading;
+
+import cascading.tuple.TupleEntry;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+/**
+ * Writes Cascading TupleEntry records to Parquet, one primitive column
+ * per field of the configured schema.
+ * @author Mickaël Lacour <m.lacour@criteo.com>
+ */
+public class TupleWriteSupport extends WriteSupport<TupleEntry> {
+
+  private RecordConsumer recordConsumer;
+  private MessageType rootSchema;
+  public static final String PARQUET_CASCADING_SCHEMA = "parquet.cascading.schema";
+
+  @Override
+  public WriteContext init(Configuration configuration) {
+    String schema = configuration.get(PARQUET_CASCADING_SCHEMA);
+    rootSchema = MessageTypeParser.parseMessageType(schema);
+    return new WriteContext(rootSchema, new HashMap<String, String>());
+  }
+
+  @Override
+  public void prepareForWrite(RecordConsumer recordConsumer) {
+    this.recordConsumer = recordConsumer;
+  }
+
+  @Override
+  public void write(TupleEntry record) {
+    recordConsumer.startMessage();
+    final List<Type> fields = rootSchema.getFields();
+
+    for (int i = 0; i < fields.size(); i++) {
+      Type field = fields.get(i);
+
+      if (record == null || record.getObject(field.getName()) == null) {
+        continue;
+      }
+      recordConsumer.startField(field.getName(), i);
+      if (field.isPrimitive()) {
+        writePrimitive(record, field.asPrimitiveType());
+      } else {
+        throw new UnsupportedOperationException("Complex type not implemented");
+      }
+      recordConsumer.endField(field.getName(), i);
+    }
+    recordConsumer.endMessage();
+  }
+
+  private void writePrimitive(TupleEntry record, PrimitiveType field) {
+    switch (field.getPrimitiveTypeName()) {
+      case BINARY:
+        recordConsumer.addBinary(Binary.fromString(record.getString(field.getName())));
+        break;
+      case BOOLEAN:
+        recordConsumer.addBoolean(record.getBoolean(field.getName()));
+        break;
+      case INT32:
+        recordConsumer.addInteger(record.getInteger(field.getName()));
+        break;
+      case INT64:
+        recordConsumer.addLong(record.getLong(field.getName()));
+        break;
+      case DOUBLE:
+        recordConsumer.addDouble(record.getDouble(field.getName()));
+        break;
+      case FLOAT:
+        recordConsumer.addFloat(record.getFloat(field.getName()));
+        break;
+      case FIXED_LEN_BYTE_ARRAY:
+        throw new UnsupportedOperationException("Fixed len byte array type not implemented");
+      case INT96:
+        throw new UnsupportedOperationException("Int96 type not implemented");
+      default:
+        throw new UnsupportedOperationException(field.getName() + " type not implemented");
+    }
+  }
+}
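
A sketch of the configuration this WriteSupport expects: the schema string
is parsed with MessageTypeParser in init(), and write() then emits one
field per schema entry, silently skipping fields that are null in the
incoming TupleEntry.

    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    conf.set(TupleWriteSupport.PARQUET_CASCADING_SCHEMA,
        "message example { required binary name; optional int32 age; }");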

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java
new file mode 100644
index 0000000..3741165
--- /dev/null
+++ b/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java
@@ -0,0 +1,115 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cascading.convert;
+
+import cascading.tuple.Tuple;
+
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.pig.TupleConversionException;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Type.Repetition;
+
+public class TupleConverter extends GroupConverter {
+
+  protected Tuple currentTuple;
+  private final Converter[] converters;
+
+  public TupleConverter(GroupType parquetSchema) {
+    int schemaSize = parquetSchema.getFieldCount();
+
+    this.converters = new Converter[schemaSize];
+    for (int i = 0; i < schemaSize; i++) {
+      Type type = parquetSchema.getType(i);
+      converters[i] = newConverter(type, i);
+    }
+  }
+
+  private Converter newConverter(Type type, int i) {
+    if(!type.isPrimitive()) {
+      throw new IllegalArgumentException("cascading can only build tuples from primitive types");
+    } else {
+      return new TuplePrimitiveConverter(this, i);
+    }
+  }
+
+  @Override
+  public Converter getConverter(int fieldIndex) {
+    return converters[fieldIndex];
+  }
+
+  @Override
+  final public void start() {
+    currentTuple = Tuple.size(converters.length);
+  }
+
+  @Override
+  public void end() {
+  }
+
+  final public Tuple getCurrentTuple() {
+    return currentTuple;
+  }
+
+  static final class TuplePrimitiveConverter extends PrimitiveConverter {
+    private final TupleConverter parent;
+    private final int index;
+
+    public TuplePrimitiveConverter(TupleConverter parent, int index) {
+      this.parent = parent;
+      this.index = index;
+    }
+
+    @Override
+    public void addBinary(Binary value) {
+      parent.getCurrentTuple().setString(index, value.toStringUsingUTF8());
+    }
+
+    @Override
+    public void addBoolean(boolean value) {
+      parent.getCurrentTuple().setBoolean(index, value);
+    }
+
+    @Override
+    public void addDouble(double value) {
+      parent.getCurrentTuple().setDouble(index, value);
+    }
+
+    @Override
+    public void addFloat(float value) {
+      parent.getCurrentTuple().setFloat(index, value);
+    }
+
+    @Override
+    public void addInt(int value) {
+      parent.getCurrentTuple().setInteger(index, value);
+    }
+
+    @Override
+    public void addLong(long value) {
+      parent.getCurrentTuple().setLong(index, value);
+    }
+  }
+}
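
Each primitive column is bound to its positional index, so decoded values
land directly in the matching tuple slot. A minimal sketch, driving the
converter by hand the way record assembly would:

    import cascading.tuple.Tuple;
    import org.apache.parquet.io.api.Binary;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    MessageType schema = MessageTypeParser.parseMessageType(
        "message m { required binary a; required int64 b; }");
    TupleConverter root = new TupleConverter(schema);

    root.start();                          // currentTuple = Tuple.size(2)
    root.getConverter(0).asPrimitiveConverter().addBinary(Binary.fromString("x"));
    root.getConverter(1).asPrimitiveConverter().addLong(7L);
    root.end();

    Tuple t = root.getCurrentTuple();      // ("x", 7)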

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java
new file mode 100644
index 0000000..275e17b
--- /dev/null
+++ b/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java
@@ -0,0 +1,46 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cascading.convert;
+
+import cascading.tuple.Tuple;
+import cascading.tuple.Fields;
+
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.GroupType;
+
+public class TupleRecordMaterializer extends RecordMaterializer<Tuple> {
+
+  private TupleConverter root;
+
+  public TupleRecordMaterializer(GroupType parquetSchema) {
+    this.root = new TupleConverter(parquetSchema);
+  }
+
+  @Override
+  public Tuple getCurrentRecord() {
+    return root.getCurrentTuple();
+  }
+
+  @Override
+  public GroupConverter getRootConverter() {
+    return root;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java b/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java
deleted file mode 100644
index ab84749..0000000
--- a/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.cascading;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.thrift.TBase;
-
-import cascading.flow.FlowProcess;
-import cascading.tap.Tap;
-import parquet.filter2.predicate.FilterPredicate;
-import parquet.hadoop.ParquetInputFormat;
-import parquet.hadoop.mapred.DeprecatedParquetInputFormat;
-import parquet.hadoop.mapred.DeprecatedParquetOutputFormat;
-import parquet.hadoop.thrift.ThriftReadSupport;
-import parquet.hadoop.thrift.TBaseWriteSupport;
-import parquet.thrift.TBaseRecordConverter;
-
-public class ParquetTBaseScheme<T extends TBase<?,?>> extends ParquetValueScheme<T> {
-
-  // In the case of reads, we can read the thrift class from the file metadata
-  public ParquetTBaseScheme() {
-    this(new Config<T>());
-  }
-
-  public ParquetTBaseScheme(Class<T> thriftClass) {
-    this(new Config<T>().withRecordClass(thriftClass));
-  }
-
-  public ParquetTBaseScheme(FilterPredicate filterPredicate) {
-    this(new Config<T>().withFilterPredicate(filterPredicate));
-  }
-
-  public ParquetTBaseScheme(FilterPredicate filterPredicate, Class<T> thriftClass) {
-    this(new Config<T>().withRecordClass(thriftClass).withFilterPredicate(filterPredicate));
-  }
-
-  public ParquetTBaseScheme(Config<T> config) {
-    super(config);
-  }
-
-  @Override
-  public void sourceConfInit(FlowProcess<JobConf> fp,
-      Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
-    super.sourceConfInit(fp, tap, jobConf);
-    jobConf.setInputFormat(DeprecatedParquetInputFormat.class);
-    ParquetInputFormat.setReadSupportClass(jobConf, ThriftReadSupport.class);
-    ThriftReadSupport.setRecordConverterClass(jobConf, TBaseRecordConverter.class);
-  }
-
-  @Override
-  public void sinkConfInit(FlowProcess<JobConf> fp,
-      Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
-
-    if (this.config.getKlass() == null) {
-      throw new IllegalArgumentException("To use ParquetTBaseScheme as a sink, you must specify a thrift class in the constructor");
-    }
-
-    DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf);
-    DeprecatedParquetOutputFormat.setWriteSupportClass(jobConf, TBaseWriteSupport.class);
-    TBaseWriteSupport.<T>setThriftClass(jobConf, this.config.getKlass());
-  }
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-cascading/src/main/java/parquet/cascading/ParquetTupleScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/parquet/cascading/ParquetTupleScheme.java b/parquet-cascading/src/main/java/parquet/cascading/ParquetTupleScheme.java
deleted file mode 100644
index 7f6ac3a..0000000
--- a/parquet-cascading/src/main/java/parquet/cascading/ParquetTupleScheme.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
- package parquet.cascading;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-
-import cascading.flow.FlowProcess;
-import cascading.scheme.Scheme;
-import cascading.scheme.SinkCall;
-import cascading.scheme.SourceCall;
-import cascading.tap.CompositeTap;
-import cascading.tap.Tap;
-import cascading.tap.TapException;
-import cascading.tap.hadoop.Hfs;
-import cascading.tuple.Fields;
-import cascading.tuple.Tuple;
-import cascading.tuple.TupleEntry;
-import parquet.filter2.predicate.FilterPredicate;
-import parquet.hadoop.Footer;
-import parquet.hadoop.ParquetInputFormat;
-import parquet.hadoop.ParquetOutputFormat;
-import parquet.hadoop.mapred.Container;
-import parquet.hadoop.mapred.DeprecatedParquetInputFormat;
-import parquet.hadoop.mapred.DeprecatedParquetOutputFormat;
-import parquet.schema.MessageType;
-
-import static parquet.Preconditions.checkNotNull;
-
-/**
-  * A Cascading Scheme that converts Parquet groups into Cascading tuples.
-  * If you provide it with sourceFields, it will selectively materialize only the columns for those fields.
-  * The names must match the names in the Parquet schema.
-  * If you do not provide sourceFields, or use Fields.ALL or Fields.UNKNOWN, it will create one from the
-  * Parquet schema.
-  * Currently, only primitive types are supported. TODO: allow nested fields in the Parquet schema to be
-  * flattened to a top-level field in the Cascading tuple.
-  *
-  * @author Avi Bryant
-  */
-
-public class ParquetTupleScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>{
-
-  private static final long serialVersionUID = 0L;
-  private String parquetSchema;
-  private final FilterPredicate filterPredicate;
-
-  public ParquetTupleScheme() {
-    super();
-    this.filterPredicate = null;
-  }
-
-  public ParquetTupleScheme(Fields sourceFields) {
-    super(sourceFields);
-    this.filterPredicate = null;
-  }
-
-  public ParquetTupleScheme(FilterPredicate filterPredicate) {
-    this.filterPredicate = checkNotNull(filterPredicate, "filterPredicate");
-  }
-
-  public ParquetTupleScheme(FilterPredicate filterPredicate, Fields sourceFields) {
-    super(sourceFields);
-    this.filterPredicate = checkNotNull(filterPredicate, "filterPredicate");
-  }
-
-  /**
-   * ParquetTupleScheme constructor used a sink need to be implemented
-   *
-   * @param sourceFields used for the reading step
-   * @param sinkFields used for the writing step
-   * @param schema is mandatory if you add sinkFields and needs to be the
-   * toString() from a MessageType. This value is going to be parsed when the
-   * parquet file will be created.
-   */
-  public ParquetTupleScheme(Fields sourceFields, Fields sinkFields, final String schema) {
-    super(sourceFields, sinkFields);
-    parquetSchema = schema;
-    this.filterPredicate = null;
-  }
-
-  @SuppressWarnings("rawtypes")
-  @Override
-  public void sourceConfInit(FlowProcess<JobConf> fp,
-      Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
-
-    if (filterPredicate != null) {
-      ParquetInputFormat.setFilterPredicate(jobConf, filterPredicate);
-    }
-
-    jobConf.setInputFormat(DeprecatedParquetInputFormat.class);
-    ParquetInputFormat.setReadSupportClass(jobConf, TupleReadSupport.class);
-    TupleReadSupport.setRequestedFields(jobConf, getSourceFields());
- }
-
- @Override
- public Fields retrieveSourceFields(FlowProcess<JobConf> flowProcess, Tap tap) {
-    MessageType schema = readSchema(flowProcess, tap);
-    SchemaIntersection intersection = new SchemaIntersection(schema, getSourceFields());
-
-    setSourceFields(intersection.getSourceFields());
-
-    return getSourceFields();
-  }
-
-  private MessageType readSchema(FlowProcess<JobConf> flowProcess, Tap tap) {
-    try {
-      Hfs hfs;
-
-      if( tap instanceof CompositeTap )
-        hfs = (Hfs) ( (CompositeTap) tap ).getChildTaps().next();
-      else
-        hfs = (Hfs) tap;
-
-      List<Footer> footers = getFooters(flowProcess, hfs);
-
-      if(footers.isEmpty()) {
-        throw new TapException("Could not read Parquet metadata at " + hfs.getPath());
-      } else {
-        return footers.get(0).getParquetMetadata().getFileMetaData().getSchema();
-      }
-    } catch (IOException e) {
-      throw new TapException(e);
-    }
-  }
-
-   private List<Footer> getFooters(FlowProcess<JobConf> flowProcess, Hfs hfs) throws IOException {
-     JobConf jobConf = flowProcess.getConfigCopy();
-     DeprecatedParquetInputFormat format = new DeprecatedParquetInputFormat();
-     format.addInputPath(jobConf, hfs.getPath());
-     return format.getFooters(jobConf);
-   }
-
-   @SuppressWarnings("unchecked")
-  @Override
-  public boolean source(FlowProcess<JobConf> fp, SourceCall<Object[], RecordReader> sc)
-      throws IOException {
-    Container<Tuple> value = (Container<Tuple>) sc.getInput().createValue();
-    boolean hasNext = sc.getInput().next(null, value);
-    if (!hasNext) { return false; }
-
-    // Skip nulls
-    if (value == null) { return true; }
-
-    sc.getIncomingEntry().setTuple(value.get());
-    return true;
-  }
-
-
-  @SuppressWarnings("rawtypes")
-  @Override
-  public void sinkConfInit(FlowProcess<JobConf> fp,
-          Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
-    DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf);
-    jobConf.set(TupleWriteSupport.PARQUET_CASCADING_SCHEMA, parquetSchema);
-    ParquetOutputFormat.setWriteSupportClass(jobConf, TupleWriteSupport.class);
-  }
-
-  @Override
-  public boolean isSink() {
-    return parquetSchema != null;
-  }
-
-  @Override
-  public void sink(FlowProcess<JobConf> fp, SinkCall<Object[], OutputCollector> sink)
-          throws IOException {
-    TupleEntry tuple = sink.getOutgoingEntry();
-    OutputCollector outputCollector = sink.getOutput();
-    outputCollector.collect(null, tuple);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-cascading/src/main/java/parquet/cascading/ParquetValueScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/parquet/cascading/ParquetValueScheme.java b/parquet-cascading/src/main/java/parquet/cascading/ParquetValueScheme.java
deleted file mode 100644
index 0f75479..0000000
--- a/parquet-cascading/src/main/java/parquet/cascading/ParquetValueScheme.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.cascading;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-
-import cascading.flow.FlowProcess;
-import cascading.scheme.Scheme;
-import cascading.scheme.SinkCall;
-import cascading.scheme.SourceCall;
-import cascading.tap.Tap;
-import cascading.tuple.Tuple;
-import cascading.tuple.TupleEntry;
-import parquet.filter2.predicate.FilterPredicate;
-import parquet.hadoop.ParquetInputFormat;
-import parquet.hadoop.mapred.Container;
-import parquet.hadoop.thrift.ParquetThriftInputFormat;
-import parquet.hadoop.thrift.ThriftReadSupport;
-
-import static parquet.Preconditions.checkNotNull;
-
-/**
- * A Cascading Scheme that returns a simple Tuple with a single value, the "value" object
- * coming out of the underlying InputFormat.
- *
- * This is an abstract class; implementations are expected to set up their Input/Output Formats
- * correctly in the respective Init methods.
- */
-public abstract class ParquetValueScheme<T> extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>{
-
-  public static final class Config<T> implements Serializable {
-    private final FilterPredicate filterPredicate;
-    private final String projectionString;
-    private final Class<T> klass;
-    private Config(Class<T> klass, FilterPredicate filterPredicate, String projectionString) {
-      this.filterPredicate = filterPredicate;
-      this.projectionString = projectionString;
-      this.klass = klass;
-    }
-
-    public Config() {
-      filterPredicate = null;
-      projectionString = null;
-      klass = null;
-    }
-
-    public FilterPredicate getFilterPredicate() {
-      return filterPredicate;
-    }
-
-    public String getProjectionString() {
-      return projectionString;
-    }
-
-    public Class<T> getKlass() {
-      return klass;
-    }
-
-    public Config<T> withFilterPredicate(FilterPredicate f) {
-      return new Config<T>(this.klass, checkNotNull(f, "filterPredicate"), this.projectionString);
-    }
-
-    public Config<T> withProjectionString(String p) {
-      return new Config<T>(this.klass, this.filterPredicate, checkNotNull(p, "projectionString"));
-    }
-
-    public Config<T> withRecordClass(Class<T> klass) {
-      return new Config<T>(checkNotNull(klass, "recordClass"), this.filterPredicate, this.projectionString);
-    }
-  }
-
-  private static final long serialVersionUID = 157560846420730043L;
-  protected final Config<T> config;
-
-  public ParquetValueScheme() {
-    this(new Config<T>());
-  }
-
-  public ParquetValueScheme(FilterPredicate filterPredicate) {
-    this(new Config<T>().withFilterPredicate(filterPredicate));
-  }
-
-  public ParquetValueScheme(Config<T> config) {
-    this.config = config;
-  }
-
-  private void setProjectionPushdown(JobConf jobConf) {
-    if (this.config.projectionString != null) {
-      ThriftReadSupport.setProjectionPushdown(jobConf, this.config.projectionString);
-    }
-  }
-
-  private void setPredicatePushdown(JobConf jobConf) {
-    if (this.config.filterPredicate != null) {
-      ParquetInputFormat.setFilterPredicate(jobConf, this.config.filterPredicate);
-    }
-  }
-
-  @Override
-  public void sourceConfInit(FlowProcess<JobConf> jobConfFlowProcess, Tap<JobConf, RecordReader, OutputCollector> jobConfRecordReaderOutputCollectorTap, final JobConf jobConf) {
-    setPredicatePushdown(jobConf);
-    setProjectionPushdown(jobConf);
-    setRecordClass(jobConf);
-  }
-
-  private void setRecordClass(JobConf jobConf) {
-    if (config.klass != null) {
-      ParquetThriftInputFormat.setThriftClass(jobConf, config.klass);
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public boolean source(FlowProcess<JobConf> fp, SourceCall<Object[], RecordReader> sc)
-      throws IOException {
-    Container<T> value = (Container<T>) sc.getInput().createValue();
-    boolean hasNext = sc.getInput().next(null, value);
-    if (!hasNext) { return false; }
-
-    // Skip records the reader filtered out: the reused container holds a null payload.
-    if (value == null || value.get() == null) { return true; }
-
-    sc.getIncomingEntry().setTuple(new Tuple(value.get()));
-    return true;
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void sink(FlowProcess<JobConf> fp, SinkCall<Object[], OutputCollector> sc)
-      throws IOException {
-    TupleEntry tuple = sc.getOutgoingEntry();
-
-    if (tuple.size() != 1) {
-      throw new RuntimeException("ParquetValueScheme expects tuples with an arity of exactly 1, but found " + tuple.getFields());
-    }
-
-    T value = (T) tuple.getObject(0);
-    OutputCollector output = sc.getOutput();
-    output.collect(null, value);
-  }
-
-}
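
Config<T> above is an immutable builder: each with* method null-checks its argument and returns a fresh Config, so partially built configurations can be shared without aliasing surprises. A minimal sketch of assembling one, assuming the Thrift-generated Name class used elsewhere in this commit, FilterApi's static factories, and the semicolon-separated glob syntax understood by ThriftReadSupport:

    import static parquet.filter2.predicate.FilterApi.binaryColumn;
    import static parquet.filter2.predicate.FilterApi.eq;

    ParquetValueScheme.Config<Name> config = new ParquetValueScheme.Config<Name>()
        .withRecordClass(Name.class)
        .withFilterPredicate(eq(binaryColumn("last_name"), Binary.fromString("Hope")))
        .withProjectionString("first_name;last_name");

A concrete subclass takes this Config in its constructor, and sourceConfInit then pushes the predicate, projection, and record class into the JobConf.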

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-cascading/src/main/java/parquet/cascading/SchemaIntersection.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/parquet/cascading/SchemaIntersection.java b/parquet-cascading/src/main/java/parquet/cascading/SchemaIntersection.java
deleted file mode 100644
index a6623fb..0000000
--- a/parquet-cascading/src/main/java/parquet/cascading/SchemaIntersection.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.cascading;
-
-import parquet.schema.MessageType;
-import parquet.schema.Type;
-
-import cascading.tuple.Fields;
-
-import java.util.List;
-import java.util.ArrayList;
-
-public class SchemaIntersection {
-
-  private final MessageType requestedSchema;
-  private final Fields sourceFields;
-
-  public SchemaIntersection(MessageType fileSchema, Fields requestedFields) {
-    if(requestedFields == Fields.UNKNOWN)
-      requestedFields = Fields.ALL;
-
-    Fields newFields = Fields.NONE;
-    List<Type> newSchemaFields = new ArrayList<Type>();
-    int schemaSize = fileSchema.getFieldCount();
-
-    for (int i = 0; i < schemaSize; i++) {
-      Type type = fileSchema.getType(i);
-      Fields name = new Fields(type.getName());
-
-      if(requestedFields.contains(name)) {
-        newFields = newFields.append(name);
-        newSchemaFields.add(type);
-      }
-    }
-
-    this.sourceFields = newFields;
-    this.requestedSchema = new MessageType(fileSchema.getName(), newSchemaFields);
-  }
-
-  public MessageType getRequestedSchema() {
-    return requestedSchema;
-  }
-
-  public Fields getSourceFields() {
-    return sourceFields;
-  }
-}
\ No newline at end of file
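
SchemaIntersection walks the file schema in order and keeps only the columns named in the requested Fields, so the projected schema preserves file order and any requested name missing from the file is silently dropped. A small sketch with an illustrative schema:

    MessageType fileSchema = MessageTypeParser.parseMessageType(
        "message names { required binary first; required binary last; required int32 age; }");
    SchemaIntersection intersection =
        new SchemaIntersection(fileSchema, new Fields("age", "first"));
    intersection.getRequestedSchema(); // names { required binary first; required int32 age; }
    intersection.getSourceFields();    // ("first", "age") -- file order, not request order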

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-cascading/src/main/java/parquet/cascading/TupleReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/parquet/cascading/TupleReadSupport.java b/parquet-cascading/src/main/java/parquet/cascading/TupleReadSupport.java
deleted file mode 100644
index 7c300b1..0000000
--- a/parquet-cascading/src/main/java/parquet/cascading/TupleReadSupport.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.cascading;
-
-import java.util.Map;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.commons.lang.StringUtils;
-
-import cascading.tuple.Tuple;
-import cascading.tuple.Fields;
-import cascading.flow.hadoop.util.HadoopUtil;
-
-import parquet.hadoop.api.ReadSupport;
-import parquet.io.api.RecordMaterializer;
-import parquet.schema.MessageType;
-import parquet.cascading.convert.TupleRecordMaterializer;
-
-
-public class TupleReadSupport extends ReadSupport<Tuple> {
-  static final String PARQUET_CASCADING_REQUESTED_FIELDS = "parquet.cascading.requested.fields";
-
-  protected static Fields getRequestedFields(Configuration configuration) {
-    String fieldsString = configuration.get(PARQUET_CASCADING_REQUESTED_FIELDS);
-
-    if (fieldsString == null)
-      return Fields.ALL;
-
-    String[] parts = StringUtils.split(fieldsString, ":");
-    if (parts.length == 0)
-      return Fields.ALL;
-    else
-      return new Fields(parts);
-  }
-
-  protected static void setRequestedFields(JobConf configuration, Fields fields) {
-    String fieldsString = StringUtils.join(fields.iterator(), ":");
-    configuration.set(PARQUET_CASCADING_REQUESTED_FIELDS, fieldsString);
-  }
-
-  @Override
-  public ReadContext init(Configuration configuration, Map<String, String> keyValueMetaData, MessageType fileSchema) {
-    Fields requestedFields = getRequestedFields(configuration);
-    // getRequestedFields defaults to Fields.ALL when no projection is configured,
-    // so the fallback below keys off Fields.ALL rather than a null it never returns.
-    if (requestedFields == Fields.ALL) {
-      return new ReadContext(fileSchema);
-    } else {
-      SchemaIntersection intersection = new SchemaIntersection(fileSchema, requestedFields);
-      return new ReadContext(intersection.getRequestedSchema());
-    }
-  }
-
-  @Override
-  public RecordMaterializer<Tuple> prepareForRead(
-      Configuration configuration,
-      Map<String, String> keyValueMetaData,
-      MessageType fileSchema,
-      ReadContext readContext) {
-    MessageType requestedSchema = readContext.getRequestedSchema();
-    return new TupleRecordMaterializer(requestedSchema);
-  }
-
-}
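
The projection round-trips through a single configuration key: setRequestedFields joins the field names with ':' under parquet.cascading.requested.fields, and getRequestedFields splits them back, defaulting to Fields.ALL when the key is absent. Both helpers are protected, so this sketch assumes code living in the parquet.cascading package:

    JobConf jobConf = new JobConf();
    TupleReadSupport.setRequestedFields(jobConf, new Fields("first", "last"));
    // jobConf now holds: parquet.cascading.requested.fields=first:last
    Fields requested = TupleReadSupport.getRequestedFields(jobConf); // ("first", "last")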

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-cascading/src/main/java/parquet/cascading/TupleWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/parquet/cascading/TupleWriteSupport.java b/parquet-cascading/src/main/java/parquet/cascading/TupleWriteSupport.java
deleted file mode 100644
index bf87dcd..0000000
--- a/parquet-cascading/src/main/java/parquet/cascading/TupleWriteSupport.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.cascading;
-
-import cascading.tuple.TupleEntry;
-import java.util.HashMap;
-import java.util.List;
-import org.apache.hadoop.conf.Configuration;
-import parquet.hadoop.api.WriteSupport;
-import parquet.io.api.Binary;
-import parquet.io.api.RecordConsumer;
-import parquet.schema.MessageType;
-import parquet.schema.MessageTypeParser;
-import parquet.schema.PrimitiveType;
-import parquet.schema.Type;
-
-/**
- * A {@link WriteSupport} that writes Cascading {@link TupleEntry} records to Parquet,
- * using the message type supplied under the {@link #PARQUET_CASCADING_SCHEMA} property.
- *
- * @author Mickaël Lacour <m.lacour@criteo.com>
- */
-public class TupleWriteSupport extends WriteSupport<TupleEntry> {
-
-  private RecordConsumer recordConsumer;
-  private MessageType rootSchema;
-  public static final String PARQUET_CASCADING_SCHEMA = "parquet.cascading.schema";
-
-  @Override
-  public WriteContext init(Configuration configuration) {
-    String schema = configuration.get(PARQUET_CASCADING_SCHEMA);
-    rootSchema = MessageTypeParser.parseMessageType(schema);
-    return new WriteContext(rootSchema, new HashMap<String, String>());
-  }
-
-  @Override
-  public void prepareForWrite(RecordConsumer recordConsumer) {
-    this.recordConsumer = recordConsumer;
-  }
-
-  @Override
-  public void write(TupleEntry record) {
-    recordConsumer.startMessage();
-    final List<Type> fields = rootSchema.getFields();
-
-    for (int i = 0; i < fields.size(); i++) {
-      Type field = fields.get(i);
-
-      if (record == null || record.getObject(field.getName()) == null) {
-        continue;
-      }
-      recordConsumer.startField(field.getName(), i);
-      if (field.isPrimitive()) {
-        writePrimitive(record, field.asPrimitiveType());
-      } else {
-        throw new UnsupportedOperationException("Complex type not implemented");
-      }
-      recordConsumer.endField(field.getName(), i);
-    }
-    recordConsumer.endMessage();
-  }
-
-  private void writePrimitive(TupleEntry record, PrimitiveType field) {
-    switch (field.getPrimitiveTypeName()) {
-      case BINARY:
-        recordConsumer.addBinary(Binary.fromString(record.getString(field.getName())));
-        break;
-      case BOOLEAN:
-        recordConsumer.addBoolean(record.getBoolean(field.getName()));
-        break;
-      case INT32:
-        recordConsumer.addInteger(record.getInteger(field.getName()));
-        break;
-      case INT64:
-        recordConsumer.addLong(record.getLong(field.getName()));
-        break;
-      case DOUBLE:
-        recordConsumer.addDouble(record.getDouble(field.getName()));
-        break;
-      case FLOAT:
-        recordConsumer.addFloat(record.getFloat(field.getName()));
-        break;
-      case FIXED_LEN_BYTE_ARRAY:
-        throw new UnsupportedOperationException("Fixed len byte array type not implemented");
-      case INT96:
-        throw new UnsupportedOperationException("Int96 type not implemented");
-      default:
-        throw new UnsupportedOperationException(field.getPrimitiveTypeName() + " type not implemented");
-    }
-  }
-}
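
write() above emits one Parquet record per TupleEntry: startMessage/endMessage bracketing a startField/add*/endField triple for each non-null primitive field. Skipping null fields is only valid when the corresponding schema field is optional; a null value behind a required field would yield an invalid record. A sketch, assuming an initialized TupleWriteSupport ws whose RecordConsumer was installed via prepareForWrite and whose (illustrative) schema is "message pair { optional binary left; optional binary right; }":

    TupleEntry entry = new TupleEntry(new Fields("left", "right"), new Tuple("hello", null));
    ws.write(entry);
    // Resulting consumer calls:
    //   startMessage()
    //     startField("left", 0); addBinary("hello"); endField("left", 0)
    //     // "right" is null, so it is skipped -- legal only because it is optional
    //   endMessage()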

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-cascading/src/main/java/parquet/cascading/convert/TupleConverter.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/parquet/cascading/convert/TupleConverter.java b/parquet-cascading/src/main/java/parquet/cascading/convert/TupleConverter.java
deleted file mode 100644
index 6b40158..0000000
--- a/parquet-cascading/src/main/java/parquet/cascading/convert/TupleConverter.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.cascading.convert;
-
-import cascading.tuple.Tuple;
-
-import parquet.io.ParquetDecodingException;
-import parquet.io.api.Binary;
-import parquet.io.api.Converter;
-import parquet.io.api.GroupConverter;
-import parquet.io.api.PrimitiveConverter;
-import parquet.pig.TupleConversionException;
-import parquet.schema.GroupType;
-import parquet.schema.OriginalType;
-import parquet.schema.PrimitiveType;
-import parquet.schema.Type;
-import parquet.schema.Type.Repetition;
-
-public class TupleConverter extends GroupConverter {
-
-  protected Tuple currentTuple;
-  private final Converter[] converters;
-
-  public TupleConverter(GroupType parquetSchema) {
-    int schemaSize = parquetSchema.getFieldCount();
-
-    this.converters = new Converter[schemaSize];
-    for (int i = 0; i < schemaSize; i++) {
-      Type type = parquetSchema.getType(i);
-      converters[i] = newConverter(type, i);
-    }
-  }
-
-  private Converter newConverter(Type type, int i) {
-    if (!type.isPrimitive()) {
-      throw new IllegalArgumentException("Cascading can only build tuples from primitive types");
-    }
-    return new TuplePrimitiveConverter(this, i);
-  }
-
-  @Override
-  public Converter getConverter(int fieldIndex) {
-    return converters[fieldIndex];
-  }
-
-  @Override
-  public final void start() {
-    currentTuple = Tuple.size(converters.length);
-  }
-
-  @Override
-  public void end() {
-  }
-
-  public final Tuple getCurrentTuple() {
-    return currentTuple;
-  }
-
-  static final class TuplePrimitiveConverter extends PrimitiveConverter {
-    private final TupleConverter parent;
-    private final int index;
-
-    public TuplePrimitiveConverter(TupleConverter parent, int index) {
-      this.parent = parent;
-      this.index = index;
-    }
-
-    @Override
-    public void addBinary(Binary value) {
-      parent.getCurrentTuple().setString(index, value.toStringUsingUTF8());
-    }
-
-    @Override
-    public void addBoolean(boolean value) {
-      parent.getCurrentTuple().setBoolean(index, value);
-    }
-
-    @Override
-    public void addDouble(double value) {
-      parent.getCurrentTuple().setDouble(index, value);
-    }
-
-    @Override
-    public void addFloat(float value) {
-      parent.getCurrentTuple().setFloat(index, value);
-    }
-
-    @Override
-    public void addInt(int value) {
-      parent.getCurrentTuple().setInteger(index, value);
-    }
-
-    @Override
-    public void addLong(long value) {
-      parent.getCurrentTuple().setLong(index, value);
-    }
-  }
-}
\ No newline at end of file
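
TupleConverter above handles flat schemas only: every column must be primitive, each backed by a TuplePrimitiveConverter that writes into a fixed positional slot of the Tuple allocated in start(), with BINARY values materialized as UTF-8 strings. A sketch of the lifecycle the record reader drives, using an illustrative schema:

    GroupType schema = MessageTypeParser.parseMessageType(
        "message names { required binary first; required int32 age; }");
    TupleConverter converter = new TupleConverter(schema);

    converter.start();                               // currentTuple = Tuple.size(2)
    converter.getConverter(0).asPrimitiveConverter()
             .addBinary(Binary.fromString("Alice")); // slot 0 <- "Alice"
    converter.getConverter(1).asPrimitiveConverter()
             .addInt(30);                            // slot 1 <- 30
    converter.end();
    Tuple tuple = converter.getCurrentTuple();       // ("Alice", 30)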

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-cascading/src/main/java/parquet/cascading/convert/TupleRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/parquet/cascading/convert/TupleRecordMaterializer.java b/parquet-cascading/src/main/java/parquet/cascading/convert/TupleRecordMaterializer.java
deleted file mode 100644
index e48aca5..0000000
--- a/parquet-cascading/src/main/java/parquet/cascading/convert/TupleRecordMaterializer.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.cascading.convert;
-
-import cascading.tuple.Tuple;
-import cascading.tuple.Fields;
-
-import parquet.io.api.GroupConverter;
-import parquet.io.api.RecordMaterializer;
-import parquet.schema.GroupType;
-
-public class TupleRecordMaterializer extends RecordMaterializer<Tuple> {
-
-  private TupleConverter root;
-
-  public TupleRecordMaterializer(GroupType parquetSchema) {
-    this.root = new TupleConverter(parquetSchema);
-  }
-
-  @Override
-  public Tuple getCurrentRecord() {
-    return root.getCurrentTuple();
-  }
-
-  @Override
-  public GroupConverter getRootConverter() {
-    return root;
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java b/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java
new file mode 100644
index 0000000..841314c
--- /dev/null
+++ b/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cascading;
+
+import cascading.flow.Flow;
+import cascading.flow.FlowProcess;
+import cascading.flow.hadoop.HadoopFlowConnector;
+import cascading.operation.BaseOperation;
+import cascading.operation.Function;
+import cascading.operation.FunctionCall;
+import cascading.pipe.Each;
+import cascading.pipe.Pipe;
+import cascading.scheme.Scheme;
+import cascading.scheme.hadoop.TextLine;
+import cascading.tap.Tap;
+import cascading.tap.hadoop.Hfs;
+import cascading.tuple.Fields;
+import cascading.tuple.Tuple;
+import cascading.tuple.TupleEntry;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.apache.parquet.hadoop.thrift.ThriftToParquetFileWriter;
+import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.thrift.test.Name;
+
+import java.io.File;
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestParquetTBaseScheme {
+  final String txtInputPath = "src/test/resources/names.txt";
+  final String parquetInputPath = "target/test/ParquetTBaseScheme/names-parquet-in";
+  final String parquetOutputPath = "target/test/ParquetTBaseScheme/names-parquet-out";
+  final String txtOutputPath = "target/test/ParquetTBaseScheme/names-txt-out";
+
+  @Test
+  public void testWrite() throws Exception {
+    Path path = new Path(parquetOutputPath);
+    JobConf jobConf = new JobConf();
+    final FileSystem fs = path.getFileSystem(jobConf);
+    if (fs.exists(path)) fs.delete(path, true);
+
+    Scheme sourceScheme = new TextLine(new Fields("first", "last"));
+    Tap source = new Hfs(sourceScheme, txtInputPath);
+
+    Scheme sinkScheme = new ParquetTBaseScheme(Name.class);
+    Tap sink = new Hfs(sinkScheme, parquetOutputPath);
+
+    Pipe assembly = new Pipe("namecp");
+    assembly = new Each(assembly, new PackThriftFunction());
+    HadoopFlowConnector hadoopFlowConnector = new HadoopFlowConnector();
+    Flow flow = hadoopFlowConnector.connect("namecp", source, sink, assembly);
+
+    flow.complete();
+
+    assertTrue(fs.exists(new Path(parquetOutputPath)));
+    assertTrue(fs.exists(new Path(parquetOutputPath + "/_metadata")));
+    assertTrue(fs.exists(new Path(parquetOutputPath + "/_common_metadata")));
+  }
+
+  @Test
+  public void testRead() throws Exception {
+    doRead(new ParquetTBaseScheme(Name.class));
+  }
+
+  @Test
+  public void testReadWithoutClass() throws Exception {
+    doRead(new ParquetTBaseScheme());
+  }
+
+  private void doRead(Scheme sourceScheme) throws Exception {
+    createFileForRead();
+
+    Path path = new Path(txtOutputPath);
+    final FileSystem fs = path.getFileSystem(new Configuration());
+    if (fs.exists(path)) fs.delete(path, true);
+
+    Tap source = new Hfs(sourceScheme, parquetInputPath);
+
+    Scheme sinkScheme = new TextLine(new Fields("first", "last"));
+    Tap sink = new Hfs(sinkScheme, txtOutputPath);
+
+    Pipe assembly = new Pipe("namecp");
+    assembly = new Each(assembly, new UnpackThriftFunction());
+    Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
+
+    flow.complete();
+    String result = FileUtils.readFileToString(new File(txtOutputPath + "/part-00000"));
+    assertEquals("Alice\tPractice\nBob\tHope\nCharlie\tHorse\n", result);
+  }
+
+  private void createFileForRead() throws Exception {
+    final Path fileToCreate = new Path(parquetInputPath + "/names.parquet");
+
+    final Configuration conf = new Configuration();
+    final FileSystem fs = fileToCreate.getFileSystem(conf);
+    if (fs.exists(fileToCreate)) fs.delete(fileToCreate, true);
+
+    TProtocolFactory protocolFactory = new TCompactProtocol.Factory();
+    TaskAttemptID taskId = new TaskAttemptID("local", 0, true, 0, 0);
+    ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(fileToCreate, ContextUtil.newTaskAttemptContext(conf, taskId), protocolFactory, Name.class);
+
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    final TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos));
+
+    Name n1 = new Name();
+    n1.setFirst_name("Alice");
+    n1.setLast_name("Practice");
+    Name n2 = new Name();
+    n2.setFirst_name("Bob");
+    n2.setLast_name("Hope");
+    Name n3 = new Name();
+    n3.setFirst_name("Charlie");
+    n3.setLast_name("Horse");
+
+    n1.write(protocol);
+    w.write(new BytesWritable(baos.toByteArray()));
+    baos.reset();
+    n2.write(protocol);
+    w.write(new BytesWritable(baos.toByteArray()));
+    baos.reset();
+    n3.write(protocol);
+    w.write(new BytesWritable(baos.toByteArray()));
+    w.close();
+  }
+
+  private static class PackThriftFunction extends BaseOperation implements Function {
+    @Override
+    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
+      TupleEntry arguments = functionCall.getArguments();
+      Tuple result = new Tuple();
+
+      Name name = new Name();
+      name.setFirst_name(arguments.getString(0));
+      name.setLast_name(arguments.getString(1));
+
+      result.add(name);
+      functionCall.getOutputCollector().add(result);
+    }
+  }
+
+  private static class UnpackThriftFunction extends BaseOperation implements Function {
+    @Override
+    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
+      TupleEntry arguments = functionCall.getArguments();
+      Tuple result = new Tuple();
+
+      Name name = (Name) arguments.get(0);
+      result.add(name.getFirst_name());
+      result.add(name.getLast_name());
+      functionCall.getOutputCollector().add(result);
+    }
+  }
+}

