Return-Path: X-Original-To: apmail-crunch-commits-archive@www.apache.org Delivered-To: apmail-crunch-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 34E5A11795 for ; Sat, 5 Jul 2014 20:23:33 +0000 (UTC) Received: (qmail 92746 invoked by uid 500); 5 Jul 2014 20:23:33 -0000 Delivered-To: apmail-crunch-commits-archive@crunch.apache.org Received: (qmail 92713 invoked by uid 500); 5 Jul 2014 20:23:33 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 92704 invoked by uid 99); 5 Jul 2014 20:23:33 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 05 Jul 2014 20:23:33 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id D16AD99EC14; Sat, 5 Jul 2014 20:23:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: commits@crunch.apache.org Message-Id: <311ba1301bcd4e84a4306bd177b924e6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: CRUNCH-434: outputConf options ignored on some key Targets Date: Sat, 5 Jul 2014 20:23:32 +0000 (UTC) Repository: crunch Updated Branches: refs/heads/apache-crunch-0.8 9e18e44bd -> 3dd12779b CRUNCH-434: outputConf options ignored on some key Targets Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/3dd12779 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/3dd12779 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/3dd12779 Branch: refs/heads/apache-crunch-0.8 Commit: 3dd12779bfc74cdad38cdb68dccea5e3b4fca9ed Parents: 9e18e44 Author: Josh Wills Authored: Fri Jul 4 16:20:33 2014 -0700 Committer: Josh Wills Committed: Sat Jul 5 13:09:48 2014 -0700 ---------------------------------------------------------------------- .../java/org/apache/crunch/io/CompressIT.java | 74 ++++++++++++++++++++ .../java/org/apache/crunch/io/Compress.java | 61 ++++++++++++++++ .../apache/crunch/io/avro/AvroFileTarget.java | 15 ++++ .../io/parquet/AvroParquetFileTarget.java | 26 ++++--- .../apache/crunch/io/text/TextFileTarget.java | 18 ++++- 5 files changed, 185 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/3dd12779/crunch-core/src/it/java/org/apache/crunch/io/CompressIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/CompressIT.java b/crunch-core/src/it/java/org/apache/crunch/io/CompressIT.java new file mode 100644 index 0000000..bad034b --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/io/CompressIT.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io; + +import org.apache.crunch.PCollection; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.avro.Avros; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.Rule; +import org.junit.Test; + +import java.io.File; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class CompressIT { + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + + @Test + public void testCompressText() throws Exception { + String urlsFile = tmpDir.copyResourceFileName("urls.txt"); + String out = tmpDir.getFileName("out"); + MRPipeline p = new MRPipeline(CompressIT.class, tmpDir.getDefaultConfiguration()); + PCollection in = p.readTextFile(urlsFile); + in.write(Compress.gzip(To.textFile(out))); + p.done(); + assertTrue(checkDirContainsExt(out, ".gz")); + } + + @Test + public void testCompressAvro() throws Exception { + String urlsFile = tmpDir.copyResourceFileName("urls.txt"); + String out = tmpDir.getFileName("out"); + MRPipeline p = new MRPipeline(CompressIT.class, tmpDir.getDefaultConfiguration()); + PCollection in = p.read(From.textFile(urlsFile, Avros.strings())); + in.write(Compress.snappy(To.avroFile(out))); + p.done(); + + FileSystem fs = FileSystem.get(tmpDir.getDefaultConfiguration()); + FileStatus fstat = fs.getFileStatus(new Path(out, "part-m-00000.avro")); + assertEquals(176, fstat.getLen()); + } + + private boolean checkDirContainsExt(String dir, String ext) throws Exception { + File directory = new File(dir); + for (File f : directory.listFiles()) { + if (f.getName().endsWith(ext)) { + return true; + } + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/3dd12779/crunch-core/src/main/java/org/apache/crunch/io/Compress.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/Compress.java b/crunch-core/src/main/java/org/apache/crunch/io/Compress.java new file mode 100644 index 0000000..881b017 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/io/Compress.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io; + +import org.apache.avro.file.DataFileConstants; +import org.apache.avro.mapred.AvroJob; +import org.apache.crunch.Target; +import org.apache.crunch.io.parquet.AvroParquetFileSourceTarget; +import org.apache.crunch.io.parquet.AvroParquetFileTarget; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.GzipCodec; + +/** + * Helper functions for compressing output data. + */ +public class Compress { + + /** + * Configure the given output target to be compressed using the given codec. + */ + public static T compress(T target, Class codecClass) { + return (T) target.outputConf("mapred.output.compress", "true") + .outputConf("mapred.output.compression.codec", codecClass.getCanonicalName()); + } + + /** + * Configure the given output target to be compressed using Gzip. + */ + public static T gzip(T target) { + return (T) compress(target, GzipCodec.class) + .outputConf(AvroJob.OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC); + } + + /** + * Configure the given output target to be compressed using Snappy. If the Target is one of the AvroParquet targets + * contained in Crunch, the Parquet-specific SnappyCodec will be used instead of the default Hadoop one. + */ + public static T snappy(T target) { + Class snappyCodec = org.apache.hadoop.io.compress.SnappyCodec.class; + if (target instanceof AvroParquetFileTarget || target instanceof AvroParquetFileSourceTarget) { + snappyCodec = parquet.hadoop.codec.SnappyCodec.class; + } + return (T) compress(target, snappyCodec) + .outputConf(AvroJob.OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/3dd12779/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java index 68b3b01..ed8d7b7 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java @@ -17,8 +17,10 @@ */ package org.apache.crunch.io.avro; +import com.google.common.collect.Maps; import org.apache.avro.mapred.AvroWrapper; import org.apache.crunch.SourceTarget; +import org.apache.crunch.Target; import org.apache.crunch.io.FileNamingScheme; import org.apache.crunch.io.FormatBundle; import org.apache.crunch.io.OutputHandler; @@ -32,8 +34,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; +import java.util.Map; + public class AvroFileTarget extends FileTargetImpl { + private Map extraConf = Maps.newHashMap(); + public AvroFileTarget(String path) { this(new Path(path)); } @@ -61,6 +67,12 @@ public class AvroFileTarget extends FileTargetImpl { } @Override + public Target outputConf(String key, String value) { + extraConf.put(key, value); + return this; + } + + @Override public void configureForMapReduce(Job job, PType ptype, Path outputPath, String name) { AvroType atype = (AvroType) ptype; FormatBundle bundle = FormatBundle.forOutput(AvroOutputFormat.class); @@ -70,6 +82,9 @@ public class AvroFileTarget extends FileTargetImpl { } else { schemaParam = "avro.output.schema." + name; } + for (Map.Entry e : extraConf.entrySet()) { + bundle.set(e.getKey(), e.getValue()); + } bundle.set(schemaParam, atype.getSchema().toString()); AvroMode.fromType(atype).configure(bundle); configureForMapReduce(job, AvroWrapper.class, NullWritable.class, bundle, http://git-wip-us.apache.org/repos/asf/crunch/blob/3dd12779/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java index a6a34cd..3c2847d 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java @@ -17,10 +17,13 @@ */ package org.apache.crunch.io.parquet; +import com.google.common.collect.Maps; import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import org.apache.crunch.SourceTarget; +import org.apache.crunch.Target; import org.apache.crunch.io.FileNamingScheme; +import org.apache.crunch.io.FormatBundle; import org.apache.crunch.io.OutputHandler; import org.apache.crunch.io.SequentialFileNamingScheme; import org.apache.crunch.io.impl.FileTargetImpl; @@ -33,10 +36,14 @@ import org.apache.hadoop.mapreduce.Job; import parquet.avro.AvroWriteSupport; import parquet.hadoop.ParquetOutputFormat; +import java.util.Map; + public class AvroParquetFileTarget extends FileTargetImpl { private static final String PARQUET_AVRO_SCHEMA_PARAMETER = "parquet.avro.schema"; + private Map extraConf = Maps.newHashMap(); + public AvroParquetFileTarget(String path) { this(new Path(path)); } @@ -50,6 +57,12 @@ public class AvroParquetFileTarget extends FileTargetImpl { } @Override + public Target outputConf(String key, String value) { + extraConf.put(key, value); + return this; + } + + @Override public String toString() { return "Parquet(" + path.toString() + ")"; } @@ -72,21 +85,18 @@ public class AvroParquetFileTarget extends FileTargetImpl { @Override public void configureForMapReduce(Job job, PType ptype, Path outputPath, String name) { AvroType atype = (AvroType) ptype; - Configuration conf = job.getConfiguration(); String schemaParam; if (name == null) { schemaParam = PARQUET_AVRO_SCHEMA_PARAMETER; } else { schemaParam = PARQUET_AVRO_SCHEMA_PARAMETER + "." + name; } - String outputSchema = conf.get(schemaParam); - if (outputSchema == null) { - conf.set(schemaParam, atype.getSchema().toString()); - } else if (!outputSchema.equals(atype.getSchema().toString())) { - throw new IllegalStateException("Avro targets must use the same output schema"); + FormatBundle fb = FormatBundle.forOutput(CrunchAvroParquetOutputFormat.class); + for (Map.Entry e : extraConf.entrySet()) { + fb.set(e.getKey(), e.getValue()); } - configureForMapReduce(job, Void.class, atype.getTypeClass(), - CrunchAvroParquetOutputFormat.class, outputPath, name); + fb.set(schemaParam, atype.getSchema().toString()); + configureForMapReduce(job, Void.class, atype.getTypeClass(), fb, outputPath, name); } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/3dd12779/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java index 4b9197b..258936e 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java @@ -17,8 +17,10 @@ */ package org.apache.crunch.io.text; +import com.google.common.collect.Maps; import org.apache.avro.Schema; import org.apache.crunch.SourceTarget; +import org.apache.crunch.Target; import org.apache.crunch.io.FileNamingScheme; import org.apache.crunch.io.FormatBundle; import org.apache.crunch.io.SequentialFileNamingScheme; @@ -37,6 +39,8 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import java.util.Map; + public class TextFileTarget extends FileTargetImpl { private static Class getOutputFormat(PType ptype) { if (ptype.getFamily().equals(AvroTypeFamily.getInstance())) { @@ -46,6 +50,8 @@ public class TextFileTarget extends FileTargetImpl { } } + private final Map extraConf = Maps.newHashMap(); + public TextFileTarget(String path) { this(new Path(path)); } @@ -69,11 +75,21 @@ public class TextFileTarget extends FileTargetImpl { } @Override + public Target outputConf(String key, String value) { + extraConf.put(key, value); + return this; + } + + @Override public void configureForMapReduce(Job job, PType ptype, Path outputPath, String name) { Converter converter = ptype.getConverter(); Class keyClass = converter.getKeyClass(); Class valueClass = converter.getValueClass(); - configureForMapReduce(job, keyClass, valueClass, FormatBundle.forOutput(getOutputFormat(ptype)), outputPath, name); + FormatBundle fb = FormatBundle.forOutput(getOutputFormat(ptype)); + for (Map.Entry e : extraConf.entrySet()) { + fb.set(e.getKey(), e.getValue()); + } + configureForMapReduce(job, keyClass, valueClass, fb, outputPath, name); } @Override