crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [1/2] git commit: CRUNCH-433 Add support for AvroKeyValue file
Date Mon, 07 Jul 2014 02:25:28 GMT
Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 67d48ee9b -> ea18b48e0


CRUNCH-433 Add support for AvroKeyValue file

Add support for reading AvroKeyValue files created using
org.apache.avro.mapreduce.AvroJob. Also add explicit methods in
the From class for reading an Avro key/value file as a PTable.

Signed-off-by: Josh Wills <jwills@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/00cd84a2
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/00cd84a2
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/00cd84a2

Branch: refs/heads/apache-crunch-0.8
Commit: 00cd84a2afea237d5751fcc50003776590b3d428
Parents: 67d48ee
Author: Gabriel Reid <greid@apache.org>
Authored: Sun Jul 6 21:07:37 2014 +0200
Committer: Josh Wills <jwills@apache.org>
Committed: Sun Jul 6 19:09:57 2014 -0700

----------------------------------------------------------------------
 .../apache/crunch/io/avro/AvroKeyValueIT.java   | 230 +++++++++++++++++++
 .../main/java/org/apache/crunch/io/From.java    |  25 ++
 .../crunch/io/avro/AvroTableFileSource.java     |  49 ++++
 .../crunch/types/avro/AvroGroupedTableType.java |   3 +-
 .../types/avro/AvroKeyValueTableType.java       | 157 +++++++++++++
 .../apache/crunch/types/avro/AvroTableType.java |   2 +-
 .../org/apache/crunch/types/avro/Avros.java     |  30 +++
 .../crunch/types/avro/BaseAvroTableType.java    |  37 +++
 .../crunch/types/avro/AvroDeepCopierTest.java   |  22 +-
 .../apache/crunch/types/avro/AvroTypeTest.java  |  21 +-
 .../crunch/types/avro/ReflectedPerson.java      |  87 +++++++
 11 files changed, 624 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/00cd84a2/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroKeyValueIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroKeyValueIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroKeyValueIT.java
new file mode 100644
index 0000000..d78841c
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroKeyValueIT.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.crunch.io.avro;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.mapred.Pair;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.mapreduce.AvroKeyValueOutputFormat;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.avro.ReflectedPerson;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+/**
+ * Tests for verifying behavior with Avro produced using the org.apache.avro.mapred.*
+ * and org.apache.avro.mapreduce.* APIs.
+ */
+public class AvroKeyValueIT extends CrunchTestSupport implements Serializable {
+
+  @Test
+  public void testInputFromMapReduceKeyValueFile_Generic() throws InterruptedException, IOException,
ClassNotFoundException {
+
+    Path keyValuePath = produceMapReduceOutputFile();
+
+    Pipeline pipeline = new MRPipeline(AvroKeyValueIT.class, tempDir.getDefaultConfiguration());
+    PTable<Person, Integer> personTable = pipeline.read(
+        From.avroTableFile(keyValuePath, Avros.tableOf(Avros.specifics(Person.class), Avros.ints())));
+
+    org.apache.crunch.Pair<Person, Integer> firstEntry = Iterables.getFirst(personTable.materialize(),
null);
+
+    assertEquals("a", firstEntry.first().getName().toString());
+    assertEquals(Integer.valueOf(1), firstEntry.second());
+
+    pipeline.done();
+
+  }
+
+  @Test
+  public void testInputFromMapRedKeyValueFile_Specific() throws IOException {
+    Path keyValuePath = produceMapRedOutputFile();
+
+    Pipeline pipeline = new MRPipeline(AvroKeyValueIT.class, tempDir.getDefaultConfiguration());
+    PTable<Person, Integer> personTable = pipeline.read(
+        From.avroTableFile(keyValuePath, Avros.keyValueTableOf(Avros.specifics(Person.class),
Avros.ints())));
+
+    org.apache.crunch.Pair<Person, Integer> firstEntry = Iterables.getFirst(personTable.materialize(),
null);
+
+    assertEquals("a", firstEntry.first().getName().toString());
+    assertEquals(Integer.valueOf(1), firstEntry.second());
+
+    // Verify that deep copying on this PType works as well
+    PTableType<Person, Integer> tableType = Avros.keyValueTableOf(Avros.specifics(Person.class),
Avros.ints());
+    tableType.initialize(tempDir.getDefaultConfiguration());
+    org.apache.crunch.Pair<Person, Integer> detachedPair = tableType.getDetachedValue(firstEntry);
+    assertEquals(firstEntry, detachedPair);
+
+    pipeline.done();
+  }
+
+  @Test
+  public void testInputFromMapRedKeyValueFile_Reflect() throws IOException {
+    Path keyValuePath = produceMapRedOutputFile();
+
+    Pipeline pipeline = new MRPipeline(AvroKeyValueIT.class, tempDir.getDefaultConfiguration());
+    PTable<ReflectedPerson, Integer> personTable = pipeline.read(
+        From.avroTableFile(keyValuePath, Avros.keyValueTableOf(Avros.reflects(ReflectedPerson.class),
Avros.ints())));
+
+    org.apache.crunch.Pair<ReflectedPerson, Integer> firstEntry = Iterables.getFirst(personTable.materialize(),
null);
+
+    assertEquals("a", firstEntry.first().getName().toString());
+    assertEquals(Integer.valueOf(1), firstEntry.second());
+
+    // Verify that deep copying on this PType works as well
+    PTableType<ReflectedPerson, Integer> tableType =
+        Avros.keyValueTableOf(Avros.reflects(ReflectedPerson.class), Avros.ints());
+    tableType.initialize(tempDir.getDefaultConfiguration());
+    org.apache.crunch.Pair<ReflectedPerson, Integer> detachedPair = tableType.getDetachedValue(firstEntry);
+    assertEquals(firstEntry, detachedPair);
+
+    pipeline.done();
+  }
+
+  /**
+   * Produces an Avro file using the org.apache.avro.mapred.* API.
+   */
+  private Path produceMapRedOutputFile() throws IOException {
+
+    JobConf conf = new JobConf(tempDir.getDefaultConfiguration(), AvroKeyValueIT.class);
+
+    org.apache.avro.mapred.AvroJob.setOutputSchema(
+        conf,
+        Pair.getPairSchema(Person.SCHEMA$, Schema.create(Schema.Type.INT)));
+
+
+    conf.setMapperClass(MapRedPersonMapper.class);
+    conf.setNumReduceTasks(0);
+
+    conf.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class);
+
+
+
+    Path outputPath = new Path(tempDir.getFileName("mapreduce_output"));
+    org.apache.hadoop.mapred.FileInputFormat.setInputPaths(conf, tempDir.copyResourcePath("letters.txt"));
+    org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(conf, outputPath);
+
+    RunningJob runningJob = JobClient.runJob(conf);
+    runningJob.waitForCompletion();
+
+    return outputPath;
+
+  }
+
+  /**
+   * Produces an Avro file using the org.apache.avro.mapreduce.* API.
+   */
+  private Path produceMapReduceOutputFile() throws IOException, ClassNotFoundException, InterruptedException
{
+
+
+    Job job = new Job(tempDir.getDefaultConfiguration());
+    job.setJarByClass(AvroKeyValueIT.class);
+    job.setJobName("Color Count");
+
+    Path outputPath = new Path(tempDir.getFileName("mapreduce_output"));
+
+    FileInputFormat.setInputPaths(job, tempDir.copyResourcePath("letters.txt"));
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setMapperClass(MapReducePersonMapper.class);
+    job.setNumReduceTasks(0);
+    AvroJob.setOutputKeySchema(job, Person.SCHEMA$);
+    AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.INT));
+
+    job.setOutputFormatClass(AvroKeyValueOutputFormat.class);
+
+    boolean success = job.waitForCompletion(true);
+
+    if (!success) {
+      throw new RuntimeException("Job failed");
+    }
+
+    return outputPath;
+  }
+
+  public static class MapReducePersonMapper extends
+      Mapper<LongWritable, Text, AvroKey<Person>, AvroValue<Integer>> {
+
+    @Override
+    protected void map(LongWritable key, Text value, Context context) throws IOException,
InterruptedException {
+      Person person = Person.newBuilder()
+          .setName(value.toString())
+          .setAge(value.toString().length())
+          .setSiblingnames(ImmutableList.<CharSequence>of())
+          .build();
+      context.write(
+          new AvroKey<Person>(person),
+          new AvroValue<Integer>(1));
+
+    }
+  }
+
+  public static class MapRedPersonMapper implements org.apache.hadoop.mapred.Mapper<LongWritable,
Text, AvroWrapper<Pair<Person,Integer>>, NullWritable> {
+    @Override
+    public void map(LongWritable key, Text value, OutputCollector<AvroWrapper<Pair<Person,Integer>>,
NullWritable> outputCollector, Reporter reporter) throws IOException {
+      Person person = Person.newBuilder()
+          .setName(value.toString())
+          .setAge(value.toString().length())
+          .setSiblingnames(ImmutableList.<CharSequence>of())
+          .build();
+      outputCollector.collect(
+          new AvroWrapper<Pair<Person, Integer>>(new Pair<Person, Integer>(person,
1)),
+          NullWritable.get());
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public void configure(JobConf entries) {
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/00cd84a2/crunch-core/src/main/java/org/apache/crunch/io/From.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/From.java b/crunch-core/src/main/java/org/apache/crunch/io/From.java
index 6e43321..4d2f050 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/From.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/From.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.List;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericData;
@@ -28,9 +29,11 @@ import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.FsInput;
 import org.apache.avro.specific.SpecificRecord;
+import org.apache.crunch.Pair;
 import org.apache.crunch.Source;
 import org.apache.crunch.TableSource;
 import org.apache.crunch.io.avro.AvroFileSource;
+import org.apache.crunch.io.avro.AvroTableFileSource;
 import org.apache.crunch.io.impl.FileTableSourceImpl;
 import org.apache.crunch.io.seq.SeqFileSource;
 import org.apache.crunch.io.seq.SeqFileTableSource;
@@ -349,6 +352,28 @@ public class From {
     return avroFile(paths, Avros.generics(getSchemaFromPath(paths.get(0), conf)));
   }
 
+  /**
+   * Creates a {@code TableSource<K,V>} for reading an Avro key/value file at the given path.
+   *
+   * @param path The path to the data on the filesystem
+   * @param tableType Avro table type for deserializing the table data
+   * @return a new {@code TableSource<K,V>} instance for reading Avro key/value data
+   */
+  public static <K, V> TableSource<K, V> avroTableFile(Path path, PTableType<K,
V> tableType) {
+    return avroTableFile(ImmutableList.of(path), tableType);
+  }
+
+  /**
+   * Creates a {@code TableSource<K,V>} for reading an Avro key/value file at the given paths.
+   *
+   * @param paths list of paths to be read by the returned source
+   * @param tableType Avro table type for deserializing the table data
+   * @return a new {@code TableSource<K,V>} instance for reading Avro key/value data
+   */
+  public static <K, V> TableSource<K, V> avroTableFile(List<Path> paths,
PTableType<K, V> tableType) {
+    return new AvroTableFileSource<K, V>(paths, (AvroType<Pair<K,V>>)tableType);
+  }
+
   static Schema getSchemaFromPath(Path path, Configuration conf) {
     DataFileReader reader = null;
     try {

http://git-wip-us.apache.org/repos/asf/crunch/blob/00cd84a2/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroTableFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroTableFileSource.java
b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroTableFileSource.java
new file mode 100644
index 0000000..beee79c
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroTableFileSource.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.crunch.io.avro;
+
+import java.util.List;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A file source for reading a table of Avro keys and values.
+ *
+ * This file source can be used for reading and writing tables compatible with
+ * the {@code org.apache.avro.mapred.AvroJob} and {@code org.apache.avro.mapreduce.AvroJob} classes (in addition to
+ * tables created by Crunch).
+ *
+ * @see org.apache.crunch.types.avro.Avros#tableOf(org.apache.crunch.types.PType, org.apache.crunch.types.PType)
+ * @see org.apache.crunch.types.avro.Avros#keyValueTableOf(org.apache.crunch.types.PType, org.apache.crunch.types.PType)
+ */
+public class AvroTableFileSource<K, V> extends AvroFileSource<Pair<K, V>>
implements TableSource<K,V> {
+
+  public AvroTableFileSource(List<Path> paths, AvroType<Pair<K, V>> tableType)
{
+    super(paths, tableType);
+  }
+
+  @Override
+  public PTableType<K, V> getTableType() {
+    return (PTableType<K,V>)super.getType();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/00cd84a2/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
index 7178274..3df313f 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
@@ -30,6 +30,7 @@ import org.apache.crunch.fn.PairMapFn;
 import org.apache.crunch.lib.PTables;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PGroupedTableType;
+import org.apache.crunch.types.PTableType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
 
@@ -43,7 +44,7 @@ class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V>
{
   private final MapFn inputFn;
   private final MapFn outputFn;
 
-  public AvroGroupedTableType(AvroTableType<K, V> tableType) {
+  public AvroGroupedTableType(BaseAvroTableType<K, V> tableType) {
     super(tableType);
     AvroType keyType = (AvroType) tableType.getKeyType();
     AvroType valueType = (AvroType) tableType.getValueType();

http://git-wip-us.apache.org/repos/asf/crunch/blob/00cd84a2/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroKeyValueTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroKeyValueTableType.java
b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroKeyValueTableType.java
new file mode 100644
index 0000000..5891322
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroKeyValueTableType.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.crunch.types.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.hadoop.io.AvroKeyValue;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.lib.PTables;
+import org.apache.crunch.types.PGroupedTableType;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.TupleDeepCopier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * A {@code PTableType} that is compatible with Avro key/value files that are created or read using the
+ * {@code org.apache.avro.mapreduce.AvroJob} class.
+ */
+class AvroKeyValueTableType<K, V> extends BaseAvroTableType<K, V> implements
PTableType<K, V> {
+
+  private static class PairToAvroKeyValueRecord extends MapFn<Pair, GenericRecord>
{
+    private final MapFn keyMapFn;
+    private final MapFn valueMapFn;
+    private final String keySchemaJson;
+    private final String valueSchemaJson;
+
+    private String keyValueSchemaJson;
+    private transient Schema keyValueSchema;
+
+    public PairToAvroKeyValueRecord(AvroType keyType, AvroType valueType) {
+      this.keyMapFn = keyType.getOutputMapFn();
+      this.keySchemaJson = keyType.getSchema().toString();
+      this.valueMapFn = valueType.getOutputMapFn();
+      this.valueSchemaJson = valueType.getSchema().toString();
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      keyMapFn.configure(conf);
+      valueMapFn.configure(conf);
+    }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      keyMapFn.setContext(context);
+      valueMapFn.setContext(context);
+    }
+
+    @Override
+    public void initialize() {
+      keyMapFn.initialize();
+      valueMapFn.initialize();
+      Schema.Parser parser = new Schema.Parser();
+      keyValueSchemaJson = AvroKeyValue.getSchema(parser.parse(keySchemaJson), parser.parse(valueSchemaJson)).toString();
+    }
+
+    @Override
+    public GenericRecord map(Pair input) {
+      if (keyValueSchema == null) {
+        keyValueSchema = new Schema.Parser().parse(keyValueSchemaJson);
+      }
+      GenericRecord keyValueRecord = new GenericData.Record(keyValueSchema);
+      keyValueRecord.put(AvroKeyValue.KEY_FIELD, keyMapFn.map(input.first()));
+      keyValueRecord.put(AvroKeyValue.VALUE_FIELD, valueMapFn.map(input.second()));
+      return keyValueRecord;
+    }
+  }
+
+  private static class AvroKeyValueRecordToPair extends MapFn<GenericRecord, Pair>
{
+
+    private final MapFn firstMapFn;
+    private final MapFn secondMapFn;
+
+    public AvroKeyValueRecordToPair(MapFn firstMapFn, MapFn secondMapFn) {
+      this.firstMapFn = firstMapFn;
+      this.secondMapFn = secondMapFn;
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      firstMapFn.configure(conf);
+      secondMapFn.configure(conf);
+    }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      firstMapFn.setContext(context);
+      secondMapFn.setContext(context);
+    }
+
+    @Override
+    public void initialize() {
+      firstMapFn.initialize();
+      secondMapFn.initialize();
+    }
+
+    @Override
+    public Pair map(GenericRecord input) {
+      return Pair.of(
+          firstMapFn.map(input.get(AvroKeyValue.KEY_FIELD)),
+          secondMapFn.map(input.get(AvroKeyValue.VALUE_FIELD)));
+    }
+  }
+
+  private final AvroType<K> keyType;
+  private final AvroType<V> valueType;
+
+  public AvroKeyValueTableType(AvroType<K> keyType, AvroType<V> valueType, Class<Pair<K,
V>> pairClass) {
+    super(pairClass, AvroKeyValue.getSchema(keyType.getSchema(), valueType.getSchema()),
+        new AvroKeyValueRecordToPair(keyType.getInputMapFn(), valueType.getInputMapFn()),
+        new PairToAvroKeyValueRecord(keyType, valueType),
+        new TupleDeepCopier(Pair.class, keyType, valueType),
+        null, keyType, valueType);
+    this.keyType = keyType;
+    this.valueType = valueType;
+  }
+
+  @Override
+  public PType<K> getKeyType() {
+    return keyType;
+  }
+
+  @Override
+  public PType<V> getValueType() {
+    return valueType;
+  }
+
+  @Override
+  public PGroupedTableType<K, V> getGroupedTableType() {
+    return new AvroGroupedTableType<K, V>(this);
+  }
+
+  @Override
+  public Pair<K, V> getDetachedValue(Pair<K, V> value) {
+    return PTables.getDetachedValue(this, value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/00cd84a2/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
index 8e9e069..00047cc 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext;
  * The implementation of the PTableType interface for Avro-based serialization.
  * 
  */
-class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements PTableType<K,
V> {
+class AvroTableType<K, V> extends BaseAvroTableType<K, V> implements PTableType<K,
V> {
 
   private static class PairToAvroPair extends MapFn<Pair, org.apache.avro.mapred.Pair>
{
     private final MapFn keyMapFn;

http://git-wip-us.apache.org/repos/asf/crunch/blob/00cd84a2/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
index 4b2c67b..6e11f1b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -838,6 +838,16 @@ public class Avros {
     return PTypes.jsonString(clazz, AvroTypeFamily.getInstance());
   }
 
+  /**
+   * A table type with an Avro type as key and as value.
+   * <p/>
+   * The {@code PTableType} returned by this method is also compatible with files containing Avro {@code Pair}s that
+   * are created using the {@code org.apache.avro.mapred.AvroJob} class.
+   *
+   * @param key the PType of the key in the table
+   * @param value the PType of the value in the table
+   * @return PTableType for reading and writing avro tables
+   */
   public static final <K, V> AvroTableType<K, V> tableOf(PType<K> key,
PType<V> value) {
     if (key instanceof PTableType) {
       PTableType ptt = (PTableType) key;
@@ -852,6 +862,26 @@ public class Avros {
     return new AvroTableType(avroKey, avroValue, Pair.class);
   }
 
+  /**
+   * A table type with an Avro type as key and value. The {@code PTableType} returned by this method is specifically
+   * for reading and writing files that are compatible with those created via the
+   * {@code org.apache.avro.mapreduce.AvroJob} class. For all other Avro table purposes, the
+   * {@link #tableOf(org.apache.crunch.types.PType, org.apache.crunch.types.PType)} method should be used.
+   *
+   * @param key the PType of the key in the table
+   * @param value the PType of the value in the table
+   * @return PTableType for reading and writing files compatible with those created via
+   * the {@code org.apache.avro.mapreduce.AvroJob} class
+   */
+  public static final <K, V> AvroKeyValueTableType<K, V> keyValueTableOf(PType<K>
key, PType<V> value) {
+    AvroType<K> avroKey = (AvroType<K>) key;
+    AvroType<V> avroValue = (AvroType<V>) value;
+
+    return new AvroKeyValueTableType<K, V>(avroKey, avroValue,
+        // Casting this to class is an unfortunate little way to get the generics out of the way here
+        (Class)Pair.class);
+  }
+
   private static final Schema NULL_SCHEMA = Schema.create(Type.NULL);
 
   private static Schema allowNulls(Schema base) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/00cd84a2/crunch-core/src/main/java/org/apache/crunch/types/avro/BaseAvroTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/BaseAvroTableType.java
b/crunch-core/src/main/java/org/apache/crunch/types/avro/BaseAvroTableType.java
new file mode 100644
index 0000000..848b33a
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/BaseAvroTableType.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.crunch.types.avro;
+
+import org.apache.avro.Schema;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.DeepCopier;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+
+/**
+ * Base type for dealing with PTables with Avro keys and values.
+ */
+abstract class BaseAvroTableType<K, V> extends AvroType<Pair<K, V>> implements
PTableType<K, V> {
+
+  protected BaseAvroTableType(Class<Pair<K, V>> typeClass, Schema schema, MapFn
inputMapFn, MapFn outputMapFn,
+      DeepCopier<Pair<K, V>> deepCopier, AvroRecordType recordType, PType...
ptypes) {
+    super(typeClass, schema, inputMapFn, outputMapFn, deepCopier, recordType, ptypes);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/00cd84a2/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
index 795e2b4..da8dd28 100644
--- a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
 
 import java.nio.ByteBuffer;
-import java.util.List;
 
 import com.google.common.collect.Lists;
 import org.apache.avro.generic.GenericData.Record;
@@ -70,27 +69,12 @@ public class AvroDeepCopierTest {
     assertNull(new AvroDeepCopier.AvroGenericDeepCopier(Person.SCHEMA$).deepCopy(null));
   }
 
-  static class ReflectedPerson {
-    String name;
-    int age;
-    List<String> siblingnames;
-
-    @Override
-    public boolean equals(Object other) {
-      if (other == null || !(other instanceof ReflectedPerson)) {
-        return false;
-      }
-      ReflectedPerson that = (ReflectedPerson) other;
-      return name.equals(that.name) && age == that.age && siblingnames.equals(that.siblingnames);
-    }
-  }
-
   @Test
   public void testDeepCopyReflect() {
     ReflectedPerson person = new ReflectedPerson();
-    person.name = "John Doe";
-    person.age = 42;
-    person.siblingnames = Lists.newArrayList();
+    person.setName("John Doe");
+    person.setAge(42);
+    person.setSiblingnames(Lists.<String>newArrayList());
 
     AvroDeepCopier<ReflectedPerson> avroDeepCopier = new AvroDeepCopier.AvroReflectDeepCopier<ReflectedPerson>(
         ReflectedPerson.class, Avros.reflects(ReflectedPerson.class).getSchema());

http://git-wip-us.apache.org/repos/asf/crunch/blob/00cd84a2/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
index 481444f..8764275 100644
--- a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
@@ -200,29 +200,14 @@ public class AvroTypeTest {
     specificType.getDetachedValue(person);
   }
 
-  static class ReflectedPerson {
-    String name;
-    int age;
-    List<String> siblingnames;
-
-    @Override
-    public boolean equals(Object other) {
-      if (other == null || !(other instanceof ReflectedPerson)) {
-        return false;
-      }
-      ReflectedPerson that = (ReflectedPerson) other;
-      return name.equals(that.name) && age == that.age && siblingnames.equals(that.siblingnames);
-    }
-  }
-
   @Test
   public void testGetDetachedValue_ReflectAvroType() {
     AvroType<ReflectedPerson> reflectType = Avros.reflects(ReflectedPerson.class);
     reflectType.initialize(new Configuration());
     ReflectedPerson rp = new ReflectedPerson();
-    rp.name = "josh";
-    rp.age = 32;
-    rp.siblingnames = Lists.newArrayList();
+    rp.setName("josh");
+    rp.setAge(32);
+    rp.setSiblingnames(Lists.<String>newArrayList());
     ReflectedPerson detached = reflectType.getDetachedValue(rp);
     assertEquals(rp, detached);
     assertNotSame(rp, detached);

http://git-wip-us.apache.org/repos/asf/crunch/blob/00cd84a2/crunch-core/src/test/java/org/apache/crunch/types/avro/ReflectedPerson.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/types/avro/ReflectedPerson.java b/crunch-core/src/test/java/org/apache/crunch/types/avro/ReflectedPerson.java
new file mode 100644
index 0000000..c7c5f88
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/types/avro/ReflectedPerson.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.crunch.types.avro;
+
+import java.util.List;
+
+/**
+ * A test helper class that conforms to the Person Avro specific data class, to use the Person schema for testing
+ * with reflection-based reading and writing.
+ */
+public class ReflectedPerson {
+
+  private String name;
+  private int age;
+  private List<String> siblingnames;
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public int getAge() {
+    return age;
+  }
+
+  public void setAge(int age) {
+    this.age = age;
+  }
+
+  public List<String> getSiblingnames() {
+    return siblingnames;
+  }
+
+  public void setSiblingnames(List<String> siblingnames) {
+    this.siblingnames = siblingnames;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    ReflectedPerson that = (ReflectedPerson) o;
+
+    if (age != that.age) return false;
+    if (name != null ? !name.equals(that.name) : that.name != null) return false;
+    if (siblingnames != null ? !siblingnames.equals(that.siblingnames) : that.siblingnames != null) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = name != null ? name.hashCode() : 0;
+    result = 31 * result + age;
+    result = 31 * result + (siblingnames != null ? siblingnames.hashCode() : 0);
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "ReflectedPerson{" +
+        "name='" + name + '\'' +
+        ", age=" + age +
+        ", siblingnames=" + siblingnames +
+        '}';
+  }
+}


Mime
View raw message