crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-434: outputConf options ignored on some key Targets
Date Sat, 05 Jul 2014 20:23:32 GMT
Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 9e18e44bd -> 3dd12779b


CRUNCH-434: outputConf options ignored on some key Targets


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/3dd12779
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/3dd12779
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/3dd12779

Branch: refs/heads/apache-crunch-0.8
Commit: 3dd12779bfc74cdad38cdb68dccea5e3b4fca9ed
Parents: 9e18e44
Author: Josh Wills <jwills@apache.org>
Authored: Fri Jul 4 16:20:33 2014 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Sat Jul 5 13:09:48 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/crunch/io/CompressIT.java   | 74 ++++++++++++++++++++
 .../java/org/apache/crunch/io/Compress.java     | 61 ++++++++++++++++
 .../apache/crunch/io/avro/AvroFileTarget.java   | 15 ++++
 .../io/parquet/AvroParquetFileTarget.java       | 26 ++++---
 .../apache/crunch/io/text/TextFileTarget.java   | 18 ++++-
 5 files changed, 185 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/3dd12779/crunch-core/src/it/java/org/apache/crunch/io/CompressIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/CompressIT.java b/crunch-core/src/it/java/org/apache/crunch/io/CompressIT.java
new file mode 100644
index 0000000..bad034b
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/CompressIT.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CompressIT {
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Test
+  public void testCompressText() throws Exception {
+    String urlsFile = tmpDir.copyResourceFileName("urls.txt");
+    String out = tmpDir.getFileName("out");
+    MRPipeline p = new MRPipeline(CompressIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<String> in = p.readTextFile(urlsFile);
+    in.write(Compress.gzip(To.textFile(out)));
+    p.done();
+    assertTrue(checkDirContainsExt(out, ".gz"));
+  }
+
+  @Test
+  public void testCompressAvro() throws Exception {
+    String urlsFile = tmpDir.copyResourceFileName("urls.txt");
+    String out = tmpDir.getFileName("out");
+    MRPipeline p = new MRPipeline(CompressIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<String> in = p.read(From.textFile(urlsFile, Avros.strings()));
+    in.write(Compress.snappy(To.avroFile(out)));
+    p.done();
+
+    FileSystem fs = FileSystem.get(tmpDir.getDefaultConfiguration());
+    FileStatus fstat = fs.getFileStatus(new Path(out, "part-m-00000.avro"));
+    assertEquals(176, fstat.getLen());
+  }
+
+  private boolean checkDirContainsExt(String dir, String ext) throws Exception {
+    File directory = new File(dir);
+    for (File f : directory.listFiles()) {
+      if (f.getName().endsWith(ext)) {
+        return true;
+      }
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/3dd12779/crunch-core/src/main/java/org/apache/crunch/io/Compress.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/Compress.java b/crunch-core/src/main/java/org/apache/crunch/io/Compress.java
new file mode 100644
index 0000000..881b017
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/Compress.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.crunch.Target;
+import org.apache.crunch.io.parquet.AvroParquetFileSourceTarget;
+import org.apache.crunch.io.parquet.AvroParquetFileTarget;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+
+/**
+ * Helper functions for compressing output data.
+ */
+public class Compress {
+
+  /**
+   * Configure the given output target to be compressed using the given codec.
+   */
+  public static <T extends Target> T compress(T target, Class<? extends CompressionCodec>
codecClass) {
+    return (T) target.outputConf("mapred.output.compress", "true")
+        .outputConf("mapred.output.compression.codec", codecClass.getCanonicalName());
+  }
+
+  /**
+   * Configure the given output target to be compressed using Gzip.
+   */
+  public static <T extends Target> T gzip(T target) {
+    return (T) compress(target, GzipCodec.class)
+        .outputConf(AvroJob.OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC);
+  }
+
+  /**
+   * Configure the given output target to be compressed using Snappy. If the Target is one
of the AvroParquet targets
+   * contained in Crunch, the Parquet-specific SnappyCodec will be used instead of the default
Hadoop one.
+   */
+  public static <T extends Target> T snappy(T target) {
+    Class<? extends CompressionCodec> snappyCodec = org.apache.hadoop.io.compress.SnappyCodec.class;
+    if (target instanceof AvroParquetFileTarget || target instanceof AvroParquetFileSourceTarget)
{
+      snappyCodec = parquet.hadoop.codec.SnappyCodec.class;
+    }
+    return (T) compress(target, snappyCodec)
+        .outputConf(AvroJob.OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/3dd12779/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
index 68b3b01..ed8d7b7 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
@@ -17,8 +17,10 @@
  */
 package org.apache.crunch.io.avro;
 
+import com.google.common.collect.Maps;
 import org.apache.avro.mapred.AvroWrapper;
 import org.apache.crunch.SourceTarget;
+import org.apache.crunch.Target;
 import org.apache.crunch.io.FileNamingScheme;
 import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.OutputHandler;
@@ -32,8 +34,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 
+import java.util.Map;
+
 public class AvroFileTarget extends FileTargetImpl {
 
+  private Map<String, String> extraConf = Maps.newHashMap();
+
   public AvroFileTarget(String path) {
     this(new Path(path));
   }
@@ -61,6 +67,12 @@ public class AvroFileTarget extends FileTargetImpl {
   }
 
   @Override
+  public Target outputConf(String key, String value) {
+    extraConf.put(key, value);
+    return this;
+  }
+
+  @Override
   public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String
name) {
     AvroType<?> atype = (AvroType<?>) ptype;
     FormatBundle bundle = FormatBundle.forOutput(AvroOutputFormat.class);
@@ -70,6 +82,9 @@ public class AvroFileTarget extends FileTargetImpl {
     } else {
       schemaParam = "avro.output.schema." + name;
     }
+    for (Map.Entry<String, String> e : extraConf.entrySet()) {
+      bundle.set(e.getKey(), e.getValue());
+    }
     bundle.set(schemaParam, atype.getSchema().toString());
     AvroMode.fromType(atype).configure(bundle);
     configureForMapReduce(job, AvroWrapper.class, NullWritable.class, bundle,

http://git-wip-us.apache.org/repos/asf/crunch/blob/3dd12779/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
index a6a34cd..3c2847d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
@@ -17,10 +17,13 @@
  */
 package org.apache.crunch.io.parquet;
 
+import com.google.common.collect.Maps;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.crunch.SourceTarget;
+import org.apache.crunch.Target;
 import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.OutputHandler;
 import org.apache.crunch.io.SequentialFileNamingScheme;
 import org.apache.crunch.io.impl.FileTargetImpl;
@@ -33,10 +36,14 @@ import org.apache.hadoop.mapreduce.Job;
 import parquet.avro.AvroWriteSupport;
 import parquet.hadoop.ParquetOutputFormat;
 
+import java.util.Map;
+
 public class AvroParquetFileTarget extends FileTargetImpl {
 
   private static final String PARQUET_AVRO_SCHEMA_PARAMETER = "parquet.avro.schema";
 
+  private Map<String, String> extraConf = Maps.newHashMap();
+
   public AvroParquetFileTarget(String path) {
     this(new Path(path));
   }
@@ -50,6 +57,12 @@ public class AvroParquetFileTarget extends FileTargetImpl {
   }
 
   @Override
+  public Target outputConf(String key, String value) {
+    extraConf.put(key, value);
+    return this;
+  }
+
+  @Override
   public String toString() {
     return "Parquet(" + path.toString() + ")";
   }
@@ -72,21 +85,18 @@ public class AvroParquetFileTarget extends FileTargetImpl {
   @Override
   public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String
name) {
     AvroType<?> atype = (AvroType<?>) ptype;
-    Configuration conf = job.getConfiguration();
     String schemaParam;
     if (name == null) {
       schemaParam = PARQUET_AVRO_SCHEMA_PARAMETER;
     } else {
       schemaParam = PARQUET_AVRO_SCHEMA_PARAMETER + "." + name;
     }
-    String outputSchema = conf.get(schemaParam);
-    if (outputSchema == null) {
-      conf.set(schemaParam, atype.getSchema().toString());
-    } else if (!outputSchema.equals(atype.getSchema().toString())) {
-      throw new IllegalStateException("Avro targets must use the same output schema");
+    FormatBundle fb = FormatBundle.forOutput(CrunchAvroParquetOutputFormat.class);
+    for (Map.Entry<String, String> e : extraConf.entrySet()) {
+      fb.set(e.getKey(), e.getValue());
     }
-    configureForMapReduce(job, Void.class, atype.getTypeClass(),
-        CrunchAvroParquetOutputFormat.class, outputPath, name);
+    fb.set(schemaParam, atype.getSchema().toString());
+    configureForMapReduce(job, Void.class, atype.getTypeClass(), fb, outputPath, name);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/3dd12779/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
index 4b9197b..258936e 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
@@ -17,8 +17,10 @@
  */
 package org.apache.crunch.io.text;
 
+import com.google.common.collect.Maps;
 import org.apache.avro.Schema;
 import org.apache.crunch.SourceTarget;
+import org.apache.crunch.Target;
 import org.apache.crunch.io.FileNamingScheme;
 import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.SequentialFileNamingScheme;
@@ -37,6 +39,8 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 
+import java.util.Map;
+
 public class TextFileTarget extends FileTargetImpl {
   private static Class<? extends FileOutputFormat> getOutputFormat(PType<?> ptype)
{
     if (ptype.getFamily().equals(AvroTypeFamily.getInstance())) {
@@ -46,6 +50,8 @@ public class TextFileTarget extends FileTargetImpl {
     }
   }
 
+  private final Map<String, String> extraConf = Maps.newHashMap();
+
   public <T> TextFileTarget(String path) {
     this(new Path(path));
   }
@@ -69,11 +75,21 @@ public class TextFileTarget extends FileTargetImpl {
   }
 
   @Override
+  public Target outputConf(String key, String value) {
+    extraConf.put(key, value);
+    return this;
+  }
+
+  @Override
   public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String
name) {
     Converter converter = ptype.getConverter();
     Class keyClass = converter.getKeyClass();
     Class valueClass = converter.getValueClass();
-    configureForMapReduce(job, keyClass, valueClass, FormatBundle.forOutput(getOutputFormat(ptype)),
outputPath, name);
+    FormatBundle fb = FormatBundle.forOutput(getOutputFormat(ptype));
+    for (Map.Entry<String, String> e : extraConf.entrySet()) {
+      fb.set(e.getKey(), e.getValue());
+    }
+    configureForMapReduce(job, keyClass, valueClass, fb, outputPath, name);
   }
 
   @Override


Mime
View raw message