parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alexleven...@apache.org
Subject incubator-parquet-mr git commit: PARQUET-197 : fix parquet-cascading not writing parquet metadata file
Date Thu, 05 Mar 2015 23:57:09 GMT
Repository: incubator-parquet-mr
Updated Branches:
  refs/heads/master 258349426 -> 5851e6da7


PARQUET-197 : fix parquet-cascading not writing parquet metadata file

Repro: run a scalding job that writes parquet files to a folder. No _metadata or _common_metadata
file is created
Impact: potential performance problem if parquet metadata is read from client side, which
is the case for sparkSQL
Cause: the metadata writing logic is in the mapreduce API but not the mapred API of parquet.

Author: Tianshuo Deng <tdeng@twitter.com>

Closes #131 from tsdeng/fix_mapred_output_committer and squashes the following commits:

6e8d8eb [Tianshuo Deng] rename to MapredParquetOutputCommitter, add setAsOutputFormat method
to set the outputCommitter
ec758db [Tianshuo Deng] license
448b649 [Tianshuo Deng] fix parquet-cascading not writing parquet metadata file


Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/5851e6da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/5851e6da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/5851e6da

Branch: refs/heads/master
Commit: 5851e6da7a05b5d53a01803ccabc0f685fc36d52
Parents: 2583494
Author: Tianshuo Deng <tdeng@twitter.com>
Authored: Thu Mar 5 15:56:58 2015 -0800
Committer: Alex Levenson <alexlevenson@twitter.com>
Committed: Thu Mar 5 15:56:58 2015 -0800

----------------------------------------------------------------------
 .../parquet/cascading/ParquetTBaseScheme.java   |  2 +-
 .../parquet/cascading/ParquetTupleScheme.java   |  2 +-
 .../cascading/TestParquetTBaseScheme.java       |  3 ++
 .../parquet/hadoop/ParquetOutputCommitter.java  |  4 ++
 .../mapred/DeprecatedParquetOutputFormat.java   |  5 +++
 .../mapred/MapredParquetOutputCommitter.java    | 42 ++++++++++++++++++++
 .../parquet/scrooge/ParquetScroogeScheme.java   |  5 +--
 7 files changed, 57 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5851e6da/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java b/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java
index 41f8c4f..ab84749 100644
--- a/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java
+++ b/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java
@@ -73,7 +73,7 @@ public class ParquetTBaseScheme<T extends TBase<?,?>> extends
ParquetValueScheme
       throw new IllegalArgumentException("To use ParquetTBaseScheme as a sink, you must specify
a thrift class in the constructor");
     }
 
-    jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class);
+    DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf);
     DeprecatedParquetOutputFormat.setWriteSupportClass(jobConf, TBaseWriteSupport.class);
     TBaseWriteSupport.<T>setThriftClass(jobConf, this.config.getKlass());
   }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5851e6da/parquet-cascading/src/main/java/parquet/cascading/ParquetTupleScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/parquet/cascading/ParquetTupleScheme.java b/parquet-cascading/src/main/java/parquet/cascading/ParquetTupleScheme.java
index ea0a953..7f6ac3a 100644
--- a/parquet-cascading/src/main/java/parquet/cascading/ParquetTupleScheme.java
+++ b/parquet-cascading/src/main/java/parquet/cascading/ParquetTupleScheme.java
@@ -171,7 +171,7 @@ public class ParquetTupleScheme extends Scheme<JobConf, RecordReader,
OutputColl
   @Override
   public void sinkConfInit(FlowProcess<JobConf> fp,
           Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
-    jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class);
+    DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf);
     jobConf.set(TupleWriteSupport.PARQUET_CASCADING_SCHEMA, parquetSchema);
     ParquetOutputFormat.setWriteSupportClass(jobConf, TupleWriteSupport.class);
   }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5851e6da/parquet-cascading/src/test/java/parquet/cascading/TestParquetTBaseScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/test/java/parquet/cascading/TestParquetTBaseScheme.java
b/parquet-cascading/src/test/java/parquet/cascading/TestParquetTBaseScheme.java
index 11a5b5e..7d1454c 100644
--- a/parquet-cascading/src/test/java/parquet/cascading/TestParquetTBaseScheme.java
+++ b/parquet-cascading/src/test/java/parquet/cascading/TestParquetTBaseScheme.java
@@ -78,6 +78,9 @@ public class TestParquetTBaseScheme {
     Flow flow  = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
 
     flow.complete();
+    assertTrue(fs.exists(new Path(parquetOutputPath)));
+    assertTrue(fs.exists(new Path(parquetOutputPath + "/_metadata")));
+    assertTrue(fs.exists(new Path(parquetOutputPath + "/_common_metadata")));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5851e6da/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
index 841c211..0e0ce42 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
@@ -45,6 +45,10 @@ public class ParquetOutputCommitter extends FileOutputCommitter {
   public void commitJob(JobContext jobContext) throws IOException {
     super.commitJob(jobContext);
     Configuration configuration = ContextUtil.getConfiguration(jobContext);
+    writeMetaDataFile(configuration,outputPath);
+  }
+
+  public static void writeMetaDataFile(Configuration configuration, Path outputPath) {
     if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) {
       try {
         final FileSystem fileSystem = outputPath.getFileSystem(configuration);

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5851e6da/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java
b/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java
index 5b84e54..c0defb1 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java
@@ -55,6 +55,11 @@ public class DeprecatedParquetOutputFormat<V> extends org.apache.hadoop.mapred.F
     configuration.setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY, enableDictionary);
   }
 
+  public static void setAsOutputFormat(JobConf jobConf) {
+    jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class);
+    jobConf.setOutputCommitter(MapredParquetOutputCommitter.class);
+  }
+
   private CompressionCodecName getCodec(final JobConf conf) {
     return CodecConfig.from(conf).getCodec();
   }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5851e6da/parquet-hadoop/src/main/java/parquet/hadoop/mapred/MapredParquetOutputCommitter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/mapred/MapredParquetOutputCommitter.java
b/parquet-hadoop/src/main/java/parquet/hadoop/mapred/MapredParquetOutputCommitter.java
new file mode 100644
index 0000000..eb97c09
--- /dev/null
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/mapred/MapredParquetOutputCommitter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package parquet.hadoop.mapred;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.*;
+import parquet.hadoop.ParquetOutputCommitter;
+
+import java.io.IOException;
+
+/**
+ *
+ * Adapter for supporting ParquetOutputCommitter in mapred API
+ *
+ * @author Tianshuo Deng
+ */
+public class MapredParquetOutputCommitter extends FileOutputCommitter {
+
+  @Override
+  public void commitJob(JobContext jobContext) throws IOException {
+    JobConf jobConf = jobContext.getJobConf();
+    Path outputPath = FileOutputFormat.getOutputPath(jobConf);
+    ParquetOutputCommitter.writeMetaDataFile(jobConf, outputPath);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5851e6da/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java
----------------------------------------------------------------------
diff --git a/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java b/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java
index 3abe957..2745307 100644
--- a/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java
+++ b/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java
@@ -18,8 +18,6 @@
  */
 package parquet.scrooge;
 
-import java.io.IOException;
-
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.RecordReader;
@@ -27,7 +25,6 @@ import org.apache.hadoop.mapred.RecordReader;
 import com.twitter.scrooge.ThriftStruct;
 
 import cascading.flow.FlowProcess;
-import cascading.scheme.SinkCall;
 import cascading.tap.Tap;
 import parquet.cascading.ParquetValueScheme;
 import parquet.filter2.predicate.FilterPredicate;
@@ -56,7 +53,7 @@ public class ParquetScroogeScheme<T extends ThriftStruct> extends
ParquetValueSc
   @Override
   public void sinkConfInit(FlowProcess<JobConf> fp,
       Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
-    jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class);
+    DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf);
     ParquetOutputFormat.setWriteSupportClass(jobConf, ScroogeWriteSupport.class);
     ScroogeWriteSupport.setScroogeClass(jobConf, this.config.getKlass());
   }


Mime
View raw message