crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-199: Add support for Trevni sources and targets. Contributed by Micah Whitacre.
Date Wed, 01 May 2013 22:21:06 GMT
Updated Branches:
  refs/heads/master 48cf308c8 -> 5578d5e8b


CRUNCH-199: Add support for Trevni sources and targets. Contributed by Micah Whitacre.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/5578d5e8
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/5578d5e8
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/5578d5e8

Branch: refs/heads/master
Commit: 5578d5e8be97ff5d5b86815e005d9df387965f60
Parents: 48cf308
Author: Josh Wills <jwills@apache.org>
Authored: Wed May 1 15:20:20 2013 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Wed May 1 15:20:41 2013 -0700

----------------------------------------------------------------------
 crunch-core/pom.xml                                |    9 +
 .../io/avro/trevni/TrevniFileSourceTargetIT.java   |  133 ++++++++++
 .../crunch/io/avro/trevni/TrevniKeyPipelineIT.java |  195 +++++++++++++++
 .../io/avro/trevni/TrevniFileReaderFactory.java    |  106 ++++++++
 .../crunch/io/avro/trevni/TrevniKeySource.java     |   58 +++++
 .../io/avro/trevni/TrevniKeySourceTarget.java      |   40 +++
 .../crunch/io/avro/trevni/TrevniKeyTarget.java     |  146 +++++++++++
 pom.xml                                            |   18 ++-
 8 files changed, 704 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/5578d5e8/crunch-core/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-core/pom.xml b/crunch-core/pom.xml
index d365c3d..e4b6796 100644
--- a/crunch-core/pom.xml
+++ b/crunch-core/pom.xml
@@ -45,6 +45,15 @@ under the License.
     </dependency>
 
     <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>trevni-avro</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>trevni-core</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.javassist</groupId>
       <artifactId>javassist</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/crunch/blob/5578d5e8/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniFileSourceTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniFileSourceTargetIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniFileSourceTargetIT.java
new file mode 100644
index 0000000..d591c65
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniFileSourceTargetIT.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro.trevni;
+
+import com.google.common.collect.Lists;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.fs.Path;
+import org.apache.trevni.ColumnFileMetaData;
+import org.apache.trevni.avro.AvroColumnWriter;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+@SuppressWarnings("serial")
+public class TrevniFileSourceTargetIT implements Serializable {
+
+  private transient File avroFile;
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Before
+  public void setUp() throws IOException {
+    avroFile = tmpDir.getFile("test.avro.trevni");
+  }
+
+  private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException {
+    ColumnFileMetaData cfmd = new ColumnFileMetaData();
+    AvroColumnWriter writer = new AvroColumnWriter(schema, cfmd);
+
+    for (GenericRecord record : genericRecords) {
+      writer.write(record);
+    }
+
+    writer.writeTo(avroFile);
+  }
+
+  @Test
+  public void testSpecific() throws IOException {
+    GenericRecord savedRecord = new Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    Pipeline pipeline = new MRPipeline(TrevniFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Person> genericCollection = pipeline.read(new TrevniKeySource(new Path(avroFile.getAbsolutePath()),
+        Avros.records(Person.class)));
+
+    List<Person> personList = Lists.newArrayList(genericCollection.materialize());
+
+    Person expectedPerson = new Person();
+    expectedPerson.name = "John Doe";
+    expectedPerson.age = 42;
+
+    List<CharSequence> siblingNames = Lists.newArrayList();
+    siblingNames.add("Jimmy");
+    siblingNames.add("Jane");
+    expectedPerson.siblingnames = siblingNames;
+
+    assertEquals(Lists.newArrayList(expectedPerson), Lists.newArrayList(personList));
+  }
+
+  @Test
+  public void testGeneric() throws IOException {
+    String genericSchemaJson = Person.SCHEMA$.toString().replace("Person", "GenericPerson");
+    Schema genericPersonSchema = new Schema.Parser().parse(genericSchemaJson);
+    GenericRecord savedRecord = new Record(genericPersonSchema);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), genericPersonSchema);
+
+    Pipeline pipeline = new MRPipeline(TrevniFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Record> genericCollection = pipeline.read(new TrevniKeySource(new Path(avroFile.getAbsolutePath()),
+        Avros.generics(genericPersonSchema)));
+
+    List<Record> recordList = Lists.newArrayList(genericCollection.materialize());
+
+    assertEquals(Lists.newArrayList(savedRecord), Lists.newArrayList(recordList));
+  }
+
+  @Test
+  public void testReflect() throws IOException {
+    AvroType<StringWrapper> strType = Avros.reflects (StringWrapper.class);
+    Schema schema = strType.getSchema();
+    GenericRecord savedRecord = new Record(schema);
+    savedRecord.put("value", "stringvalue");
+    populateGenericFile(Lists.newArrayList(savedRecord), schema);
+
+    Pipeline pipeline = new MRPipeline(TrevniFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<StringWrapper> stringValueCollection = pipeline.read(new TrevniKeySource(new Path(avroFile.getAbsolutePath()),
+        strType));
+
+    List<StringWrapper> recordList = Lists.newArrayList(stringValueCollection.materialize());
+
+    assertEquals(1, recordList.size());
+    StringWrapper stringWrapper = recordList.get(0);
+    assertEquals("stringvalue", stringWrapper.getValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/5578d5e8/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniKeyPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniKeyPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniKeyPipelineIT.java
new file mode 100644
index 0000000..cd7fe0b
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniKeyPipelineIT.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.crunch.io.avro.trevni;
+
+import com.google.common.collect.Lists;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificData;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.fs.Path;
+import org.apache.trevni.avro.AvroColumnReader;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class TrevniKeyPipelineIT implements Serializable {
+
+  private transient File avroFile;
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Before
+  public void setUp() throws IOException {
+    avroFile = tmpDir.getFile("test.avro.trevni");
+  }
+
+  private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException {
+    FileOutputStream outputStream = new FileOutputStream(this.avroFile);
+    GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(schema);
+
+    DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(genericDatumWriter);
+    dataFileWriter.create(schema, outputStream);
+
+    for (GenericRecord record : genericRecords) {
+      dataFileWriter.append(record);
+    }
+
+    dataFileWriter.close();
+    outputStream.close();
+  }
+
+  @Test
+  public void toAvroTrevniKeyTarget() throws Exception {
+    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    Pipeline pipeline = new MRPipeline(TrevniKeyPipelineIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
+        Avros.records(Person.class)));
+    File outputFile = tmpDir.getFile("output");
+    Target trevniFile = new TrevniKeyTarget(outputFile.getAbsolutePath());
+    pipeline.write(genericCollection, trevniFile);
+    pipeline.run();
+
+    Person person = genericCollection.materialize().iterator().next();
+
+    File trvFile = new File(outputFile, "part-m-00000-part-0.trv");
+
+    AvroColumnReader.Params params = new AvroColumnReader.Params(trvFile);
+    params.setSchema(Person.SCHEMA$);
+    params.setModel(SpecificData.get());
+    AvroColumnReader<Person> reader = new AvroColumnReader<Person>(params);
+
+    try{
+      Person readPerson = reader.next();
+      assertThat(readPerson, is(person));
+    }finally{
+      reader.close();
+    }
+  }
+
+  @Test
+  public void toAvroTrevniKeyMultipleTarget() throws Exception {
+    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    Pipeline pipeline = new MRPipeline(TrevniKeyPipelineIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
+        Avros.records(Person.class)));
+    File output1File = tmpDir.getFile("output1");
+    File output2File = tmpDir.getFile("output2");
+    pipeline.write(genericCollection, new TrevniKeyTarget(output1File.getAbsolutePath()));
+    pipeline.write(genericCollection, new TrevniKeyTarget(output2File.getAbsolutePath()));
+    pipeline.run();
+
+    Person person = genericCollection.materialize().iterator().next();
+
+    File trv1File = new File(output1File, "part-m-00000-part-0.trv");
+    File trv2File = new File(output2File, "part-m-00000-part-0.trv");
+
+    AvroColumnReader.Params params = new AvroColumnReader.Params(trv1File);
+    params.setSchema(Person.SCHEMA$);
+    params.setModel(SpecificData.get());
+    AvroColumnReader<Person> reader = new AvroColumnReader<Person>(params);
+
+    try{
+      Person readPerson = reader.next();
+      assertThat(readPerson, is(person));
+    }finally{
+      reader.close();
+    }
+
+    params = new AvroColumnReader.Params(trv2File);
+    params.setSchema(Person.SCHEMA$);
+    params.setModel(SpecificData.get());
+    reader = new AvroColumnReader<Person>(params);
+
+    try{
+      Person readPerson = reader.next();
+      assertThat(readPerson, is(person));
+    }finally{
+      reader.close();
+    }
+  }
+
+  @Test
+  public void toAvroTrevniKeyTargetReadSource() throws Exception {
+    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    Pipeline pipeline = new MRPipeline(TrevniKeyPipelineIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
+        Avros.records(Person.class)));
+    File outputFile = tmpDir.getFile("output");
+    Target trevniFile = new TrevniKeyTarget(outputFile.getAbsolutePath());
+    pipeline.write(genericCollection, trevniFile);
+    pipeline.run();
+
+    Person person = genericCollection.materialize().iterator().next();
+
+    PCollection<Person> retrievedPeople = pipeline.read(new TrevniKeySource<Person>(
+        new Path(outputFile.toURI()), Avros.records(Person.class)));
+
+    Person retrievedPerson = retrievedPeople.materialize().iterator().next();
+
+    assertThat(retrievedPerson, is(person));
+
+    File trvFile = new File(outputFile, "part-m-00000-part-0.trv");
+
+    AvroColumnReader.Params params = new AvroColumnReader.Params(trvFile);
+    params.setSchema(Person.SCHEMA$);
+    params.setModel(SpecificData.get());
+    AvroColumnReader<Person> reader = new AvroColumnReader<Person>(params);
+
+    try{
+      Person readPerson = reader.next();
+      assertThat(readPerson, is(person));
+    }finally{
+      reader.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/5578d5e8/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniFileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniFileReaderFactory.java
new file mode 100644
index 0000000..15bf7c1
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniFileReaderFactory.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro.trevni;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.UnmodifiableIterator;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.specific.SpecificData;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.io.impl.AutoClosingIterator;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.trevni.Input;
+import org.apache.trevni.avro.AvroColumnReader;
+import org.apache.trevni.avro.HadoopInput;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+public class TrevniFileReaderFactory<T> implements FileReaderFactory<T> {
+
+  private static final Log LOG = LogFactory.getLog(TrevniFileReaderFactory.class);
+  private final AvroType<T> aType;
+  private final MapFn<T, T> mapFn;
+  private final Schema schema;
+
+  public TrevniFileReaderFactory(AvroType<T> atype) {
+    this.aType = atype;
+    schema = atype.getSchema();
+    this.mapFn = (MapFn<T, T>) atype.getInputMapFn();
+  }
+
+  public TrevniFileReaderFactory(Schema schema) {
+    this.aType = null;
+    this.schema = schema;
+    this.mapFn = IdentityFn.<T>getInstance();
+  }
+
+  static <T> AvroColumnReader<T> getReader(Input input, AvroType<T> avroType, Schema schema) {
+    AvroColumnReader.Params params = new AvroColumnReader.Params(input);
+    params.setSchema(schema);
+    if (avroType.hasReflect()) {
+      if (avroType.hasSpecific()) {
+        Avros.checkCombiningSpecificAndReflectionSchemas();
+      }
+      params.setModel(ReflectData.get());
+    } else if (avroType.hasSpecific()) {
+      params.setModel(SpecificData.get());
+    } else {
+      params.setModel(GenericData.get());
+    }
+
+    try {
+      return new AvroColumnReader<T>(params);
+    } catch (IOException e) {
+      throw new CrunchRuntimeException(e);
+    }
+  }
+
+  @Override
+  public Iterator<T> read(FileSystem fs, final Path path) {
+    this.mapFn.initialize();
+    try {
+      HadoopInput input = new HadoopInput(path, fs.getConf());
+      final AvroColumnReader<T> reader = getReader(input, aType, schema);
+      return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() {
+        @Override
+        public boolean hasNext() {
+          return reader.hasNext();
+        }
+
+        @Override
+        public T next() {
+          return mapFn.map(reader.next());
+        }
+      });
+    } catch (IOException e) {
+      LOG.info("Could not read avro file at path: " + path, e);
+      return Iterators.emptyIterator();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/5578d5e8/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java
new file mode 100644
index 0000000..193ac1b
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro.trevni;
+
+import org.apache.avro.mapred.AvroJob;
+import org.apache.crunch.io.CompositePathIterable;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.impl.FileSourceImpl;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.trevni.avro.mapreduce.AvroTrevniKeyInputFormat;
+
+import java.io.IOException;
+
+public class TrevniKeySource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
+
+  private static <S> FormatBundle getBundle(AvroType<S> ptype) {
+    FormatBundle bundle = FormatBundle.forInput(AvroTrevniKeyInputFormat.class)
+        .set(AvroJob.INPUT_IS_REFLECT, String.valueOf(ptype.hasReflect()))
+        .set(AvroJob.INPUT_SCHEMA, ptype.getSchema().toString())
+        .set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName());
+    return bundle;
+  }
+
+  public TrevniKeySource(Path path, AvroType<T> ptype) {
+    super(path, ptype, getBundle(ptype));
+  }
+
+  @Override
+  public String toString() {
+    return "TrevniKey(" + path.toString() + ")";
+  }
+
+  @Override
+  public Iterable<T> read(Configuration conf) throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    return CompositePathIterable.create(fs, path, new TrevniFileReaderFactory<T>((AvroType<T>) ptype));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/5578d5e8/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySourceTarget.java
new file mode 100644
index 0000000..72a0fd3
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySourceTarget.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro.trevni;
+
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.SequentialFileNamingScheme;
+import org.apache.crunch.io.avro.AvroFileTarget;
+import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.hadoop.fs.Path;
+
+public class TrevniKeySourceTarget<T> extends ReadableSourcePathTargetImpl<T> {
+  public TrevniKeySourceTarget(Path path, AvroType<T> atype) {
+    this(path, atype, new SequentialFileNamingScheme());
+  }
+
+  public TrevniKeySourceTarget(Path path, AvroType<T> atype, FileNamingScheme fileNamingScheme) {
+    super(new TrevniKeySource(path, atype), new AvroFileTarget(path), fileNamingScheme);
+  }
+
+  @Override
+  public String toString() {
+    return target.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/5578d5e8/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java
new file mode 100644
index 0000000..555aaf4
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro.trevni;
+
+import org.apache.avro.hadoop.io.AvroKeyComparator;
+import org.apache.avro.hadoop.io.AvroSerialization;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.impl.mr.plan.PlanningParameters;
+import org.apache.crunch.io.CrunchOutputs;
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.io.SequentialFileNamingScheme;
+import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.trevni.avro.mapreduce.AvroTrevniKeyOutputFormat;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.apache.crunch.types.avro.Avros.REFLECT_DATA_FACTORY;
+import static org.apache.crunch.types.avro.Avros.REFLECT_DATA_FACTORY_CLASS;
+
+public class TrevniKeyTarget extends FileTargetImpl {
+
+  public TrevniKeyTarget(String path) {
+    this(new Path(path));
+  }
+
+  public TrevniKeyTarget(Path path) {
+    this(path, new SequentialFileNamingScheme());
+  }
+
+  public TrevniKeyTarget(Path path, FileNamingScheme fileNamingScheme) {
+    super(path, AvroTrevniKeyOutputFormat.class, fileNamingScheme);
+  }
+
+  @Override
+  public String toString() {
+    return "TrevniKey(" + path.toString() + ")";
+  }
+
+  @Override
+  public boolean accept(OutputHandler handler, PType<?> ptype) {
+    if (!(ptype instanceof AvroType)) {
+      return false;
+    }
+    handler.configure(this, ptype);
+    return true;
+  }
+
+  @Override
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
+    AvroType<?> atype = (AvroType<?>) ptype;
+    Configuration conf = job.getConfiguration();
+
+    if (null == name) {
+      AvroJob.setOutputKeySchema(job, atype.getSchema());
+      AvroJob.setMapOutputKeySchema(job, atype.getSchema());
+
+      Avros.configureReflectDataFactory(conf);
+      configureForMapReduce(job, AvroKey.class, NullWritable.class, AvroTrevniKeyOutputFormat.class,
+          outputPath, name);
+    } else {
+      FormatBundle<AvroTrevniKeyOutputFormat> bundle = FormatBundle.forOutput(
+          AvroTrevniKeyOutputFormat.class);
+
+      bundle.set("avro.schema.output.key", atype.getSchema().toString());
+      bundle.set("mapred.output.value.groupfn.class", AvroKeyComparator.class.getName());
+      bundle.set("mapred.output.key.comparator.class", AvroKeyComparator.class.getName());
+      bundle.set("avro.serialization.key.writer.schema", atype.getSchema().toString());
+      bundle.set("avro.serialization.key.reader.schema", atype.getSchema().toString());
+
+      //Equivalent to...
+      // AvroSerialization.addToConfiguration(job.getConfiguration());
+      Collection<String> serializations = conf.getStringCollection("io.serializations");
+      if (!serializations.contains(AvroSerialization.class.getName())) {
+        serializations.add(AvroSerialization.class.getName());
+        bundle.set(name, StringUtils.arrayToString(serializations.toArray(new String[serializations.size()])));
+      }
+
+      //The following is equivalent to Avros.configureReflectDataFactory(conf);
+      bundle.set(REFLECT_DATA_FACTORY_CLASS, REFLECT_DATA_FACTORY.getClass().getName());
+
+      //Set output which honors the name.
+      bundle.set("mapred.output.dir", new Path(outputPath, name).toString());
+
+      //Set value which will be ignored but should get past the FileOutputFormat.checkOutputSpecs(..)
+      //which requires the "mapred.output.dir" value to be set.
+      FileOutputFormat.setOutputPath(job, outputPath);
+
+      CrunchOutputs.addNamedOutput(job, name,
+          bundle,
+          AvroKey.class,
+          NullWritable.class);
+    }
+  }
+
+  @Override
+  protected Path getSourcePattern(final Path workingPath, final int index) {
+    //output directories are typically of the form
+    //out#/part-m-#####/part-m-#####/part-#.trv but we don't want both of those folders because it isn't
+    //readable by the TrevniKeySource.
+    return new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + index + "*/part-*/part-*");
+  }
+
+  @Override
+  protected Path getDestFile(final Configuration conf, final Path src, final Path dir, final boolean mapOnlyJob) throws IOException {
+    Path outputFilename = super.getDestFile(conf, src, dir, mapOnlyJob);
+    //make sure the dst file is unique in the case there are multiple part-#.trv files per partition.
+    return new Path(outputFilename.toString()+"-"+src.getName());
+  }
+
+  @Override
+  public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
+    if (ptype instanceof AvroType) {
+      return new TrevniKeySourceTarget(path, (AvroType<T>) ptype);
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/5578d5e8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 71f5e0f..acbf66f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -72,7 +72,7 @@ under the License.
     <commons-httpclient.version>3.0.1</commons-httpclient.version>
     <commons-logging.version>1.1.1</commons-logging.version>
     <commons-cli.version>1.2</commons-cli.version>
-    <avro.version>1.7.0</avro.version>
+    <avro.version>1.7.4</avro.version>
     <javassist.version>3.16.1-GA</javassist.version>
     <jackson.version>1.8.8</jackson.version>
     <protobuf-java.version>2.3.0</protobuf-java.version>
@@ -191,6 +191,22 @@ under the License.
           </exclusion>
         </exclusions>
       </dependency>
+      <dependency>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>trevni-core</artifactId>
+        <version>${avro.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>trevni-avro</artifactId>
+        <version>${avro.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-core</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
 
       <dependency>
         <groupId>org.javassist</groupId>


Mime
View raw message