parquet-commits mailing list archives

From jul...@apache.org
Subject [1/2] parquet-mr git commit: PARQUET-480: Update for Cascading 3.0
Date Mon, 01 Feb 2016 03:21:55 GMT
Repository: parquet-mr
Updated Branches:
  refs/heads/master af9fd052d -> 57694790f


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java b/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java
new file mode 100644
index 0000000..28f7f32
--- /dev/null
+++ b/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java
@@ -0,0 +1,184 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cascading;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+
+import cascading.flow.FlowProcess;
+import cascading.scheme.Scheme;
+import cascading.scheme.SinkCall;
+import cascading.scheme.SourceCall;
+import cascading.tap.Tap;
+import cascading.tuple.Tuple;
+import cascading.tuple.TupleEntry;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.mapred.Container;
+import org.apache.parquet.hadoop.thrift.ParquetThriftInputFormat;
+import org.apache.parquet.hadoop.thrift.ThriftReadSupport;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * A Cascading Scheme that returns a simple Tuple with a single value, the "value" object
+ * coming out of the underlying InputFormat.
+ *
+ * This is an abstract class; implementations are expected to set up their Input/Output Formats
+ * correctly in the respective Init methods.
+ */
+public abstract class ParquetValueScheme<T> extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>{
+
+  public static final class Config<T> implements Serializable {
+    private final FilterPredicate filterPredicate;
+    private final String deprecatedProjectionString;
+    private final String strictProjectionString;
+    private final Class<T> klass;
+
+    private Config(Class<T> klass, FilterPredicate filterPredicate, String deprecatedProjectionString, String strictProjectionString) {
+      this.filterPredicate = filterPredicate;
+      this.deprecatedProjectionString = deprecatedProjectionString;
+      this.strictProjectionString = strictProjectionString;
+      this.klass = klass;
+    }
+
+    public Config() {
+      filterPredicate = null;
+      deprecatedProjectionString = null;
+      strictProjectionString = null;
+      klass = null;
+    }
+
+    public FilterPredicate getFilterPredicate() {
+      return filterPredicate;
+    }
+
+    @Deprecated
+    public String getProjectionString() {
+      return deprecatedProjectionString;
+    }
+
+    public String getStrictProjectionString() {
+      return strictProjectionString;
+    }
+
+    public Class<T> getKlass() {
+      return klass;
+    }
+
+    public Config<T> withFilterPredicate(FilterPredicate f) {
+      return new Config<T>(this.klass, checkNotNull(f, "filterPredicate"), this.deprecatedProjectionString, this.strictProjectionString);
+    }
+
+    @Deprecated
+    public Config<T> withProjectionString(String p) {
+      return new Config<T>(this.klass, this.filterPredicate, checkNotNull(p, "projectionString"), this.strictProjectionString);
+    }
+
+    public Config<T> withStrictProjectionString(String p) {
+      return new Config<T>(this.klass, this.filterPredicate, this.deprecatedProjectionString, checkNotNull(p, "projectionString"));
+    }
+
+    public Config<T> withRecordClass(Class<T> klass) {
+      return new Config<T>(checkNotNull(klass, "recordClass"), this.filterPredicate, this.deprecatedProjectionString, this.strictProjectionString);
+    }
+  }
+
+  private static final long serialVersionUID = 157560846420730043L;
+  protected final Config<T> config;
+
+  public ParquetValueScheme() {
+    this(new Config<T>());
+  }
+
+  public ParquetValueScheme(FilterPredicate filterPredicate) {
+    this(new Config<T>().withFilterPredicate(filterPredicate));
+  }
+
+  public ParquetValueScheme(Config<T> config) {
+    this.config = config;
+  }
+
+  @Deprecated
+  private void setProjectionPushdown(JobConf jobConf) {
+    if (this.config.deprecatedProjectionString != null) {
+      ThriftReadSupport.setProjectionPushdown(jobConf, this.config.deprecatedProjectionString);
+    }
+  }
+
+  private void setStrictProjectionPushdown(JobConf jobConf) {
+    if (this.config.strictProjectionString != null) {
+      ThriftReadSupport.setStrictFieldProjectionFilter(jobConf, this.config.strictProjectionString);
+    }
+  }
+
+  private void setPredicatePushdown(JobConf jobConf) {
+    if (this.config.filterPredicate != null) {
+      ParquetInputFormat.setFilterPredicate(jobConf, this.config.filterPredicate);
+    }
+  }
+  @Override
+  public void sourceConfInit(FlowProcess<? extends JobConf> jobConfFlowProcess, Tap<JobConf, RecordReader, OutputCollector> jobConfRecordReaderOutputCollectorTap, JobConf jobConf) {
+    setPredicatePushdown(jobConf);
+    setProjectionPushdown(jobConf);
+    setStrictProjectionPushdown(jobConf);
+    setRecordClass(jobConf);
+  }
+
+  private void setRecordClass(JobConf jobConf) {
+    if (config.klass != null) {
+      ParquetThriftInputFormat.setThriftClass(jobConf, config.klass);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public boolean source(FlowProcess<? extends JobConf> fp, SourceCall<Object[], RecordReader> sc)
+      throws IOException {
+    Container<T> value = (Container<T>) sc.getInput().createValue();
+    boolean hasNext = sc.getInput().next(null, value);
+    if (!hasNext) { return false; }
+
+    // Skip nulls
+    if (value == null) { return true; }
+
+    sc.getIncomingEntry().setTuple(new Tuple(value.get()));
+    return true;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void sink(FlowProcess<? extends JobConf> fp, SinkCall<Object[], OutputCollector> sc)
+      throws IOException {
+    TupleEntry tuple = sc.getOutgoingEntry();
+
+    if (tuple.size() != 1) {
+      throw new RuntimeException("ParquetValueScheme expects tuples with an arity of exactly 1, but found " + tuple.getFields());
+    }
+
+    T value = (T) tuple.getObject(0);
+    OutputCollector output = sc.getOutput();
+    output.collect(null, value);
+  }
+
+}
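
A note on the Config class above: it is immutable, so each with* method returns a new Config rather than mutating the receiver. Below is a minimal sketch of how a caller might assemble one, assuming the Thrift-generated Name class used in the tests that follow and parquet-mr's FilterApi; the column name and projection string are illustrative only, and the concrete scheme is assumed to expose a Config-based constructor like the public ParquetValueScheme(Config<T>) above:

    import org.apache.parquet.filter2.predicate.FilterApi;
    import org.apache.parquet.filter2.predicate.FilterPredicate;
    import org.apache.parquet.io.api.Binary;
    import org.apache.parquet.thrift.test.Name;

    // Keep only records whose first_name column equals "Alice".
    FilterPredicate onlyAlice =
        FilterApi.eq(FilterApi.binaryColumn("first_name"), Binary.fromString("Alice"));

    // Each with* call returns a fresh Config; sourceConfInit later pushes these
    // settings into the JobConf (predicate, projection, record class).
    ParquetValueScheme.Config<Name> config = new ParquetValueScheme.Config<Name>()
        .withRecordClass(Name.class)
        .withFilterPredicate(onlyAlice)
        .withStrictProjectionString("first_name;last_name");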

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading3/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading3/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java b/parquet-cascading3/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java
new file mode 100644
index 0000000..7b9f817
--- /dev/null
+++ b/parquet-cascading3/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cascading;
+
+import cascading.flow.Flow;
+import cascading.flow.FlowProcess;
+import cascading.flow.hadoop.HadoopFlowConnector;
+import cascading.operation.BaseOperation;
+import cascading.operation.Function;
+import cascading.operation.FunctionCall;
+import cascading.pipe.Each;
+import cascading.pipe.Pipe;
+import cascading.scheme.Scheme;
+import cascading.scheme.hadoop.TextLine;
+import cascading.tap.Tap;
+import cascading.tap.hadoop.Hfs;
+import cascading.tuple.Fields;
+import cascading.tuple.Tuple;
+import cascading.tuple.TupleEntry;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.apache.parquet.hadoop.thrift.ThriftToParquetFileWriter;
+import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.thrift.test.Name;
+
+import java.io.File;
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestParquetTBaseScheme {
+  final String txtInputPath = "target/test-classes/names.txt";
+  final String parquetInputPath = "target/test/ParquetTBaseScheme/names-parquet-in";
+  final String parquetOutputPath = "target/test/ParquetTBaseScheme/names-parquet-out";
+  final String txtOutputPath = "target/test/ParquetTBaseScheme/names-txt-out";
+
+  @Test
+  public void testWrite() throws Exception {
+    Path path = new Path(parquetOutputPath);
+    JobConf jobConf = new JobConf();
+    final FileSystem fs = path.getFileSystem(jobConf);
+    if (fs.exists(path)) fs.delete(path, true);
+
+    Scheme sourceScheme = new TextLine( new Fields( "first", "last" ) );
+    Tap source = new Hfs(sourceScheme, txtInputPath);
+
+    Scheme sinkScheme = new ParquetTBaseScheme(Name.class);
+    Tap sink = new Hfs(sinkScheme, parquetOutputPath);
+
+    Pipe assembly = new Pipe( "namecp" );
+    assembly = new Each(assembly, new PackThriftFunction());
+    HadoopFlowConnector hadoopFlowConnector = new HadoopFlowConnector();
+    Flow flow  = hadoopFlowConnector.connect("namecp", source, sink, assembly);
+
+    flow.complete();
+
+    assertTrue(fs.exists(new Path(parquetOutputPath)));
+    assertTrue(fs.exists(new Path(parquetOutputPath + "/_metadata")));
+    assertTrue(fs.exists(new Path(parquetOutputPath + "/_common_metadata")));
+  }
+
+  @Test
+  public void testRead() throws Exception {
+    doRead(new ParquetTBaseScheme(Name.class));
+  }
+
+  @Test
+  public void testReadWithoutClass() throws Exception {
+    doRead(new ParquetTBaseScheme());
+  }
+
+  private void doRead(Scheme sourceScheme) throws Exception {
+    createFileForRead();
+
+    Path path = new Path(txtOutputPath);
+    final FileSystem fs = path.getFileSystem(new Configuration());
+    if (fs.exists(path)) fs.delete(path, true);
+
+    Tap source = new Hfs(sourceScheme, parquetInputPath);
+
+    Scheme sinkScheme = new TextLine(new Fields("first", "last"));
+    Tap sink = new Hfs(sinkScheme, txtOutputPath);
+
+    Pipe assembly = new Pipe( "namecp" );
+    assembly = new Each(assembly, new UnpackThriftFunction());
+    Flow flow  = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
+
+    flow.complete();
+    String result = FileUtils.readFileToString(new File(txtOutputPath+"/part-00000"));
+    assertEquals("Alice\tPractice\nBob\tHope\nCharlie\tHorse\n", result);
+  }
+
+
+  private void createFileForRead() throws Exception {
+    final Path fileToCreate = new Path(parquetInputPath+"/names.parquet");
+
+    final Configuration conf = new Configuration();
+    final FileSystem fs = fileToCreate.getFileSystem(conf);
+    if (fs.exists(fileToCreate)) fs.delete(fileToCreate, true);
+
+    TProtocolFactory protocolFactory = new TCompactProtocol.Factory();
+    TaskAttemptID taskId = new TaskAttemptID("local", 0, true, 0, 0);
+    ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(fileToCreate, ContextUtil.newTaskAttemptContext(conf, taskId), protocolFactory, Name.class);
+
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    final TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos));
+
+    Name n1 = new Name();
+    n1.setFirst_name("Alice");
+    n1.setLast_name("Practice");
+    Name n2 = new Name();
+    n2.setFirst_name("Bob");
+    n2.setLast_name("Hope");
+    Name n3 = new Name();
+    n3.setFirst_name("Charlie");
+    n3.setLast_name("Horse");
+
+    n1.write(protocol);
+    w.write(new BytesWritable(baos.toByteArray()));
+    baos.reset();
+    n2.write(protocol);
+    w.write(new BytesWritable(baos.toByteArray()));
+    baos.reset();
+    n3.write(protocol);
+    w.write(new BytesWritable(baos.toByteArray()));
+    w.close();
+  }
+
+  private static class PackThriftFunction extends BaseOperation implements Function {
+    @Override
+    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
+      TupleEntry arguments = functionCall.getArguments();
+      Tuple result = new Tuple();
+
+      Name name = new Name();
+      name.setFirst_name(arguments.getString(0));
+      name.setLast_name(arguments.getString(1));
+
+      result.add(name);
+      functionCall.getOutputCollector().add(result);
+    }
+  }
+
+  private static class UnpackThriftFunction extends BaseOperation implements Function {
+    @Override
+    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
+      TupleEntry arguments = functionCall.getArguments();
+      Tuple result = new Tuple();
+
+      Name name = (Name) arguments.getObject(0);
+      result.add(name.getFirst_name());
+      result.add(name.getLast_name());
+      functionCall.getOutputCollector().add(result);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMergeMetadataFiles.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMergeMetadataFiles.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMergeMetadataFiles.java
index 6f86062..55a6326 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMergeMetadataFiles.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMergeMetadataFiles.java
@@ -202,14 +202,20 @@ public class TestMergeMetadataFiles {
       ParquetFileWriter.writeMergedMetadataFile(Arrays.asList(info.metaPath1, info.metaPath2), mergedOut, info.conf);
       fail("this should throw");
     } catch (RuntimeException e) {
-      assertEquals("could not merge metadata: key schema_num has conflicting values: [2, 1]", e.getMessage());
+      boolean eq1 = e.getMessage().equals("could not merge metadata: key schema_num has conflicting values: [2, 1]");
+      boolean eq2 = e.getMessage().equals("could not merge metadata: key schema_num has conflicting values: [1, 2]");
+      
+      assertEquals(eq1 || eq2, true);
     }
 
     try {
       ParquetFileWriter.writeMergedMetadataFile(Arrays.asList(info.commonMetaPath1, info.commonMetaPath2), mergedCommonOut, info.conf);
       fail("this should throw");
     } catch (RuntimeException e) {
-      assertEquals("could not merge metadata: key schema_num has conflicting values: [2, 1]", e.getMessage());
+      boolean eq1 = e.getMessage().equals("could not merge metadata: key schema_num has conflicting values: [2, 1]");
+      boolean eq2 = e.getMessage().equals("could not merge metadata: key schema_num has conflicting values: [1, 2]");
+
+      assertEquals(eq1 || eq2, true);
     }
   }
 }
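
The relaxed assertions above account for the conflicting values being reported as either [2, 1] or [1, 2], apparently because the order in which the merged key/value metadata is collected is not deterministic. A slightly more idiomatic way to express the same order-insensitive check, as a sketch only (the committed code keeps the two boolean flags and assertEquals):

    String msg = e.getMessage();
    assertTrue("unexpected message: " + msg,
        msg.equals("could not merge metadata: key schema_num has conflicting values: [2, 1]")
     || msg.equals("could not merge metadata: key schema_num has conflicting values: [1, 2]"));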

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet_cascading.md
----------------------------------------------------------------------
diff --git a/parquet_cascading.md b/parquet_cascading.md
index a1b0a68..0eeaceb 100644
--- a/parquet_cascading.md
+++ b/parquet_cascading.md
@@ -147,4 +147,17 @@ scheme builder classes.
 ### 2.2 Projection Pushdown with Tuples
 When using ParquetTupleScheme, specifying projection pushdown is as simple as specifying fields as the parameter of the constructor of ParquetTupleScheme:
 
+
+3. Cascading 2.0 & Cascading 3.0
+================================
+Cascading 3.0 introduced a breaking interface change in the Scheme abstract class, which in turn breaks every Scheme implementation.
+The parquet-cascading3 directory contains a separate library for use with Cascading 3.0.
+
+A significant part of the code remains identical; this shared part is in the parquet-cascading-common23 directory, which is not a Maven module.
+
+You cannot use both parquet-cascading and parquet-cascading3 in the same Classloader, which should be fine as you cannot use both cascading-core 2.x and cascading-core 3.x in the same Classloader either.
+
+
+
+
 `Scheme sourceScheme = new ParquetTupleScheme(new Fields("age"));`
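
The interface break described in the section above can be seen in the ParquetValueScheme diff at the top of this message: Cascading 3.0 widened the FlowProcess parameter of Scheme's abstract methods from a concrete type to a bounded wildcard, so overrides compiled against 2.x no longer match. A simplified sketch of the sourceConfInit signature in both versions (bodies elided):

    // Cascading 2.x: overrides take a concrete FlowProcess<JobConf>.
    public void sourceConfInit(FlowProcess<JobConf> flowProcess,
        Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) { /* ... */ }

    // Cascading 3.0: the same method takes FlowProcess<? extends JobConf>,
    // which is why parquet-cascading3 ships its own Scheme implementations.
    public void sourceConfInit(FlowProcess<? extends JobConf> flowProcess,
        Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) { /* ... */ }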

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 94d7a02..98fd862 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,6 +79,7 @@
     <shade.prefix>shaded.parquet</shade.prefix>
     <hadoop.version>1.1.0</hadoop.version>
     <cascading.version>2.5.3</cascading.version>
+    <cascading3.version>3.0.3</cascading3.version>
     <parquet.format.version>2.3.1</parquet.format.version>
     <previous.version>1.7.0</previous.version>
     <thrift.executable>thrift</thrift.executable>
@@ -98,6 +99,7 @@
     <module>parquet-avro</module>
     <module>parquet-benchmarks</module>
     <module>parquet-cascading</module>
+    <module>parquet-cascading3</module>
     <module>parquet-column</module>
     <module>parquet-common</module>
     <module>parquet-encoding</module>
@@ -197,6 +199,13 @@
             </execution>
           </executions>
         </plugin>
+        
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-resources-plugin</artifactId>
+          <version>2.7</version>
+        </plugin>
+              
         <plugin>
           <artifactId>maven-enforcer-plugin</artifactId>
           <version>1.3.1</version>
@@ -322,6 +331,23 @@
           </execution>
         </executions-->
       </plugin>
+      
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-resources-plugin</artifactId>
+        <version>2.7</version>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-remote-resources-plugin</artifactId>
+          <version>1.5</version>
+          <configuration>
+            <skip>true</skip>
+          </configuration>
+      </plugin>
+      
+            
       <plugin>
         <!-- Override source and target from the ASF parent -->
         <groupId>org.apache.maven.plugins</groupId>

