crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject crunch git commit: CRUNCH-562: Support one output file per key for Parquet.
Date Mon, 19 Oct 2015 09:20:15 GMT
Repository: crunch
Updated Branches:
  refs/heads/master 57235348d -> b4da23b26


CRUNCH-562: Support one output file per key for Parquet.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/b4da23b2
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/b4da23b2
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/b4da23b2

Branch: refs/heads/master
Commit: b4da23b26539aeea4ca9cd88e8799739ef7b6da4
Parents: 5723534
Author: Tom White <tomwhite@apache.org>
Authored: Mon Oct 19 10:05:27 2015 +0100
Committer: Tom White <tomwhite@apache.org>
Committed: Mon Oct 19 10:05:27 2015 +0100

----------------------------------------------------------------------
 .../io/parquet/AvroParquetPathPerKeyIT.java     | 148 +++++++++++++++++++
 .../crunch/io/avro/AvroPathPerKeyTarget.java    |  13 +-
 .../io/parquet/AvroParquetFileTarget.java       |   2 +-
 .../AvroParquetPathPerKeyOutputFormat.java      |  95 ++++++++++++
 .../io/parquet/AvroParquetPathPerKeyTarget.java |  69 +++++++++
 5 files changed, 322 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/b4da23b2/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetPathPerKeyIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetPathPerKeyIT.java b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetPathPerKeyIT.java
new file mode 100644
index 0000000..c903d7a
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetPathPerKeyIT.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Set;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.fn.FilterFns;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class AvroParquetPathPerKeyIT extends CrunchTestSupport implements Serializable {
+  @Test
+  public void testOutputFilePerKey() throws Exception {
+    Pipeline p = new MRPipeline(AvroParquetPathPerKeyIT.class, tempDir.getDefaultConfiguration());
+    Path outDir = tempDir.getPath("out");
+    p.read(From.textFile(tempDir.copyResourceFileName("docs.txt")))
+        .parallelDo(new MapFn<String, Pair<String, Person>>() {
+          @Override
+          public Pair<String, Person> map(String input) {
+            String[] p = input.split("\t");
+            return Pair.of(p[0], newPerson());
+          }
+        }, Avros.tableOf(Avros.strings(), Avros.records(Person.class)))
+        .groupByKey()
+        .write(new AvroParquetPathPerKeyTarget(outDir));
+    p.done();
+
+    Set<String> names = Sets.newHashSet();
+    FileSystem fs = outDir.getFileSystem(tempDir.getDefaultConfiguration());
+    for (FileStatus fstat : fs.listStatus(outDir)) {
+      names.add(fstat.getPath().getName());
+    }
+    assertEquals(ImmutableSet.of("A", "B", "_SUCCESS"), names);
+
+    FileStatus[] aStat = fs.listStatus(new Path(outDir, "A"));
+    assertEquals(1, aStat.length);
+    assertEquals("part-r-00000.parquet", aStat[0].getPath().getName());
+
+    FileStatus[] bStat = fs.listStatus(new Path(outDir, "B"));
+    assertEquals(1, bStat.length);
+    assertEquals("part-r-00000.parquet", bStat[0].getPath().getName());
+  }
+
+  @Test
+  public void testOutputFilePerKey_NothingToOutput() throws Exception {
+    Pipeline p = new MRPipeline(AvroParquetPathPerKeyIT.class, tempDir.getDefaultConfiguration());
+    Path outDir = tempDir.getPath("out");
+
+    p.read(From.textFile(tempDir.copyResourceFileName("docs.txt")))
+        .parallelDo(new MapFn<String, Pair<String, Person>>() {
+          @Override
+          public Pair<String, Person> map(String input) {
+            String[] p = input.split("\t");
+            return Pair.of(p[0], newPerson());
+          }
+        }, Avros.tableOf(Avros.strings(), Avros.records(Person.class)))
+        .filter(FilterFns.<Pair<String, Person>>REJECT_ALL())
+        .groupByKey()
+        .write(new AvroParquetPathPerKeyTarget(outDir));
+    p.done();
+
+    FileSystem fs = outDir.getFileSystem(tempDir.getDefaultConfiguration());
+    assertFalse(fs.exists(outDir));
+  }
+
+  @Test
+  public void testOutputFilePerKey_Directories() throws Exception {
+    Pipeline p = new MRPipeline(AvroParquetPathPerKeyIT.class, tempDir.getDefaultConfiguration());
+    Path outDir = tempDir.getPath("out");
+    p.read(From.textFile(tempDir.copyResourceFileName("docs.txt")))
+            .parallelDo(new MapFn<String, Pair<String, Person>>() {
+              @Override
+              public Pair<String, Person> map(String input) {
+                String[] p = input.split("\t");
+                return Pair.of(p[0] + "/child", newPerson());
+              }
+            }, Avros.tableOf(Avros.strings(), Avros.records(Person.class)))
+            .groupByKey()
+            .write(new AvroParquetPathPerKeyTarget(outDir));
+    p.done();
+
+    Set<String> names = Sets.newHashSet();
+    FileSystem fs = outDir.getFileSystem(tempDir.getDefaultConfiguration());
+    for (FileStatus fstat : fs.listStatus(outDir)) {
+      names.add(fstat.getPath().getName());
+    }
+    assertEquals(ImmutableSet.of("A", "B", "_SUCCESS"), names);
+
+    Path aParent = new Path(outDir, "A");
+    FileStatus[] aParentStat = fs.listStatus(aParent);
+    assertEquals(1, aParentStat.length);
+    assertEquals("child", aParentStat[0].getPath().getName());
+    FileStatus[] aChildStat = fs.listStatus(new Path(aParent, "child"));
+    assertEquals(1, aChildStat.length);
+    assertEquals("part-r-00000.parquet", aChildStat[0].getPath().getName());
+
+    Path bParent = new Path(outDir, "B");
+    FileStatus[] bParentStat = fs.listStatus(bParent);
+    assertEquals(1, bParentStat.length);
+    assertEquals("child", bParentStat[0].getPath().getName());
+    FileStatus[] bChildStat = fs.listStatus(new Path(bParent, "child"));
+    assertEquals(1, bChildStat.length);
+    assertEquals("part-r-00000.parquet", bChildStat[0].getPath().getName());
+  }
+
+  private Person newPerson() {
+    Person person = new Person();
+    person.name = "John Doe";
+    person.age = 42;
+    List<CharSequence> siblingNames = Lists.newArrayList();
+    siblingNames.add("Jimmy");
+    siblingNames.add("Jane");
+    person.siblingnames = siblingNames;
+    return person;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/b4da23b2/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
index d17e5d7..a3ecbb8 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
@@ -17,6 +17,7 @@
  */
 package org.apache.crunch.io.avro;
 
+import java.io.IOException;
 import org.apache.avro.mapred.AvroWrapper;
 import org.apache.crunch.impl.mr.plan.PlanningParameters;
 import org.apache.crunch.io.FileNamingScheme;
@@ -35,11 +36,10 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
 /**
 * A {@link org.apache.crunch.Target} that wraps {@link org.apache.crunch.types.avro.AvroPathPerKeyOutputFormat} to allow one file
  * per key to be written as the output of a {@code PTable<String, T>}.
@@ -60,7 +60,13 @@ public class AvroPathPerKeyTarget extends FileTargetImpl {
   }
 
   public AvroPathPerKeyTarget(Path path, FileNamingScheme fileNamingScheme) {
-    super(path, AvroPathPerKeyOutputFormat.class, fileNamingScheme);
+    this(path, AvroPathPerKeyOutputFormat.class, fileNamingScheme);
+  }
+
+  protected AvroPathPerKeyTarget(Path path, Class<? extends FileOutputFormat>
+      outputFormatClass,
+      FileNamingScheme fileNamingScheme) {
+    super(path, outputFormatClass, fileNamingScheme);
   }
 
   @Override
@@ -92,7 +98,6 @@ public class AvroPathPerKeyTarget extends FileTargetImpl {
       return;
     }
     FileSystem dstFs = path.getFileSystem(conf);
-    Path[] keys = FileUtil.stat2Paths(srcFs.listStatus(base));
     if (!dstFs.exists(path)) {
       dstFs.mkdirs(path);
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/b4da23b2/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
index 5fb4c53..34ee14b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
@@ -40,7 +40,7 @@ import java.util.Map;
 
 public class AvroParquetFileTarget extends FileTargetImpl {
 
-  private static final String PARQUET_AVRO_SCHEMA_PARAMETER = "parquet.avro.schema";
+  static final String PARQUET_AVRO_SCHEMA_PARAMETER = "parquet.avro.schema";
 
   private Map<String, String> extraConf = Maps.newHashMap();
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/b4da23b2/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetPathPerKeyOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetPathPerKeyOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetPathPerKeyOutputFormat.java
new file mode 100644
index 0000000..852483d
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetPathPerKeyOutputFormat.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.parquet;
+
+import java.io.IOException;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.mapred.Pair;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * A {@link FileOutputFormat} that takes in a {@link Utf8} and an Avro record and writes the Avro records to
+ * a sub-directory of the output path whose name is equal to the string-form of the {@code Utf8}.
+ *
+ * This {@code OutputFormat} only keeps one {@code RecordWriter} open at a time, so it's a very good idea to write
+ * out all of the records for the same key at the same time within each partition so as not to be frequently opening
+ * and closing files.
+ */
+public class AvroParquetPathPerKeyOutputFormat<T> extends FileOutputFormat<AvroWrapper<Pair<Utf8,
T>>, NullWritable> {
+  @Override
+  public RecordWriter<AvroWrapper<Pair<Utf8, T>>, NullWritable> getRecordWriter(TaskAttemptContext
taskAttemptContext)
+      throws IOException, InterruptedException {
+    Configuration conf = taskAttemptContext.getConfiguration();
+    Path basePath = new Path(getOutputPath(taskAttemptContext), conf.get("mapreduce.output.basename",
"part"));
+    return new AvroParquetFilePerKeyRecordWriter<T>(basePath,
+        getUniqueFile(taskAttemptContext, "part", ".parquet"), conf);
+  }
+
+  private class AvroParquetFilePerKeyRecordWriter<T> extends RecordWriter<AvroWrapper<Pair<Utf8,
T>>, NullWritable> {
+
+    private final Path basePath;
+    private final String uniqueFileName;
+    private final Configuration conf;
+    private String currentKey;
+    private RecordWriter<Void, T> currentWriter;
+
+    public AvroParquetFilePerKeyRecordWriter(Path basePath, String uniqueFileName, Configuration
conf) {
+      this.basePath = basePath;
+      this.uniqueFileName = uniqueFileName;
+      this.conf = conf;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void write(AvroWrapper<Pair<Utf8, T>> record, NullWritable n) throws
IOException, InterruptedException {
+      String key = record.datum().key().toString();
+      if (!key.equals(currentKey)) {
+        if (currentWriter != null) {
+          currentWriter.close(null); // TaskAttemptContext not used for close
+        }
+        currentKey = key;
+        Path dir = new Path(basePath, key);
+        FileSystem fs = dir.getFileSystem(conf);
+        if (!fs.exists(dir)) {
+          fs.mkdirs(dir);
+        }
+        currentWriter = (RecordWriter<Void, T>)
+            new AvroParquetFileTarget.CrunchAvroParquetOutputFormat().getRecordWriter(conf,
+                new Path(dir, uniqueFileName),
+                AvroParquetFileTarget.CrunchAvroParquetOutputFormat.getCompression(conf));
+      }
+      currentWriter.write(null, record.datum().value());
+    }
+
+    @Override
+    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
{
+      if (currentWriter != null) {
+        currentWriter.close(taskAttemptContext);
+        currentKey = null;
+        currentWriter = null;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/b4da23b2/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetPathPerKeyTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetPathPerKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetPathPerKeyTarget.java
new file mode 100644
index 0000000..707bd9e
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetPathPerKeyTarget.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.parquet;
+
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.SequentialFileNamingScheme;
+import org.apache.crunch.io.avro.AvroPathPerKeyTarget;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * A {@link org.apache.crunch.Target} that wraps {@link AvroParquetPathPerKeyOutputFormat} to allow one file
+ * per key to be written as the output of a {@code PTable<String, T>}.
+ *
+ * <p>Note the restrictions that apply to the {@code AvroParquetPathPerKeyOutputFormat}; in particular, it's a good
+ * idea to write out all of the records for the same key together within each partition of the data.
+ */
+public class AvroParquetPathPerKeyTarget extends AvroPathPerKeyTarget {
+
+  public AvroParquetPathPerKeyTarget(String path) {
+    this(new Path(path));
+  }
+
+  public AvroParquetPathPerKeyTarget(Path path) {
+    this(path, SequentialFileNamingScheme.getInstance());
+  }
+
+  public AvroParquetPathPerKeyTarget(Path path, FileNamingScheme fileNamingScheme) {
+    super(path, AvroParquetPathPerKeyOutputFormat.class, fileNamingScheme);
+  }
+
+  @Override
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String
name) {
+    AvroType<?> atype = (AvroType) ((PTableType) ptype).getValueType();
+    String schemaParam;
+    if (name == null) {
+      schemaParam = AvroParquetFileTarget.PARQUET_AVRO_SCHEMA_PARAMETER;
+    } else {
+      schemaParam = AvroParquetFileTarget.PARQUET_AVRO_SCHEMA_PARAMETER + "." + name;
+    }
+    FormatBundle fb = FormatBundle.forOutput(AvroParquetPathPerKeyOutputFormat.class);
+    fb.set(schemaParam, atype.getSchema().toString());
+    configureForMapReduce(job, Void.class, atype.getTypeClass(), fb, outputPath, name);
+  }
+
+  @Override
+  public String toString() {
+    return "AvroParquetPathPerKey(" + path + ")";
+  }
+}


Mime
View raw message