parquet-commits mailing list archives

From jul...@apache.org
Subject [2/2] parquet-mr git commit: PARQUET-480: Update for Cascading 3.0
Date Mon, 01 Feb 2016 03:21:56 GMT
PARQUET-480: Update for Cascading 3.0

The code in parquet-cascading targets the Cascading API as of version 2.5.3.

Cascading 3.0 introduced some incompatible changes. This patch forks the parquet-cascading module to also provide a parquet-cascading3 module, which is nearly identical except for overloads that changed from requiring a Foo<JobConf> to requiring a Foo<? extends JobConf>.
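To illustrate the kind of signature change described above, here is a minimal, self-contained Java sketch. BaseConf and DerivedConf are hypothetical stand-ins for JobConf and a subclass of it; they are not Hadoop or Cascading types. A parameter typed List<BaseConf> accepts exactly that type, while List<? extends BaseConf> also accepts a list of any subclass, so source written against one form generally has to change to compile against the other.

```java
import java.util.ArrayList;
import java.util.List;

class WildcardDemo {
  // Upper-bounded wildcard: accepts a List<BaseConf> or a List of any subclass.
  static List<String> namesOf(List<? extends BaseConf> confs) {
    List<String> names = new ArrayList<>();
    for (BaseConf conf : confs) { // elements can always be read as BaseConf
      names.add(conf.getClass().getSimpleName());
    }
    return names;
  }

  // Invariant parameter: accepts exactly List<BaseConf> and nothing else.
  static int countExact(List<BaseConf> confs) {
    return confs.size();
  }

  public static void main(String[] args) {
    List<DerivedConf> derived = new ArrayList<>();
    derived.add(new DerivedConf());
    System.out.println(namesOf(derived)); // prints [DerivedConf]
    // countExact(derived); // would not compile: List<DerivedConf> is not a List<BaseConf>
  }
}

// Hypothetical stand-ins: BaseConf plays the role of JobConf, DerivedConf a subclass.
class BaseConf {}

class DerivedConf extends BaseConf {}
```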

Author: Cyrille Chépélov (TP12) <cch@transparencyrights.com>

Closes #284 from cchepelov/try_cascading3 and squashes the following commits:

e7d1304 [Cyrille Chépélov (TP12)] Adding a @Deprecated notice on parquet-cascading's remaining classes
05a417d [Cyrille Chépélov (TP12)] cascading2/3: share back TupleWriteSupport.java (accidentally unmerged)
7fff2d4 [Cyrille Chépélov (TP12)] cascading/cascading3: remove duplicates, push common files into parquet-cascading-common23
338a416 [Cyrille Chépélov (TP12)] Removing unwanted file (what?!) + .gitignoring this kind of files
d9f0455 [Cyrille Chépélov (TP12)] TupleEntry#get is now TupleEntry#getObject
a7f490a [Cyrille Chépélov (TP12)] Revert "Missing test conversion to Cascading 3.0"
cc8b870 [Cyrille Chépélov (TP12)] Missing test conversion to Cascading 3.0
2d73512 [Cyrille Chépélov (TP12)] conflicting values can come in one order or the other. Accept both.
33355d5 [Cyrille Chépélov (TP12)] Fix version mismatch (duh!)
7128639 [Cyrille Chépélov (TP12)] non-C locale can break tests implementation (decimal formats)
53aa2f9 [Cyrille Chépélov (TP12)] Adding a parquet-cascading3 module (forking the parquet-cascading module and accounting for API changes)


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/57694790
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/57694790
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/57694790

Branch: refs/heads/master
Commit: 57694790f8ca0e1a4f3ac76fbd25a6dd13041e03
Parents: af9fd05
Author: Cyrille Chépélov (TP12) <cch@transparencyrights.com>
Authored: Sun Jan 31 19:21:48 2016 -0800
Committer: Julien Le Dem <julien@dremio.com>
Committed: Sun Jan 31 19:21:48 2016 -0800

----------------------------------------------------------------------
 .gitignore                                      |   3 +
 README.md                                       |   2 +-
 .../parquet/cascading/SchemaIntersection.java   |  63 ++++++
 .../parquet/cascading/TupleReadSupport.java     |  80 ++++++++
 .../parquet/cascading/TupleWriteSupport.java    | 111 +++++++++++
 .../cascading/convert/TupleConverter.java       | 115 +++++++++++
 .../convert/TupleRecordMaterializer.java        |  46 +++++
 .../cascading/TestParquetTupleScheme.java       | 182 ++++++++++++++++++
 .../src/test/resources/names.txt                |   3 +
 .../src/test/thrift/test.thrift                 |  25 +++
 parquet-cascading/pom.xml                       |  47 +++++
 .../parquet/cascading/ParquetTBaseScheme.java   |   1 +
 .../parquet/cascading/ParquetTupleScheme.java   |   1 +
 .../parquet/cascading/ParquetValueScheme.java   |   1 +
 .../parquet/cascading/SchemaIntersection.java   |  63 ------
 .../parquet/cascading/TupleReadSupport.java     |  80 --------
 .../parquet/cascading/TupleWriteSupport.java    | 111 -----------
 .../cascading/convert/TupleConverter.java       | 115 -----------
 .../convert/TupleRecordMaterializer.java        |  46 -----
 .../cascading/TestParquetTBaseScheme.java       |   3 +-
 .../cascading/TestParquetTupleScheme.java       | 182 ------------------
 parquet-cascading/src/test/resources/names.txt  |   3 -
 parquet-cascading/src/test/thrift/test.thrift   |  25 ---
 parquet-cascading3/REVIEWERS.md                 |  27 +++
 parquet-cascading3/pom.xml                      | 178 +++++++++++++++++
 .../parquet/cascading/ParquetTBaseScheme.java   |  80 ++++++++
 .../parquet/cascading/ParquetTupleScheme.java   | 191 +++++++++++++++++++
 .../parquet/cascading/ParquetValueScheme.java   | 184 ++++++++++++++++++
 .../cascading/TestParquetTBaseScheme.java       | 186 ++++++++++++++++++
 .../parquet/hadoop/TestMergeMetadataFiles.java  |  10 +-
 parquet_cascading.md                            |  13 ++
 pom.xml                                         |  26 +++
 32 files changed, 1574 insertions(+), 629 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index cd3c066..aa67d3d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,3 +16,6 @@ dependency-reduced-pom.xml
 parquet-scrooge/.cache
 .idea/*
 target/
+.cache
+*~
+mvn_install.log

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 2d9a50a..9bb0be6 100644
--- a/README.md
+++ b/README.md
@@ -62,7 +62,7 @@ sudo make install
 Once protobuf and thrift are available in your path, you can build the project by running:
 
 ```
-mvn clean install
+LC_ALL=C mvn clean install
 ```
 
 ## Features
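The README change runs the build under LC_ALL=C; per commit 7128639 above, a non-C locale can break the tests through decimal formatting. The sketch below shows the underlying Java behavior: String.format renders decimals with the locale's separator, so output compared against a fixed string such as "1.50" depends on the locale in effect. Passing an explicit locale such as Locale.ROOT is one way to make such code locale-independent; whether the parquet-mr tests do this is not shown in this diff.

```java
import java.util.Locale;

class LocaleFormatDemo {
  // Formats a value with two decimals using the given locale's conventions.
  static String twoDecimals(Locale locale, double value) {
    return String.format(locale, "%.2f", value);
  }

  public static void main(String[] args) {
    System.out.println(twoDecimals(Locale.ROOT, 1.5));    // prints 1.50
    System.out.println(twoDecimals(Locale.GERMANY, 1.5)); // prints 1,50
    // String.format without a Locale argument uses the JVM default format locale,
    // which is typically derived from the environment (e.g. LC_ALL / LANG on Unix).
    System.out.println(String.format("%.2f", 1.5));
  }
}
```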

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java
----------------------------------------------------------------------
diff --git a/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java
new file mode 100644
index 0000000..e3fc3f7
--- /dev/null
+++ b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java
@@ -0,0 +1,63 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cascading;
+
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+
+import cascading.tuple.Fields;
+
+import java.util.List;
+import java.util.ArrayList;
+
+public class SchemaIntersection {
+
+  private final MessageType requestedSchema;
+  private final Fields sourceFields;
+
+  public SchemaIntersection(MessageType fileSchema, Fields requestedFields) {
+    if(requestedFields == Fields.UNKNOWN)
+      requestedFields = Fields.ALL;
+
+    Fields newFields = Fields.NONE;
+    List<Type> newSchemaFields = new ArrayList<Type>();
+    int schemaSize = fileSchema.getFieldCount();
+
+    for (int i = 0; i < schemaSize; i++) {
+      Type type = fileSchema.getType(i);
+      Fields name = new Fields(type.getName());
+
+      if(requestedFields.contains(name)) {
+        newFields = newFields.append(name);
+        newSchemaFields.add(type);
+      }
+    }
+
+    this.sourceFields = newFields;
+    this.requestedSchema = new MessageType(fileSchema.getName(), newSchemaFields);
+  }
+
+  public MessageType getRequestedSchema() {
+    return requestedSchema;
+  }
+
+  public Fields getSourceFields() {
+    return sourceFields;
+  }
+}
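SchemaIntersection walks the file schema in order and keeps only the top-level columns whose names appear in the requested Fields, treating Fields.UNKNOWN as Fields.ALL. The following is a simplified sketch of that projection rule, assuming plain column-name lists in place of MessageType and Fields, with null standing in for "all fields"; it is not the parquet-mr code itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class ProjectionSketch {
  // Hypothetical simplification: a schema is a list of column names, and a null
  // requested set plays the role of Fields.ALL / Fields.UNKNOWN.
  static List<String> intersect(List<String> fileColumns, Set<String> requested) {
    List<String> projected = new ArrayList<>();
    for (String column : fileColumns) { // iterate in file-schema order
      if (requested == null || requested.contains(column)) {
        projected.add(column); // keep only columns that were requested
      }
    }
    return projected;
  }

  public static void main(String[] args) {
    List<String> file = Arrays.asList("first_name", "last_name");
    Set<String> wanted = new LinkedHashSet<>(Arrays.asList("last_name", "age"));
    System.out.println(intersect(file, wanted)); // prints [last_name]
    System.out.println(intersect(file, null));   // prints [first_name, last_name]
  }
}
```

Two consequences follow from the loop shape: projected columns keep the file's order rather than the request's order, and requested names that do not exist in the file ("age" above) are dropped silently instead of raising an error.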

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java
new file mode 100644
index 0000000..42a5926
--- /dev/null
+++ b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java
@@ -0,0 +1,80 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cascading;
+
+import java.util.Map;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.commons.lang.StringUtils;
+
+import cascading.tuple.Tuple;
+import cascading.tuple.Fields;
+import cascading.flow.hadoop.util.HadoopUtil;
+
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.cascading.convert.TupleRecordMaterializer;
+
+
+public class TupleReadSupport extends ReadSupport<Tuple> {
+  static final String PARQUET_CASCADING_REQUESTED_FIELDS = "parquet.cascading.requested.fields";
+
+  static protected Fields getRequestedFields(Configuration configuration) {
+    String fieldsString = configuration.get(PARQUET_CASCADING_REQUESTED_FIELDS);
+
+    if(fieldsString == null)
+      return Fields.ALL;
+
+    String[] parts = StringUtils.split(fieldsString, ":");
+    if(parts.length == 0)
+      return Fields.ALL;
+    else
+      return new Fields(parts);
+  }
+
+  static protected void setRequestedFields(JobConf configuration, Fields fields) {
+    String fieldsString = StringUtils.join(fields.iterator(), ":");
+    configuration.set(PARQUET_CASCADING_REQUESTED_FIELDS, fieldsString);
+  }
+
+  @Override
+  public ReadContext init(Configuration configuration, Map<String, String> keyValueMetaData, MessageType fileSchema) {
+    Fields requestedFields = getRequestedFields(configuration);
+    if (requestedFields == null) {
+      return new ReadContext(fileSchema);
+    } else {
+      SchemaIntersection intersection = new SchemaIntersection(fileSchema, requestedFields);
+      return new ReadContext(intersection.getRequestedSchema());
+    }
+  }
+
+  @Override
+  public RecordMaterializer<Tuple> prepareForRead(
+      Configuration configuration,
+      Map<String, String> keyValueMetaData,
+      MessageType fileSchema,
+      ReadContext readContext) {
+    MessageType requestedSchema = readContext.getRequestedSchema();
+    return new TupleRecordMaterializer(requestedSchema);
+  }
+
+}
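TupleReadSupport hands the requested field names to the read side through the job configuration: setRequestedFields joins them with ':' under the key parquet.cascading.requested.fields, and getRequestedFields splits them back, falling back to Fields.ALL when the key is missing or empty. A minimal sketch of that round trip, assuming a Map<String, String> in place of Hadoop's Configuration and null in place of Fields.ALL:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RequestedFieldsSketch {
  // Same key as TupleReadSupport.PARQUET_CASCADING_REQUESTED_FIELDS.
  static final String KEY = "parquet.cascading.requested.fields";

  // Like setRequestedFields: store the names joined with ':'.
  static void set(Map<String, String> conf, List<String> fields) {
    conf.put(KEY, String.join(":", fields));
  }

  // Like getRequestedFields: a missing or empty value means "all fields" (null here).
  // Empty tokens are skipped, as commons-lang StringUtils.split does.
  static List<String> get(Map<String, String> conf) {
    String value = conf.get(KEY);
    if (value == null) {
      return null;
    }
    List<String> names = new ArrayList<>();
    for (String part : value.split(":")) {
      if (!part.isEmpty()) {
        names.add(part);
      }
    }
    return names.isEmpty() ? null : names;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    set(conf, Arrays.asList("first_name", "last_name"));
    System.out.println(conf.get(KEY)); // prints first_name:last_name
    System.out.println(get(conf));     // prints [first_name, last_name]
  }
}
```

Because ':' is the separator, a field name that itself contains ':' would not survive the round trip. Also, as written, getRequestedFields never returns null, so the null branch in init is not reached.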

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java
new file mode 100644
index 0000000..032f534
--- /dev/null
+++ b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java
@@ -0,0 +1,111 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cascading;
+
+import cascading.tuple.TupleEntry;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+/**
+ *
+ *
+ * @author Mickaël Lacour <m.lacour@criteo.com>
+ */
+public class TupleWriteSupport extends WriteSupport<TupleEntry> {
+
+  private RecordConsumer recordConsumer;
+  private MessageType rootSchema;
+  public static final String PARQUET_CASCADING_SCHEMA = "parquet.cascading.schema";
+
+  @Override
+  public String getName() {
+    return "cascading";
+  }
+
+  @Override
+  public WriteContext init(Configuration configuration) {
+    String schema = configuration.get(PARQUET_CASCADING_SCHEMA);
+    rootSchema = MessageTypeParser.parseMessageType(schema);
+    return new WriteContext(rootSchema, new HashMap<String, String>());
+  }
+
+  @Override
+  public void prepareForWrite(RecordConsumer recordConsumer) {
+    this.recordConsumer = recordConsumer;
+  }
+
+  @Override
+  public void write(TupleEntry record) {
+    recordConsumer.startMessage();
+    final List<Type> fields = rootSchema.getFields();
+
+    for (int i = 0; i < fields.size(); i++) {
+      Type field = fields.get(i);
+
+      if (record == null || record.getObject(field.getName()) == null) {
+        continue;
+      }
+      recordConsumer.startField(field.getName(), i);
+      if (field.isPrimitive()) {
+        writePrimitive(record, field.asPrimitiveType());
+      } else {
+        throw new UnsupportedOperationException("Complex type not implemented");
+      }
+      recordConsumer.endField(field.getName(), i);
+    }
+    recordConsumer.endMessage();
+  }
+
+  private void writePrimitive(TupleEntry record, PrimitiveType field) {
+    switch (field.getPrimitiveTypeName()) {
+      case BINARY:
+        recordConsumer.addBinary(Binary.fromString(record.getString(field.getName())));
+        break;
+      case BOOLEAN:
+        recordConsumer.addBoolean(record.getBoolean(field.getName()));
+        break;
+      case INT32:
+        recordConsumer.addInteger(record.getInteger(field.getName()));
+        break;
+      case INT64:
+        recordConsumer.addLong(record.getLong(field.getName()));
+        break;
+      case DOUBLE:
+        recordConsumer.addDouble(record.getDouble(field.getName()));
+        break;
+      case FLOAT:
+        recordConsumer.addFloat(record.getFloat(field.getName()));
+        break;
+      case FIXED_LEN_BYTE_ARRAY:
+        throw new UnsupportedOperationException("Fixed len byte array type not implemented");
+      case INT96:
+        throw new UnsupportedOperationException("Int96 type not implemented");
+      default:
+        throw new UnsupportedOperationException(field.getName() + " type not implemented");
+    }
+  }
+}
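TupleWriteSupport.write emits one message per TupleEntry, walking the schema's top-level fields by index; a field whose value is null is skipped entirely (no startField/endField pair is emitted), and each primitive type maps to one RecordConsumer method. The sketch below reproduces that control flow without Parquet on the classpath: PType, Column, and the returned call log are hypothetical stand-ins for PrimitiveType and RecordConsumer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class WritePathSketch {
  // Hypothetical subset of Parquet primitive type names.
  enum PType { BINARY, BOOLEAN, INT32, INT64, DOUBLE, FLOAT }

  // Hypothetical column descriptor: a name plus a primitive type.
  static final class Column {
    final String name;
    final PType type;

    Column(String name, PType type) {
      this.name = name;
      this.type = type;
    }
  }

  // Returns the sequence of calls a RecordConsumer-like sink would receive.
  static List<String> write(List<Column> schema, Map<String, Object> record) {
    List<String> calls = new ArrayList<>();
    calls.add("startMessage");
    for (int i = 0; i < schema.size(); i++) {
      Column col = schema.get(i);
      Object value = record == null ? null : record.get(col.name);
      if (value == null) {
        continue; // absent value: no startField/endField pair is emitted
      }
      calls.add("startField " + col.name + " " + i); // index = schema position
      calls.add(addCall(col.type) + " " + value);
      calls.add("endField " + col.name + " " + i);
    }
    calls.add("endMessage");
    return calls;
  }

  // Maps a primitive type to the consumer method the original switch calls.
  static String addCall(PType type) {
    switch (type) {
      case BINARY:  return "addBinary";
      case BOOLEAN: return "addBoolean";
      case INT32:   return "addInteger";
      case INT64:   return "addLong";
      case DOUBLE:  return "addDouble";
      case FLOAT:   return "addFloat";
      default: throw new UnsupportedOperationException(type + " type not implemented");
    }
  }

  public static void main(String[] args) {
    List<Column> schema = Arrays.asList(
        new Column("first_name", PType.BINARY),
        new Column("last_name", PType.BINARY));
    Map<String, Object> record = new HashMap<>();
    record.put("first_name", "Alice");
    // prints [startMessage, startField first_name 0, addBinary Alice, endField first_name 0, endMessage]
    System.out.println(write(schema, record));
  }
}
```

Omitting a field is how an absent value is represented, so columns that may be null are expected to be declared optional in the Parquet schema. Note also that the original's default branch builds its message from field.getName(), so it reports the column name rather than the unsupported type.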

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java
----------------------------------------------------------------------
diff --git a/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java
new file mode 100644
index 0000000..3741165
--- /dev/null
+++ b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java
@@ -0,0 +1,115 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cascading.convert;
+
+import cascading.tuple.Tuple;
+
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.pig.TupleConversionException;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Type.Repetition;
+
+public class TupleConverter extends GroupConverter {
+
+  protected Tuple currentTuple;
+  private final Converter[] converters;
+
+  public TupleConverter(GroupType parquetSchema) {
+    int schemaSize = parquetSchema.getFieldCount();
+
+    this.converters = new Converter[schemaSize];
+    for (int i = 0; i < schemaSize; i++) {
+      Type type = parquetSchema.getType(i);
+      converters[i] = newConverter(type, i);
+    }
+  }
+
+  private Converter newConverter(Type type, int i) {
+    if(!type.isPrimitive()) {
+      throw new IllegalArgumentException("cascading can only build tuples from primitive types");
+    } else {
+      return new TuplePrimitiveConverter(this, i);
+    }
+  }
+
+  @Override
+  public Converter getConverter(int fieldIndex) {
+    return converters[fieldIndex];
+  }
+
+  @Override
+  final public void start() {
+    currentTuple = Tuple.size(converters.length);
+  }
+
+  @Override
+  public void end() {
+  }
+
+  final public Tuple getCurrentTuple() {
+    return currentTuple;
+  }
+
+  static final class TuplePrimitiveConverter extends PrimitiveConverter {
+    private final TupleConverter parent;
+    private final int index;
+
+    public TuplePrimitiveConverter(TupleConverter parent, int index) {
+      this.parent = parent;
+      this.index = index;
+    }
+
+    @Override
+    public void addBinary(Binary value) {
+      parent.getCurrentTuple().setString(index, value.toStringUsingUTF8());
+    }
+
+    @Override
+    public void addBoolean(boolean value) {
+      parent.getCurrentTuple().setBoolean(index, value);
+    }
+
+    @Override
+    public void addDouble(double value) {
+      parent.getCurrentTuple().setDouble(index, value);
+    }
+
+    @Override
+    public void addFloat(float value) {
+      parent.getCurrentTuple().setFloat(index, value);
+    }
+
+    @Override
+    public void addInt(int value) {
+      parent.getCurrentTuple().setInteger(index, value);
+    }
+
+    @Override
+    public void addLong(long value) {
+      parent.getCurrentTuple().setLong(index, value);
+    }
+  }
+}
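TupleConverter creates one primitive converter per top-level column; start() allocates a fresh tuple with one null slot per column (Tuple.size), and each primitive converter writes its value into its own index of that tuple. TupleRecordMaterializer then returns that tuple as the current record. A dependency-free sketch of the pattern, where RowConverter and SlotConverter are hypothetical stand-ins for TupleConverter and TuplePrimitiveConverter, and an Object[] replaces the Cascading Tuple:

```java
import java.util.Arrays;

class ConverterSketch {
  // Hypothetical stand-in for TupleConverter: owns the row being assembled.
  static final class RowConverter {
    private final SlotConverter[] converters;
    private Object[] current;

    RowConverter(int fieldCount) {
      converters = new SlotConverter[fieldCount];
      for (int i = 0; i < fieldCount; i++) {
        converters[i] = new SlotConverter(this, i); // one converter per column index
      }
    }

    SlotConverter getConverter(int fieldIndex) {
      return converters[fieldIndex];
    }

    void start() {
      current = new Object[converters.length]; // fresh row, every slot null
    }

    void end() {
    }

    Object[] getCurrent() {
      return current;
    }
  }

  // Hypothetical stand-in for TuplePrimitiveConverter: writes into one fixed slot.
  static final class SlotConverter {
    private final RowConverter parent;
    private final int index;

    SlotConverter(RowConverter parent, int index) {
      this.parent = parent;
      this.index = index;
    }

    void addString(String value) {
      parent.getCurrent()[index] = value;
    }

    void addInt(int value) {
      parent.getCurrent()[index] = value;
    }
  }

  public static void main(String[] args) {
    RowConverter root = new RowConverter(2);
    root.start();
    root.getConverter(1).addString("Hope"); // column 0 receives no value in this record
    root.end();
    System.out.println(Arrays.toString(root.getCurrent())); // prints [null, Hope]
  }
}
```

Because start() allocates a new row for each record, a row handed out earlier is not overwritten by the next one, and columns that receive no value stay null.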

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java
new file mode 100644
index 0000000..275e17b
--- /dev/null
+++ b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java
@@ -0,0 +1,46 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cascading.convert;
+
+import cascading.tuple.Tuple;
+import cascading.tuple.Fields;
+
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.GroupType;
+
+public class TupleRecordMaterializer extends RecordMaterializer<Tuple> {
+
+  private TupleConverter root;
+
+  public TupleRecordMaterializer(GroupType parquetSchema) {
+    this.root = new TupleConverter(parquetSchema);
+  }
+
+  @Override
+  public Tuple getCurrentRecord() {
+    return root.getCurrentTuple();
+  }
+
+  @Override
+  public GroupConverter getRootConverter() {
+    return root;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading-common23/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading-common23/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java b/parquet-cascading-common23/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java
new file mode 100644
index 0000000..de350dd
--- /dev/null
+++ b/parquet-cascading-common23/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java
@@ -0,0 +1,182 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cascading;
+
+import cascading.flow.Flow;
+import cascading.flow.FlowProcess;
+import cascading.flow.hadoop.HadoopFlowConnector;
+import cascading.operation.BaseOperation;
+import cascading.operation.Function;
+import cascading.operation.FunctionCall;
+import cascading.pipe.Each;
+import cascading.pipe.Pipe;
+import cascading.scheme.Scheme;
+import cascading.scheme.hadoop.TextLine;
+import cascading.tap.Tap;
+import cascading.tap.hadoop.Hfs;
+import cascading.tuple.Fields;
+import cascading.tuple.Tuple;
+import cascading.tuple.TupleEntry;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.junit.Test;
+import org.apache.parquet.hadoop.thrift.ThriftToParquetFileWriter;
+import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.thrift.test.Name;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestParquetTupleScheme {
+  final String parquetInputPath = "target/test/ParquetTupleIn/names-parquet-in";
+  final String txtOutputPath = "target/test/ParquetTupleOut/names-txt-out";
+
+  @Test
+  public void testReadPattern() throws Exception {
+    String sourceFolder = parquetInputPath;
+    testReadWrite(sourceFolder);
+
+    String sourceGlobPattern = parquetInputPath + "/*";
+    testReadWrite(sourceGlobPattern);
+
+    String multiLevelGlobPattern = "target/test/ParquetTupleIn/**/*";
+    testReadWrite(multiLevelGlobPattern);
+  }
+
+  @Test
+  public void testFieldProjection() throws Exception {
+    createFileForRead();
+
+    Path path = new Path(txtOutputPath);
+    final FileSystem fs = path.getFileSystem(new Configuration());
+    if (fs.exists(path)) fs.delete(path, true);
+
+    Scheme sourceScheme = new ParquetTupleScheme(new Fields("last_name"));
+    Tap source = new Hfs(sourceScheme, parquetInputPath);
+
+    Scheme sinkScheme = new TextLine(new Fields("last_name"));
+    Tap sink = new Hfs(sinkScheme, txtOutputPath);
+
+    Pipe assembly = new Pipe("namecp");
+    assembly = new Each(assembly, new ProjectedTupleFunction());
+    Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
+
+    flow.complete();
+    String result = FileUtils.readFileToString(new File(txtOutputPath + "/part-00000"));
+    assertEquals("Practice\nHope\nHorse\n", result);
+  }
+
+  public void testReadWrite(String inputPath) throws Exception {
+    createFileForRead();
+
+    Path path = new Path(txtOutputPath);
+    final FileSystem fs = path.getFileSystem(new Configuration());
+    if (fs.exists(path)) fs.delete(path, true);
+
+    Scheme sourceScheme = new ParquetTupleScheme(new Fields("first_name", "last_name"));
+    Tap source = new Hfs(sourceScheme, inputPath);
+
+    Scheme sinkScheme = new TextLine(new Fields("first", "last"));
+    Tap sink = new Hfs(sinkScheme, txtOutputPath);
+
+    Pipe assembly = new Pipe("namecp");
+    assembly = new Each(assembly, new UnpackTupleFunction());
+    Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
+
+    flow.complete();
+    String result = FileUtils.readFileToString(new File(txtOutputPath + "/part-00000"));
+    assertEquals("Alice\tPractice\nBob\tHope\nCharlie\tHorse\n", result);
+  }
+
+  private void createFileForRead() throws Exception {
+    final Path fileToCreate = new Path(parquetInputPath + "/names.parquet");
+
+    final Configuration conf = new Configuration();
+    final FileSystem fs = fileToCreate.getFileSystem(conf);
+    if (fs.exists(fileToCreate)) fs.delete(fileToCreate, true);
+
+    TProtocolFactory protocolFactory = new TCompactProtocol.Factory();
+    TaskAttemptID taskId = new TaskAttemptID("local", 0, true, 0, 0);
+    ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(fileToCreate, ContextUtil.newTaskAttemptContext(conf, taskId), protocolFactory, Name.class);
+
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    final TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos));
+
+    Name n1 = new Name();
+    n1.setFirst_name("Alice");
+    n1.setLast_name("Practice");
+    Name n2 = new Name();
+    n2.setFirst_name("Bob");
+    n2.setLast_name("Hope");
+    Name n3 = new Name();
+    n3.setFirst_name("Charlie");
+    n3.setLast_name("Horse");
+
+    n1.write(protocol);
+    w.write(new BytesWritable(baos.toByteArray()));
+    baos.reset();
+    n2.write(protocol);
+    w.write(new BytesWritable(baos.toByteArray()));
+    baos.reset();
+    n3.write(protocol);
+    w.write(new BytesWritable(baos.toByteArray()));
+    w.close();
+  }
+
+  private static class UnpackTupleFunction extends BaseOperation implements Function {
+    @Override
+    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
+      TupleEntry arguments = functionCall.getArguments();
+      Tuple result = new Tuple();
+
+      Tuple name = new Tuple();
+      name.addString(arguments.getString(0));
+      name.addString(arguments.getString(1));
+
+      result.add(name);
+      functionCall.getOutputCollector().add(result);
+    }
+  }
+
+  private static class ProjectedTupleFunction extends BaseOperation implements Function {
+    @Override
+    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
+      TupleEntry arguments = functionCall.getArguments();
+      Tuple result = new Tuple();
+
+      Tuple name = new Tuple();
+      name.addString(arguments.getString(0));
+//      name.addString(arguments.getString(1));
+
+      result.add(name);
+      functionCall.getOutputCollector().add(result);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading-common23/src/test/resources/names.txt
----------------------------------------------------------------------
diff --git a/parquet-cascading-common23/src/test/resources/names.txt b/parquet-cascading-common23/src/test/resources/names.txt
new file mode 100644
index 0000000..e2d0408
--- /dev/null
+++ b/parquet-cascading-common23/src/test/resources/names.txt
@@ -0,0 +1,3 @@
+Alice	Practive
+Bob	Hope
+Charlie	Horse

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading-common23/src/test/thrift/test.thrift
----------------------------------------------------------------------
diff --git a/parquet-cascading-common23/src/test/thrift/test.thrift b/parquet-cascading-common23/src/test/thrift/test.thrift
new file mode 100644
index 0000000..c58843d
--- /dev/null
+++ b/parquet-cascading-common23/src/test/thrift/test.thrift
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace java org.apache.parquet.thrift.test
+
+struct Name {
+  1: required string first_name,
+  2: optional string last_name
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-cascading/pom.xml b/parquet-cascading/pom.xml
index 0cd8588..cabb003 100644
--- a/parquet-cascading/pom.xml
+++ b/parquet-cascading/pom.xml
@@ -103,6 +103,51 @@
   <build>
     <plugins>
       <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.7</version>
+        <executions>
+          <execution>
+            <id>add-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>../parquet-cascading-common23/src/main/java</source>
+              </sources>
+            </configuration>
+          </execution>
+          <execution>
+            <id>add-test-source</id>
+            <phase>generate-test-sources</phase>
+            <goals>
+              <goal>add-test-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>../parquet-cascading-common23/src/test/java</source>
+              </sources>
+            </configuration>
+          </execution>
+          <execution>
+            <id>add-test-resource</id>
+            <phase>generate-test-resources</phase>
+            <goals>
+              <goal>add-test-resource</goal>
+            </goals>
+            <configuration>
+              <resources>
+                <resource>
+                  <directory>../parquet-cascading-common23/src/test/resources</directory>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
         <artifactId>maven-enforcer-plugin</artifactId>
       </plugin>
       <plugin>
@@ -115,6 +160,8 @@
         <version>0.1.10</version>
         <configuration>
           <thriftExecutable>${thrift.executable}</thriftExecutable>
+          <thriftSourceRoot>../parquet-cascading-common23/src/main/thrift</thriftSourceRoot>
+          <thriftTestSourceRoot>../parquet-cascading-common23/src/test/thrift</thriftTestSourceRoot>
         </configuration>
         <executions>
           <execution>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java
index ea70d43..b34ee7d 100644
--- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java
+++ b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java
@@ -33,6 +33,7 @@ import org.apache.parquet.hadoop.thrift.ThriftReadSupport;
 import org.apache.parquet.hadoop.thrift.TBaseWriteSupport;
 import org.apache.parquet.thrift.TBaseRecordConverter;
 
+@Deprecated // The parquet-cascading module depends on Cascading 2.x, and is being superseded with parquet-cascading3 for Cascading 3.x
 public class ParquetTBaseScheme<T extends TBase<?,?>> extends ParquetValueScheme<T> {
 
   // In the case of reads, we can read the thrift class from the file metadata

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java
index 41b56d0..3b7d715 100644
--- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java
+++ b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java
@@ -59,6 +59,7 @@ import static org.apache.parquet.Preconditions.checkNotNull;
   * @author Avi Bryant
   */
 
+@Deprecated // The parquet-cascading module depends on Cascading 2.x, and is being superseded with parquet-cascading3 for Cascading 3.x
 public class ParquetTupleScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>{
 
   private static final long serialVersionUID = 0L;

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java
index 9549ef4..6c34a84 100644
--- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java
+++ b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java
@@ -47,6 +47,7 @@ import static org.apache.parquet.Preconditions.checkNotNull;
  * This is an abstract class; implementations are expected to set up their Input/Output Formats
  * correctly in the respective Init methods.
  */
+@Deprecated // The parquet-cascading module depends on Cascading 2.x, and is being superseded with parquet-cascading3 for Cascading 3.x
 public abstract class ParquetValueScheme<T> extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>{
 
   public static final class Config<T> implements Serializable {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java
deleted file mode 100644
index e3fc3f7..0000000
--- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.cascading;
-
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.Type;
-
-import cascading.tuple.Fields;
-
-import java.util.List;
-import java.util.ArrayList;
-
-public class SchemaIntersection {
-
-  private final MessageType requestedSchema;
-  private final Fields sourceFields;
-
-  public SchemaIntersection(MessageType fileSchema, Fields requestedFields) {
-    if(requestedFields == Fields.UNKNOWN)
-      requestedFields = Fields.ALL;
-
-    Fields newFields = Fields.NONE;
-    List<Type> newSchemaFields = new ArrayList<Type>();
-    int schemaSize = fileSchema.getFieldCount();
-
-    for (int i = 0; i < schemaSize; i++) {
-      Type type = fileSchema.getType(i);
-      Fields name = new Fields(type.getName());
-
-      if(requestedFields.contains(name)) {
-        newFields = newFields.append(name);
-        newSchemaFields.add(type);
-      }
-    }
-
-    this.sourceFields = newFields;
-    this.requestedSchema = new MessageType(fileSchema.getName(), newSchemaFields);
-  }
-
-  public MessageType getRequestedSchema() {
-    return requestedSchema;
-  }
-
-  public Fields getSourceFields() {
-    return sourceFields;
-  }
-}

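A note for readers following the move. This commit deletes `SchemaIntersection` from parquet-cascading; per the commit message, shared files were pushed into parquet-cascading-common23. It builds the read projection by walking the file schema in order and keeping each top-level field whose name is in the requested Cascading `Fields`, with `Fields.UNKNOWN` rewritten to `Fields.ALL`.

The code below is a minimal, dependency-free sketch of that rule, not the module's code. Plain strings stand in for Parquet `Type`s and Cascading `Fields`. The convention that a `null` request means "all fields" is hypothetical and assumes `Fields.ALL.contains(...)` accepts every name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical, dependency-free sketch of the projection rule in SchemaIntersection.
 * Plain strings stand in for Parquet Types and Cascading Fields.
 */
final class SchemaIntersectionSketch {

  private SchemaIntersectionSketch() {}

  /**
   * Keeps the file-schema fields that were requested, in file-schema order.
   * A null requestedFields plays the role of Fields.ALL (and of Fields.UNKNOWN,
   * which the original code rewrites to Fields.ALL). This is an assumption of the sketch.
   */
  static List<String> intersect(List<String> fileSchemaFields, List<String> requestedFields) {
    List<String> kept = new ArrayList<>();
    for (String name : fileSchemaFields) {
      if (requestedFields == null || requestedFields.contains(name)) {
        kept.add(name);
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    List<String> file = Arrays.asList("first_name", "last_name");
    // Request order is ignored; file order wins.
    System.out.println(intersect(file, Arrays.asList("last_name", "first_name"))); // [first_name, last_name]
    // Names the file does not have are dropped silently.
    System.out.println(intersect(file, Arrays.asList("middle_name")));             // []
  }
}
```

The sketch and the original loop share two consequences:
- Projected columns come back in file-schema order, not in the order given to `ParquetTupleScheme`.
- Requested names that the file lacks are dropped without any error.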
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java
deleted file mode 100644
index 42a5926..0000000
--- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.cascading;
-
-import java.util.Map;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.commons.lang.StringUtils;
-
-import cascading.tuple.Tuple;
-import cascading.tuple.Fields;
-import cascading.flow.hadoop.util.HadoopUtil;
-
-import org.apache.parquet.hadoop.api.ReadSupport;
-import org.apache.parquet.io.api.RecordMaterializer;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.cascading.convert.TupleRecordMaterializer;
-
-
-public class TupleReadSupport extends ReadSupport<Tuple> {
-  static final String PARQUET_CASCADING_REQUESTED_FIELDS = "parquet.cascading.requested.fields";
-
-  static protected Fields getRequestedFields(Configuration configuration) {
-    String fieldsString = configuration.get(PARQUET_CASCADING_REQUESTED_FIELDS);
-
-    if(fieldsString == null)
-      return Fields.ALL;
-
-    String[] parts = StringUtils.split(fieldsString, ":");
-    if(parts.length == 0)
-      return Fields.ALL;
-    else
-      return new Fields(parts);
-  }
-
-  static protected void setRequestedFields(JobConf configuration, Fields fields) {
-    String fieldsString = StringUtils.join(fields.iterator(), ":");
-    configuration.set(PARQUET_CASCADING_REQUESTED_FIELDS, fieldsString);
-  }
-
-  @Override
-  public ReadContext init(Configuration configuration, Map<String, String> keyValueMetaData, MessageType fileSchema) {
-    Fields requestedFields = getRequestedFields(configuration);
-    if (requestedFields == null) {
-      return new ReadContext(fileSchema);
-    } else {
-      SchemaIntersection intersection = new SchemaIntersection(fileSchema, requestedFields);
-      return new ReadContext(intersection.getRequestedSchema());
-    }
-  }
-
-  @Override
-  public RecordMaterializer<Tuple> prepareForRead(
-      Configuration configuration,
-      Map<String, String> keyValueMetaData,
-      MessageType fileSchema,
-      ReadContext readContext) {
-    MessageType requestedSchema = readContext.getRequestedSchema();
-    return new TupleRecordMaterializer(requestedSchema);
-  }
-
-}

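`TupleReadSupport` passes the projection from the Cascading side to the read path through a single configuration string:
- `setRequestedFields` joins the field names with `:` under `parquet.cascading.requested.fields`.
- `getRequestedFields` splits the string again with commons-lang `StringUtils.split`. It falls back to `Fields.ALL` when the key is missing or yields no tokens.

Below is a minimal sketch of that round trip. It makes two hypothetical substitutions: a `Map<String, String>` replaces Hadoop's `JobConf`/`Configuration`, and `null` replaces `Fields.ALL`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RequestedFieldsSketch {
  // Same key string as TupleReadSupport.PARQUET_CASCADING_REQUESTED_FIELDS.
  static final String KEY = "parquet.cascading.requested.fields";

  private RequestedFieldsSketch() {}

  /** Mirrors setRequestedFields: join names with ':' (a Map stands in for JobConf). */
  static void setRequestedFields(Map<String, String> conf, List<String> fields) {
    conf.put(KEY, String.join(":", fields));
  }

  /**
   * Mirrors getRequestedFields; null stands in for Fields.ALL.
   * Empty tokens are dropped, as StringUtils.split(str, ":") does.
   */
  static List<String> getRequestedFields(Map<String, String> conf) {
    String fieldsString = conf.get(KEY);
    if (fieldsString == null) {
      return null; // key absent: read every column
    }
    List<String> parts = new ArrayList<>();
    for (String token : fieldsString.split(":")) {
      if (!token.isEmpty()) {
        parts.add(token);
      }
    }
    return parts.isEmpty() ? null : parts;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    setRequestedFields(conf, Arrays.asList("first_name", "last_name"));
    System.out.println(conf.get(KEY));            // first_name:last_name
    System.out.println(getRequestedFields(conf)); // [first_name, last_name]
  }
}
```

Because the encoding is a bare `:`-joined list, a field name that itself contains `:` would not survive the round trip. Also, the original `getRequestedFields` never returns `null`, so the `requestedFields == null` branch in `init` is never taken. The intersection is always computed, and it is `Fields.ALL` that selects every column.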
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java
deleted file mode 100644
index 032f534..0000000
--- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.cascading;
-
-import cascading.tuple.TupleEntry;
-import java.util.HashMap;
-import java.util.List;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.parquet.hadoop.api.WriteSupport;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.io.api.RecordConsumer;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.MessageTypeParser;
-import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Type;
-
-/**
- *
- *
- * @author Mickaël Lacour <m.lacour@criteo.com>
- */
-public class TupleWriteSupport extends WriteSupport<TupleEntry> {
-
-  private RecordConsumer recordConsumer;
-  private MessageType rootSchema;
-  public static final String PARQUET_CASCADING_SCHEMA = "parquet.cascading.schema";
-
-  @Override
-  public String getName() {
-    return "cascading";
-  }
-
-  @Override
-  public WriteContext init(Configuration configuration) {
-    String schema = configuration.get(PARQUET_CASCADING_SCHEMA);
-    rootSchema = MessageTypeParser.parseMessageType(schema);
-    return new WriteContext(rootSchema, new HashMap<String, String>());
-  }
-
-  @Override
-  public void prepareForWrite(RecordConsumer recordConsumer) {
-    this.recordConsumer = recordConsumer;
-  }
-
-  @Override
-  public void write(TupleEntry record) {
-    recordConsumer.startMessage();
-    final List<Type> fields = rootSchema.getFields();
-
-    for (int i = 0; i < fields.size(); i++) {
-      Type field = fields.get(i);
-
-      if (record == null || record.getObject(field.getName()) == null) {
-        continue;
-      }
-      recordConsumer.startField(field.getName(), i);
-      if (field.isPrimitive()) {
-        writePrimitive(record, field.asPrimitiveType());
-      } else {
-        throw new UnsupportedOperationException("Complex type not implemented");
-      }
-      recordConsumer.endField(field.getName(), i);
-    }
-    recordConsumer.endMessage();
-  }
-
-  private void writePrimitive(TupleEntry record, PrimitiveType field) {
-    switch (field.getPrimitiveTypeName()) {
-      case BINARY:
-        recordConsumer.addBinary(Binary.fromString(record.getString(field.getName())));
-        break;
-      case BOOLEAN:
-        recordConsumer.addBoolean(record.getBoolean(field.getName()));
-        break;
-      case INT32:
-        recordConsumer.addInteger(record.getInteger(field.getName()));
-        break;
-      case INT64:
-        recordConsumer.addLong(record.getLong(field.getName()));
-        break;
-      case DOUBLE:
-        recordConsumer.addDouble(record.getDouble(field.getName()));
-        break;
-      case FLOAT:
-        recordConsumer.addFloat(record.getFloat(field.getName()));
-        break;
-      case FIXED_LEN_BYTE_ARRAY:
-        throw new UnsupportedOperationException("Fixed len byte array type not implemented");
-      case INT96:
-        throw new UnsupportedOperationException("Int96 type not implemented");
-      default:
-        throw new UnsupportedOperationException(field.getName() + " type not implemented");
-    }
-  }
-}

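`TupleWriteSupport.write` maps one Cascading `TupleEntry` onto Parquet `RecordConsumer` calls:
- It emits one message per record.
- For each top-level field of the schema parsed from `parquet.cascading.schema`, it emits a `startField`/value/`endField` triple keyed by the field's schema position.
- Null values are skipped entirely, and non-primitive fields are rejected.

The sketch below records those calls as strings instead of writing Parquet, so the control flow can be exercised without Hadoop. The `PrimitiveTypeName` enum and the string events are hypothetical stand-ins, not Parquet's API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TupleWriteSketch {

  /** Hypothetical stand-in mirroring Parquet's PrimitiveTypeName constants. */
  enum PrimitiveTypeName { BINARY, BOOLEAN, INT32, INT64, FLOAT, DOUBLE, FIXED_LEN_BYTE_ARRAY, INT96 }

  private TupleWriteSketch() {}

  /**
   * Returns the RecordConsumer calls the original write() would make, as strings.
   * schema maps field name to type in schema order (pass a LinkedHashMap).
   */
  static List<String> write(Map<String, PrimitiveTypeName> schema, Map<String, Object> record) {
    List<String> events = new ArrayList<>();
    events.add("startMessage");
    int i = -1;
    for (Map.Entry<String, PrimitiveTypeName> field : schema.entrySet()) {
      i++; // the index is the schema position, even if earlier fields were skipped
      String name = field.getKey();
      Object value = (record == null) ? null : record.get(name);
      if (value == null) {
        continue; // null values are omitted: no startField/endField pair at all
      }
      events.add("startField " + name + " " + i);
      String method;
      switch (field.getValue()) {
        case BINARY:  method = "addBinary";  break; // original: Binary.fromString(record.getString(name))
        case BOOLEAN: method = "addBoolean"; break;
        case INT32:   method = "addInteger"; break;
        case INT64:   method = "addLong";    break;
        case FLOAT:   method = "addFloat";   break;
        case DOUBLE:  method = "addDouble";  break;
        default:
          // FIXED_LEN_BYTE_ARRAY and INT96 are unsupported, as in the original
          throw new UnsupportedOperationException(field.getValue() + " type not implemented");
      }
      events.add(method + " " + value);
      events.add("endField " + name + " " + i);
    }
    events.add("endMessage");
    return events;
  }

  public static void main(String[] args) {
    Map<String, PrimitiveTypeName> schema = new LinkedHashMap<>();
    schema.put("first_name", PrimitiveTypeName.BINARY);
    schema.put("last_name", PrimitiveTypeName.BINARY);
    Map<String, Object> record = new LinkedHashMap<>();
    record.put("last_name", "Hope"); // first_name is missing, so it is skipped
    System.out.println(write(schema, record));
    // [startMessage, startField last_name 1, addBinary Hope, endField last_name 1, endMessage]
  }
}
```

There is one small difference. The original's `default` branch builds its message from `field.getName()`, so it names the column rather than the type. The sketch reports the type instead.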
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java
deleted file mode 100644
index 3741165..0000000
--- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.cascading.convert;
-
-import cascading.tuple.Tuple;
-
-import org.apache.parquet.io.ParquetDecodingException;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.io.api.Converter;
-import org.apache.parquet.io.api.GroupConverter;
-import org.apache.parquet.io.api.PrimitiveConverter;
-import org.apache.parquet.pig.TupleConversionException;
-import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.OriginalType;
-import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Type;
-import org.apache.parquet.schema.Type.Repetition;
-
-public class TupleConverter extends GroupConverter {
-
-  protected Tuple currentTuple;
-  private final Converter[] converters;
-
-  public TupleConverter(GroupType parquetSchema) {
-    int schemaSize = parquetSchema.getFieldCount();
-
-    this.converters = new Converter[schemaSize];
-    for (int i = 0; i < schemaSize; i++) {
-      Type type = parquetSchema.getType(i);
-      converters[i] = newConverter(type, i);
-    }
-  }
-
-  private Converter newConverter(Type type, int i) {
-    if(!type.isPrimitive()) {
-      throw new IllegalArgumentException("cascading can only build tuples from primitive types");
-    } else {
-      return new TuplePrimitiveConverter(this, i);
-    }
-  }
-
-  @Override
-  public Converter getConverter(int fieldIndex) {
-    return converters[fieldIndex];
-  }
-
-  @Override
-  final public void start() {
-    currentTuple = Tuple.size(converters.length);
-  }
-
-  @Override
-  public void end() {
-  }
-
-  final public Tuple getCurrentTuple() {
-    return currentTuple;
-  }
-
-  static final class TuplePrimitiveConverter extends PrimitiveConverter {
-    private final TupleConverter parent;
-    private final int index;
-
-    public TuplePrimitiveConverter(TupleConverter parent, int index) {
-      this.parent = parent;
-      this.index = index;
-    }
-
-    @Override
-    public void addBinary(Binary value) {
-      parent.getCurrentTuple().setString(index, value.toStringUsingUTF8());
-    }
-
-    @Override
-    public void addBoolean(boolean value) {
-      parent.getCurrentTuple().setBoolean(index, value);
-    }
-
-    @Override
-    public void addDouble(double value) {
-      parent.getCurrentTuple().setDouble(index, value);
-    }
-
-    @Override
-    public void addFloat(float value) {
-      parent.getCurrentTuple().setFloat(index, value);
-    }
-
-    @Override
-    public void addInt(int value) {
-      parent.getCurrentTuple().setInteger(index, value);
-    }
-
-    @Override
-    public void addLong(long value) {
-      parent.getCurrentTuple().setLong(index, value);
-    }
-  }
-}

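On the read side, `TupleConverter` follows Parquet's push-style converter shape:
- `start()` allocates a fresh tuple of nulls for each record (`Tuple.size(n)`).
- Each field's `TuplePrimitiveConverter` writes its value into a fixed index of that tuple.
- Any non-primitive field is rejected at construction.

Below is a dependency-free sketch of the same shape. It assumes an `Object[]` in place of Cascading's `Tuple` and a hypothetical `FieldSlot` class in place of `PrimitiveConverter`. Only three of the `add*` callbacks are shown.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class TupleConverterSketch {

  private final FieldSlot[] converters;
  private Object[] currentTuple;

  /** isPrimitive[i] tells whether field i of the projected schema is primitive. */
  TupleConverterSketch(boolean[] isPrimitive) {
    converters = new FieldSlot[isPrimitive.length];
    for (int i = 0; i < isPrimitive.length; i++) {
      if (!isPrimitive[i]) {
        throw new IllegalArgumentException("cascading can only build tuples from primitive types");
      }
      converters[i] = new FieldSlot(this, i);
    }
  }

  FieldSlot getConverter(int fieldIndex) {
    return converters[fieldIndex];
  }

  /** Called once per record: a fresh array of nulls, like Tuple.size(n). */
  void start() {
    currentTuple = new Object[converters.length];
  }

  void end() {
    // nothing to finalize: slots were filled as values arrived
  }

  Object[] getCurrentTuple() {
    return currentTuple;
  }

  /** Hypothetical stand-in for TuplePrimitiveConverter: writes into one fixed slot. */
  static final class FieldSlot {
    private final TupleConverterSketch parent;
    private final int index;

    FieldSlot(TupleConverterSketch parent, int index) {
      this.parent = parent;
      this.index = index;
    }

    void addBinary(byte[] utf8) {
      parent.getCurrentTuple()[index] = new String(utf8, StandardCharsets.UTF_8);
    }

    void addInt(int value) {
      parent.getCurrentTuple()[index] = value;
    }

    void addLong(long value) {
      parent.getCurrentTuple()[index] = value;
    }
  }

  public static void main(String[] args) {
    TupleConverterSketch root = new TupleConverterSketch(new boolean[] {true, true});
    root.start();
    root.getConverter(0).addBinary("Bob".getBytes(StandardCharsets.UTF_8));
    root.end();
    System.out.println(Arrays.toString(root.getCurrentTuple())); // [Bob, null]
  }
}
```

Because `start()` allocates a new array per record, fields absent from a record simply stay `null`; an unset optional `last_name` is one example. A tuple already returned by `getCurrentTuple()` is also not overwritten by the next record.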
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java
deleted file mode 100644
index 275e17b..0000000
--- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.cascading.convert;
-
-import cascading.tuple.Tuple;
-import cascading.tuple.Fields;
-
-import org.apache.parquet.io.api.GroupConverter;
-import org.apache.parquet.io.api.RecordMaterializer;
-import org.apache.parquet.schema.GroupType;
-
-public class TupleRecordMaterializer extends RecordMaterializer<Tuple> {
-
-  private TupleConverter root;
-
-  public TupleRecordMaterializer(GroupType parquetSchema) {
-    this.root = new TupleConverter(parquetSchema);
-  }
-
-  @Override
-  public Tuple getCurrentRecord() {
-    return root.getCurrentTuple();
-  }
-
-  @Override
-  public GroupConverter getRootConverter() {
-    return root;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java b/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java
index 841314c..e0f33e1 100644
--- a/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java
+++ b/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java
@@ -58,8 +58,9 @@ import java.io.ByteArrayOutputStream;
 import java.util.HashMap;
 import java.util.Map;
 
+@Deprecated // The parquet-cascading module depends on Cascading 2.x, and is being superseded with parquet-cascading3 for Cascading 3.x
 public class TestParquetTBaseScheme {
-  final String txtInputPath = "src/test/resources/names.txt";
+  final String txtInputPath = "target/test-classes/names.txt";
   final String parquetInputPath = "target/test/ParquetTBaseScheme/names-parquet-in";
   final String parquetOutputPath = "target/test/ParquetTBaseScheme/names-parquet-out";
   final String txtOutputPath = "target/test/ParquetTBaseScheme/names-txt-out";

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java b/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java
deleted file mode 100644
index de350dd..0000000
--- a/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.cascading;
-
-import cascading.flow.Flow;
-import cascading.flow.FlowProcess;
-import cascading.flow.hadoop.HadoopFlowConnector;
-import cascading.operation.BaseOperation;
-import cascading.operation.Function;
-import cascading.operation.FunctionCall;
-import cascading.pipe.Each;
-import cascading.pipe.Pipe;
-import cascading.scheme.Scheme;
-import cascading.scheme.hadoop.TextLine;
-import cascading.tap.Tap;
-import cascading.tap.hadoop.Hfs;
-import cascading.tuple.Fields;
-import cascading.tuple.Tuple;
-import cascading.tuple.TupleEntry;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TIOStreamTransport;
-import org.junit.Test;
-import org.apache.parquet.hadoop.thrift.ThriftToParquetFileWriter;
-import org.apache.parquet.hadoop.util.ContextUtil;
-import org.apache.parquet.thrift.test.Name;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestParquetTupleScheme {
-  final String parquetInputPath = "target/test/ParquetTupleIn/names-parquet-in";
-  final String txtOutputPath = "target/test/ParquetTupleOut/names-txt-out";
-
-  @Test
-  public void testReadPattern() throws Exception {
-    String sourceFolder = parquetInputPath;
-    testReadWrite(sourceFolder);
-
-    String sourceGlobPattern = parquetInputPath + "/*";
-    testReadWrite(sourceGlobPattern);
-
-    String multiLevelGlobPattern = "target/test/ParquetTupleIn/**/*";
-    testReadWrite(multiLevelGlobPattern);
-  }
-
-  @Test
-  public void testFieldProjection() throws Exception {
-    createFileForRead();
-
-    Path path = new Path(txtOutputPath);
-    final FileSystem fs = path.getFileSystem(new Configuration());
-    if (fs.exists(path)) fs.delete(path, true);
-
-    Scheme sourceScheme = new ParquetTupleScheme(new Fields("last_name"));
-    Tap source = new Hfs(sourceScheme, parquetInputPath);
-
-    Scheme sinkScheme = new TextLine(new Fields("last_name"));
-    Tap sink = new Hfs(sinkScheme, txtOutputPath);
-
-    Pipe assembly = new Pipe("namecp");
-    assembly = new Each(assembly, new ProjectedTupleFunction());
-    Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
-
-    flow.complete();
-    String result = FileUtils.readFileToString(new File(txtOutputPath + "/part-00000"));
-    assertEquals("Practice\nHope\nHorse\n", result);
-  }
-
-  public void testReadWrite(String inputPath) throws Exception {
-    createFileForRead();
-
-    Path path = new Path(txtOutputPath);
-    final FileSystem fs = path.getFileSystem(new Configuration());
-    if (fs.exists(path)) fs.delete(path, true);
-
-    Scheme sourceScheme = new ParquetTupleScheme(new Fields("first_name", "last_name"));
-    Tap source = new Hfs(sourceScheme, inputPath);
-
-    Scheme sinkScheme = new TextLine(new Fields("first", "last"));
-    Tap sink = new Hfs(sinkScheme, txtOutputPath);
-
-    Pipe assembly = new Pipe("namecp");
-    assembly = new Each(assembly, new UnpackTupleFunction());
-    Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
-
-    flow.complete();
-    String result = FileUtils.readFileToString(new File(txtOutputPath + "/part-00000"));
-    assertEquals("Alice\tPractice\nBob\tHope\nCharlie\tHorse\n", result);
-  }
-
-  private void createFileForRead() throws Exception {
-    final Path fileToCreate = new Path(parquetInputPath + "/names.parquet");
-
-    final Configuration conf = new Configuration();
-    final FileSystem fs = fileToCreate.getFileSystem(conf);
-    if (fs.exists(fileToCreate)) fs.delete(fileToCreate, true);
-
-    TProtocolFactory protocolFactory = new TCompactProtocol.Factory();
-    TaskAttemptID taskId = new TaskAttemptID("local", 0, true, 0, 0);
-    ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(fileToCreate, ContextUtil.newTaskAttemptContext(conf, taskId), protocolFactory, Name.class);
-
-    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    final TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos));
-
-    Name n1 = new Name();
-    n1.setFirst_name("Alice");
-    n1.setLast_name("Practice");
-    Name n2 = new Name();
-    n2.setFirst_name("Bob");
-    n2.setLast_name("Hope");
-    Name n3 = new Name();
-    n3.setFirst_name("Charlie");
-    n3.setLast_name("Horse");
-
-    n1.write(protocol);
-    w.write(new BytesWritable(baos.toByteArray()));
-    baos.reset();
-    n2.write(protocol);
-    w.write(new BytesWritable(baos.toByteArray()));
-    baos.reset();
-    n3.write(protocol);
-    w.write(new BytesWritable(baos.toByteArray()));
-    w.close();
-  }
-
-  private static class UnpackTupleFunction extends BaseOperation implements Function {
-    @Override
-    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
-      TupleEntry arguments = functionCall.getArguments();
-      Tuple result = new Tuple();
-
-      Tuple name = new Tuple();
-      name.addString(arguments.getString(0));
-      name.addString(arguments.getString(1));
-
-      result.add(name);
-      functionCall.getOutputCollector().add(result);
-    }
-  }
-
-  private static class ProjectedTupleFunction extends BaseOperation implements Function {
-    @Override
-    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
-      TupleEntry arguments = functionCall.getArguments();
-      Tuple result = new Tuple();
-
-      Tuple name = new Tuple();
-      name.addString(arguments.getString(0));
-//      name.addString(arguments.getString(1));
-
-      result.add(name);
-      functionCall.getOutputCollector().add(result);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/test/resources/names.txt
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/test/resources/names.txt b/parquet-cascading/src/test/resources/names.txt
deleted file mode 100644
index e2d0408..0000000
--- a/parquet-cascading/src/test/resources/names.txt
+++ /dev/null
@@ -1,3 +0,0 @@
-Alice	Practive
-Bob	Hope
-Charlie	Horse

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/test/thrift/test.thrift
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/test/thrift/test.thrift b/parquet-cascading/src/test/thrift/test.thrift
deleted file mode 100644
index c58843d..0000000
--- a/parquet-cascading/src/test/thrift/test.thrift
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-namespace java org.apache.parquet.thrift.test
-
-struct Name {
-  1: required string first_name,
-  2: optional string last_name
-}
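
In the removed `test.thrift`, `first_name` is `required` and `last_name` is `optional`. The hand-written class below is a hypothetical analogue of that contract, not Thrift-generated code. It rejects a null required field at construction time and exposes the optional field as an `Optional`.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical hand-written analogue of
//   struct Name { 1: required string first_name, 2: optional string last_name }
// This is not the class the Thrift compiler would generate.
final class NameSketch {
  private final String firstName; // required: never null
  private final String lastName;  // optional: may be null

  NameSketch(String firstName, String lastName) {
    this.firstName = Objects.requireNonNull(firstName, "first_name is required");
    this.lastName = lastName;
  }

  String firstName() {
    return firstName;
  }

  // An absent optional field surfaces as Optional.empty().
  Optional<String> lastName() {
    return Optional.ofNullable(lastName);
  }
}
```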

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading3/REVIEWERS.md
----------------------------------------------------------------------
diff --git a/parquet-cascading3/REVIEWERS.md b/parquet-cascading3/REVIEWERS.md
new file mode 100644
index 0000000..f797235
--- /dev/null
+++ b/parquet-cascading3/REVIEWERS.md
@@ -0,0 +1,27 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+The following reviewers reviewed the parquet-cascading (pre-Cascading 3.0) project:
+
+| Name               | Apache Id  | github id   |
+|--------------------|------------|-------------|
+| Dmitriy Ryaboy     | dvryaboy   | dvryaboy    |
+| Tianshuo Deng      | tianshuo   | tsdeng      |
+
+

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading3/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-cascading3/pom.xml b/parquet-cascading3/pom.xml
new file mode 100644
index 0000000..ea552ad
--- /dev/null
+++ b/parquet-cascading3/pom.xml
@@ -0,0 +1,178 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <groupId>org.apache.parquet</groupId>
+    <artifactId>parquet</artifactId>
+    <relativePath>../pom.xml</relativePath>
+    <version>1.8.2-SNAPSHOT</version>
+  </parent>
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>parquet-cascading3</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache Parquet Cascading (for Cascading 3.0 onwards)</name>
+  <url>https://parquet.apache.org</url>
+
+  <repositories>
+    <repository>
+     <id>conjars.org</id>
+      <url>http://conjars.org/repo</url>
+    </repository>
+  </repositories>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-column</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-hadoop</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+        <groupId>org.apache.parquet</groupId>
+        <artifactId>parquet-thrift</artifactId>
+        <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-column</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>1.9.5</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+       <groupId>cascading</groupId>
+       <artifactId>cascading-hadoop</artifactId> <!-- built against cascading-hadoop (Hadoop 1), but usable with any backend -->
+       <version>${cascading3.version}</version>
+       <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+        <!-- TEMPORARY UNTIL AFTER previous.version &gt;= 1.8.2
+
+        (the enforcer checks against the 1.7.0 API; this module did not exist then, so the check cannot succeed)
+        -->
+      <plugin>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>none</phase>
+          </execution>
+        </executions>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+
+        <!-- /TEMPORARY -->
+
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.7</version>
+        <executions>
+          <execution>
+            <id>add-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>../parquet-cascading-common23/src/main/java</source>
+              </sources>
+            </configuration>
+          </execution>
+          <execution>
+            <id>add-test-source</id>
+            <phase>generate-test-sources</phase>
+            <goals>
+              <goal>add-test-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>../parquet-cascading-common23/src/test/java</source>
+              </sources>
+            </configuration>
+          </execution>
+          <execution>
+            <id>add-test-resource</id>
+            <phase>generate-test-resources</phase>
+            <goals>
+              <goal>add-test-resource</goal>
+            </goals>
+            <configuration>
+              <resources>
+                <resource>
+                  <directory>../parquet-cascading-common23/src/test/resources</directory>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-enforcer-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.thrift.tools</groupId>
+        <artifactId>maven-thrift-plugin</artifactId>
+        <version>0.1.10</version>
+        <configuration>
+          <thriftExecutable>${thrift.executable}</thriftExecutable>
+          <thriftSourceRoot>../parquet-cascading-common23/src/main/thrift</thriftSourceRoot>
+          <thriftTestSourceRoot>../parquet-cascading-common23/src/test/thrift</thriftTestSourceRoot>
+        </configuration>
+        <executions>
+          <execution>
+            <id>thrift-sources</id>
+            <phase>generate-test-sources</phase>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java b/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java
new file mode 100644
index 0000000..af04b47
--- /dev/null
+++ b/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java
@@ -0,0 +1,80 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cascading;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.thrift.TBase;
+
+import cascading.flow.FlowProcess;
+import cascading.tap.Tap;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat;
+import org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat;
+import org.apache.parquet.hadoop.thrift.ThriftReadSupport;
+import org.apache.parquet.hadoop.thrift.TBaseWriteSupport;
+import org.apache.parquet.thrift.TBaseRecordConverter;
+
+public class ParquetTBaseScheme<T extends TBase<?,?>> extends ParquetValueScheme<T> {
+
+  // In the case of reads, we can read the thrift class from the file metadata
+  public ParquetTBaseScheme() {
+    this(new Config<T>());
+  }
+
+  public ParquetTBaseScheme(Class<T> thriftClass) {
+    this(new Config<T>().withRecordClass(thriftClass));
+  }
+
+  public ParquetTBaseScheme(FilterPredicate filterPredicate) {
+    this(new Config<T>().withFilterPredicate(filterPredicate));
+  }
+
+  public ParquetTBaseScheme(FilterPredicate filterPredicate, Class<T> thriftClass) {
+    this(new Config<T>().withRecordClass(thriftClass).withFilterPredicate(filterPredicate));
+  }
+
+  public ParquetTBaseScheme(Config<T> config) {
+    super(config);
+  }
+
+  @Override
+  public void sourceConfInit(FlowProcess<? extends JobConf> fp,
+      Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
+    super.sourceConfInit(fp, tap, jobConf);
+    jobConf.setInputFormat(DeprecatedParquetInputFormat.class);
+    ParquetInputFormat.setReadSupportClass(jobConf, ThriftReadSupport.class);
+    ThriftReadSupport.setRecordConverterClass(jobConf, TBaseRecordConverter.class);
+  }
+
+  @Override
+  public void sinkConfInit(FlowProcess<? extends JobConf> fp,
+      Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
+
+    if (this.config.getKlass() == null) {
+      throw new IllegalArgumentException("To use ParquetTBaseScheme as a sink, you must specify a thrift class in the constructor");
+    }
+
+    DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf);
+    DeprecatedParquetOutputFormat.setWriteSupportClass(jobConf, TBaseWriteSupport.class);
+    TBaseWriteSupport.<T>setThriftClass(jobConf, this.config.getKlass());
+  }
+}
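
The Cascading 3.0 change described in the commit message is visible in these signatures: `sourceConfInit` and `sinkConfInit` now take a `FlowProcess<? extends JobConf>` rather than a `FlowProcess<JobConf>`. The JDK-only sketch below illustrates why that matters. It uses hypothetical stand-ins (`Conf` for `JobConf`, `TunedConf` for a `JobConf` subclass, `FakeFlowProcess` for `FlowProcess`); none of them are Cascading or Hadoop types.

```java
// Hypothetical stand-in for JobConf.
class Conf {
  String get(String key) {
    return "conf:" + key;
  }
}

// Hypothetical stand-in for a JobConf subclass supplied by some other backend.
class TunedConf extends Conf {
  @Override
  String get(String key) {
    return "tuned:" + key;
  }
}

// Hypothetical stand-in for FlowProcess<Config>.
final class FakeFlowProcess<C extends Conf> {
  private final C conf;

  FakeFlowProcess(C conf) {
    this.conf = conf;
  }

  C getConfigCopy() {
    return conf; // a real implementation would return a copy
  }
}

final class WildcardDemo {
  // Cascading 2.x-style parameter: accepts FakeFlowProcess<Conf> only.
  static String invariant(FakeFlowProcess<Conf> fp) {
    return fp.getConfigCopy().get("k");
  }

  // Cascading 3.x-style parameter: accepts FakeFlowProcess of Conf or any subclass.
  static String covariant(FakeFlowProcess<? extends Conf> fp) {
    Conf conf = fp.getConfigCopy(); // the captured type is still a Conf
    return conf.get("k");
  }
}
```

Passing a `FakeFlowProcess<TunedConf>` to `invariant` does not compile, because Java generic types are invariant. The wildcard version accepts it and can still read the configuration as a `Conf`, in the same way that `getFooters` in `ParquetTupleScheme` assigns `flowProcess.getConfigCopy()` to a `JobConf`.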

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java b/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java
new file mode 100644
index 0000000..4532d3b
--- /dev/null
+++ b/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java
@@ -0,0 +1,191 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ package org.apache.parquet.cascading;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+
+import cascading.flow.FlowProcess;
+import cascading.scheme.Scheme;
+import cascading.scheme.SinkCall;
+import cascading.scheme.SourceCall;
+import cascading.tap.CompositeTap;
+import cascading.tap.Tap;
+import cascading.tap.TapException;
+import cascading.tap.hadoop.Hfs;
+import cascading.tuple.Fields;
+import cascading.tuple.Tuple;
+import cascading.tuple.TupleEntry;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.Footer;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.mapred.Container;
+import org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat;
+import org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat;
+import org.apache.parquet.schema.MessageType;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+  * A Cascading Scheme that converts Parquet groups into Cascading tuples.
+  * If you provide it with sourceFields, it will selectively materialize only the columns for those fields.
+  * The names must match the names in the Parquet schema.
+  * If you do not provide sourceFields, or use Fields.ALL or Fields.UNKNOWN, it will create one from the
+  * Parquet schema.
+  * Currently, only primitive types are supported. TODO: allow nested fields in the Parquet schema to be
+  * flattened to a top-level field in the Cascading tuple.
+  *
+  * @author Avi Bryant
+  */
+
+public class ParquetTupleScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>{
+
+  private static final long serialVersionUID = 0L;
+  private String parquetSchema;
+  private final FilterPredicate filterPredicate;
+
+  public ParquetTupleScheme() {
+    super();
+    this.filterPredicate = null;
+  }
+
+  public ParquetTupleScheme(Fields sourceFields) {
+    super(sourceFields);
+    this.filterPredicate = null;
+  }
+
+  public ParquetTupleScheme(FilterPredicate filterPredicate) {
+    this.filterPredicate = checkNotNull(filterPredicate, "filterPredicate");
+  }
+
+  public ParquetTupleScheme(FilterPredicate filterPredicate, Fields sourceFields) {
+    super(sourceFields);
+    this.filterPredicate = checkNotNull(filterPredicate, "filterPredicate");
+  }
+
+  /**
+   * ParquetTupleScheme constructor to use when the scheme must also act as a sink.
+   *
+   * @param sourceFields used for the reading step
+   * @param sinkFields used for the writing step
+   * @param schema is mandatory if you add sinkFields and must be the
+   * toString() of a MessageType. This value is parsed when the
+   * Parquet file is created.
+   */
+  public ParquetTupleScheme(Fields sourceFields, Fields sinkFields, final String schema) {
+    super(sourceFields, sinkFields);
+    parquetSchema = schema;
+    this.filterPredicate = null;
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public void sourceConfInit(FlowProcess<? extends JobConf> fp,
+      Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
+
+    if (filterPredicate != null) {
+      ParquetInputFormat.setFilterPredicate(jobConf, filterPredicate);
+    }
+
+    jobConf.setInputFormat(DeprecatedParquetInputFormat.class);
+    ParquetInputFormat.setReadSupportClass(jobConf, TupleReadSupport.class);
+    TupleReadSupport.setRequestedFields(jobConf, getSourceFields());
+ }
+
+ @Override
+ public Fields retrieveSourceFields(FlowProcess<? extends JobConf> flowProcess, Tap tap) {
+    MessageType schema = readSchema(flowProcess, tap);
+    SchemaIntersection intersection = new SchemaIntersection(schema, getSourceFields());
+
+    setSourceFields(intersection.getSourceFields());
+
+    return getSourceFields();
+  }
+
+  private MessageType readSchema(FlowProcess<? extends JobConf> flowProcess, Tap tap) {
+    try {
+      Hfs hfs;
+
+      if( tap instanceof CompositeTap )
+        hfs = (Hfs) ( (CompositeTap) tap ).getChildTaps().next();
+      else
+        hfs = (Hfs) tap;
+
+      List<Footer> footers = getFooters(flowProcess, hfs);
+
+      if(footers.isEmpty()) {
+        throw new TapException("Could not read Parquet metadata at " + hfs.getPath());
+      } else {
+        return footers.get(0).getParquetMetadata().getFileMetaData().getSchema();
+      }
+    } catch (IOException e) {
+      throw new TapException(e);
+    }
+  }
+
+   private List<Footer> getFooters(FlowProcess<? extends JobConf> flowProcess, Hfs hfs) throws IOException {
+     JobConf jobConf = flowProcess.getConfigCopy();
+     DeprecatedParquetInputFormat format = new DeprecatedParquetInputFormat();
+     format.addInputPath(jobConf, hfs.getPath());
+     return format.getFooters(jobConf);
+   }
+
+   @SuppressWarnings("unchecked")
+  @Override
+  public boolean source(FlowProcess<? extends JobConf> fp, SourceCall<Object[], RecordReader> sc)
+      throws IOException {
+    Container<Tuple> value = (Container<Tuple>) sc.getInput().createValue();
+    boolean hasNext = sc.getInput().next(null, value);
+    if (!hasNext) { return false; }
+
+    // Skip nulls
+    if (value == null) { return true; }
+
+    sc.getIncomingEntry().setTuple(value.get());
+    return true;
+  }
+
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public void sinkConfInit(FlowProcess<? extends JobConf> fp,
+          Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
+    DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf);
+    jobConf.set(TupleWriteSupport.PARQUET_CASCADING_SCHEMA, parquetSchema);
+    ParquetOutputFormat.setWriteSupportClass(jobConf, TupleWriteSupport.class);
+  }
+
+  @Override
+  public boolean isSink() {
+    return parquetSchema != null;
+  }
+
+  @Override
+  public void sink(FlowProcess<? extends JobConf> fp, SinkCall<Object[], OutputCollector> sink)
+          throws IOException {
+    TupleEntry tuple = sink.getOutgoingEntry();
+    OutputCollector outputCollector = sink.getOutput();
+    outputCollector.collect(null, tuple);
+  }
+}
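
`retrieveSourceFields` reads the schema of the first footer and uses `SchemaIntersection` to narrow the declared source fields to the columns that actually exist. That class is not part of this excerpt, so the sketch below is only a guess at its behavior. It assumes the intersection keeps matching columns in file order, treats a `null` request as `Fields.ALL`/`Fields.UNKNOWN`, and silently drops requested names the file lacks. `FieldIntersection` and `intersect` are hypothetical, and plain `String` lists stand in for `MessageType` and `Fields`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class FieldIntersection {
  // requested == null stands in for Fields.ALL / Fields.UNKNOWN: keep every column.
  static List<String> intersect(List<String> fileColumns, List<String> requested) {
    if (requested == null) {
      return new ArrayList<>(fileColumns);
    }
    Set<String> wanted = new HashSet<>(requested);
    List<String> kept = new ArrayList<>();
    for (String column : fileColumns) { // iterate the file schema, so file order wins
      if (wanted.contains(column)) {
        kept.add(column);
      }
    }
    return kept; // requested names missing from the file are dropped
  }
}
```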

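In `source`, `value` is the container returned by `createValue()`, and `next` fills it in place. As written, `value` therefore cannot become null after `next`, and the `value == null` check tests the container rather than the tuple it holds. The hypothetical `ReusedHolderLoop` below sketches a reuse pattern in which the null check applies to the contents instead. `Holder`, `next` and `drain` are illustrative stand-ins for `Container`, `RecordReader.next` and the repeated calls Cascading makes to `source`; they are not Parquet APIs.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class ReusedHolderLoop {
  // Hypothetical analogue of Container<T>: one mutable holder refilled per record.
  static final class Holder<T> {
    private T value;

    void set(T v) {
      value = v;
    }

    T get() {
      return value;
    }
  }

  // Analogue of RecordReader.next(key, value): fills the holder, returns false at end.
  static <T> boolean next(Iterator<T> input, Holder<T> holder) {
    if (!input.hasNext()) {
      return false;
    }
    holder.set(input.next());
    return true;
  }

  // Drains the input, skipping records whose contents are null.
  static <T> List<T> drain(Iterator<T> input) {
    Holder<T> holder = new Holder<>(); // created once, like createValue()
    List<T> out = new ArrayList<>();
    while (next(input, holder)) {
      T record = holder.get();
      if (record == null) {
        continue; // null check on the contents, not on the holder
      }
      out.add(record);
    }
    return out;
  }
}
```

The sketch drains the whole input in one loop, whereas Cascading calls `source` once per record and uses its return value to decide whether to continue.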
