crunch-commits mailing list archives

From jwi...@apache.org
Subject crunch git commit: CRUNCH-521: Remove hadoop1-related code/dependencies
Date Thu, 21 May 2015 23:20:49 GMT
Repository: crunch
Updated Branches:
  refs/heads/master e90cf2782 -> 85b985a72


CRUNCH-521: Remove hadoop1-related code/dependencies


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/85b985a7
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/85b985a7
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/85b985a7

Branch: refs/heads/master
Commit: 85b985a725e181444867efb3b9d5cf7ac8f0ef52
Parents: e90cf27
Author: Josh Wills <jwills@apache.org>
Authored: Wed May 20 10:55:55 2015 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Thu May 21 14:18:03 2015 -0700

----------------------------------------------------------------------
 crunch-core/pom.xml                             |   9 -
 .../avro/trevni/TrevniFileSourceTargetIT.java   | 133 -------------
 .../io/avro/trevni/TrevniKeyPipelineIT.java     | 195 -------------------
 .../mapreduce/TaskAttemptContextFactory.java    |  70 -------
 .../crunch/impl/mr/run/CrunchRecordReader.java  |  15 +-
 .../org/apache/crunch/io/CrunchOutputs.java     |  33 +---
 .../io/avro/trevni/TrevniFileReaderFactory.java | 106 ----------
 .../crunch/io/avro/trevni/TrevniKeySource.java  |  68 -------
 .../io/avro/trevni/TrevniKeySourceTarget.java   |  40 ----
 .../crunch/io/avro/trevni/TrevniKeyTarget.java  | 149 --------------
 .../io/avro/trevni/TrevniOutputFormat.java      |  35 ----
 .../io/avro/trevni/TrevniReadableData.java      |  39 ----
 .../io/avro/trevni/TrevniRecordWriter.java      | 140 -------------
 .../io/impl/DefaultFileReaderFactory.java       |   5 +-
 .../crunch/impl/spark/SparkRuntimeContext.java  |   4 +-
 pom.xml                                         | 118 ++++-------
 16 files changed, 52 insertions(+), 1107 deletions(-)
----------------------------------------------------------------------
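
In short: this change deletes the Hadoop 1.x compatibility layer (the
reflection-based TaskAttemptContextFactory and the setJobID workaround in
CrunchOutputs), removes the Trevni sources, targets, and tests outright, and
promotes the former hadoop-2 Maven profile to the default build configuration.
Brief notes and sketches follow the relevant hunks below.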


http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-core/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-core/pom.xml b/crunch-core/pom.xml
index 3d671a7..59794f0 100644
--- a/crunch-core/pom.xml
+++ b/crunch-core/pom.xml
@@ -46,15 +46,6 @@ under the License.
     </dependency>
 
     <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>trevni-avro</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>trevni-core</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-avro</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniFileSourceTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniFileSourceTargetIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniFileSourceTargetIT.java
deleted file mode 100644
index d591c65..0000000
--- a/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniFileSourceTargetIT.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.avro.trevni;
-
-import com.google.common.collect.Lists;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData.Record;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.Person;
-import org.apache.crunch.test.StringWrapper;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.avro.AvroType;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.hadoop.fs.Path;
-import org.apache.trevni.ColumnFileMetaData;
-import org.apache.trevni.avro.AvroColumnWriter;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-@SuppressWarnings("serial")
-public class TrevniFileSourceTargetIT implements Serializable {
-
-  private transient File avroFile;
-  @Rule
-  public transient TemporaryPath tmpDir = TemporaryPaths.create();
-
-  @Before
-  public void setUp() throws IOException {
-    avroFile = tmpDir.getFile("test.avro.trevni");
-  }
-
-  private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException {
-    ColumnFileMetaData cfmd = new ColumnFileMetaData();
-    AvroColumnWriter writer = new AvroColumnWriter(schema, cfmd);
-
-    for (GenericRecord record : genericRecords) {
-      writer.write(record);
-    }
-
-    writer.writeTo(avroFile);
-  }
-
-  @Test
-  public void testSpecific() throws IOException {
-    GenericRecord savedRecord = new Record(Person.SCHEMA$);
-    savedRecord.put("name", "John Doe");
-    savedRecord.put("age", 42);
-    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
-    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
-
-    Pipeline pipeline = new MRPipeline(TrevniFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
-    PCollection<Person> genericCollection = pipeline.read(new TrevniKeySource(new Path(avroFile.getAbsolutePath()),
-        Avros.records(Person.class)));
-
-    List<Person> personList = Lists.newArrayList(genericCollection.materialize());
-
-    Person expectedPerson = new Person();
-    expectedPerson.name = "John Doe";
-    expectedPerson.age = 42;
-
-    List<CharSequence> siblingNames = Lists.newArrayList();
-    siblingNames.add("Jimmy");
-    siblingNames.add("Jane");
-    expectedPerson.siblingnames = siblingNames;
-
-    assertEquals(Lists.newArrayList(expectedPerson), Lists.newArrayList(personList));
-  }
-
-  @Test
-  public void testGeneric() throws IOException {
-    String genericSchemaJson = Person.SCHEMA$.toString().replace("Person", "GenericPerson");
-    Schema genericPersonSchema = new Schema.Parser().parse(genericSchemaJson);
-    GenericRecord savedRecord = new Record(genericPersonSchema);
-    savedRecord.put("name", "John Doe");
-    savedRecord.put("age", 42);
-    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
-    populateGenericFile(Lists.newArrayList(savedRecord), genericPersonSchema);
-
-    Pipeline pipeline = new MRPipeline(TrevniFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
-    PCollection<Record> genericCollection = pipeline.read(new TrevniKeySource(new Path(avroFile.getAbsolutePath()),
-        Avros.generics(genericPersonSchema)));
-
-    List<Record> recordList = Lists.newArrayList(genericCollection.materialize());
-
-    assertEquals(Lists.newArrayList(savedRecord), Lists.newArrayList(recordList));
-  }
-
-  @Test
-  public void testReflect() throws IOException {
-    AvroType<StringWrapper> strType = Avros.reflects (StringWrapper.class);
-    Schema schema = strType.getSchema();
-    GenericRecord savedRecord = new Record(schema);
-    savedRecord.put("value", "stringvalue");
-    populateGenericFile(Lists.newArrayList(savedRecord), schema);
-
-    Pipeline pipeline = new MRPipeline(TrevniFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
-    PCollection<StringWrapper> stringValueCollection = pipeline.read(new TrevniKeySource(new Path(avroFile.getAbsolutePath()),
-        strType));
-
-    List<StringWrapper> recordList = Lists.newArrayList(stringValueCollection.materialize());
-
-    assertEquals(1, recordList.size());
-    StringWrapper stringWrapper = recordList.get(0);
-    assertEquals("stringvalue", stringWrapper.getValue());
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniKeyPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniKeyPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniKeyPipelineIT.java
deleted file mode 100644
index c6ea4fa..0000000
--- a/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniKeyPipelineIT.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.crunch.io.avro.trevni;
-
-import com.google.common.collect.Lists;
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.specific.SpecificData;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.Target;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.At;
-import org.apache.crunch.test.Person;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.hadoop.fs.Path;
-import org.apache.trevni.avro.AvroColumnReader;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-
-public class TrevniKeyPipelineIT implements Serializable {
-
-  private transient File avroFile;
-  @Rule
-  public transient TemporaryPath tmpDir = TemporaryPaths.create();
-
-  @Before
-  public void setUp() throws IOException {
-    avroFile = tmpDir.getFile("test.avro.trevni");
-  }
-
-  private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException {
-    FileOutputStream outputStream = new FileOutputStream(this.avroFile);
-    GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(schema);
-
-    DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(genericDatumWriter);
-    dataFileWriter.create(schema, outputStream);
-
-    for (GenericRecord record : genericRecords) {
-      dataFileWriter.append(record);
-    }
-
-    dataFileWriter.close();
-    outputStream.close();
-  }
-
-  @Test
-  public void toAvroTrevniKeyTarget() throws Exception {
-    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
-    savedRecord.put("name", "John Doe");
-    savedRecord.put("age", 42);
-    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
-    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
-
-    Pipeline pipeline = new MRPipeline(TrevniKeyPipelineIT.class, tmpDir.getDefaultConfiguration());
-    PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
-        Avros.records(Person.class)));
-    File outputFile = tmpDir.getFile("output");
-    Target trevniFile = new TrevniKeyTarget(outputFile.getAbsolutePath());
-    pipeline.write(genericCollection, trevniFile);
-    pipeline.run();
-
-    Person person = genericCollection.materialize().iterator().next();
-
-    File trvFile = new File(outputFile, "part-m-00000.trv-part-0.trv");
-
-    AvroColumnReader.Params params = new AvroColumnReader.Params(trvFile);
-    params.setSchema(Person.SCHEMA$);
-    params.setModel(SpecificData.get());
-    AvroColumnReader<Person> reader = new AvroColumnReader<Person>(params);
-
-    try{
-      Person readPerson = reader.next();
-      assertThat(readPerson, is(person));
-    }finally{
-      reader.close();
-    }
-  }
-
-  @Test
-  public void toAvroTrevniKeyMultipleTarget() throws Exception {
-    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
-    savedRecord.put("name", "John Doe");
-    savedRecord.put("age", 42);
-    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
-    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
-
-    Pipeline pipeline = new MRPipeline(TrevniKeyPipelineIT.class, tmpDir.getDefaultConfiguration());
-    PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
-        Avros.records(Person.class)));
-    File output1File = tmpDir.getFile("output1");
-    File output2File = tmpDir.getFile("output2");
-    pipeline.write(genericCollection, new TrevniKeyTarget(output1File.getAbsolutePath()));
-    pipeline.write(genericCollection, new TrevniKeyTarget(output2File.getAbsolutePath()));
-    pipeline.run();
-
-    Person person = genericCollection.materialize().iterator().next();
-
-    File trv1File = new File(output1File, "part-m-00000.trv-part-0.trv");
-    File trv2File = new File(output2File, "part-m-00000.trv-part-0.trv");
-
-    AvroColumnReader.Params params = new AvroColumnReader.Params(trv1File);
-    params.setSchema(Person.SCHEMA$);
-    params.setModel(SpecificData.get());
-    AvroColumnReader<Person> reader = new AvroColumnReader<Person>(params);
-
-    try{
-      Person readPerson = reader.next();
-      assertThat(readPerson, is(person));
-    }finally{
-      reader.close();
-    }
-
-    params = new AvroColumnReader.Params(trv2File);
-    params.setSchema(Person.SCHEMA$);
-    params.setModel(SpecificData.get());
-    reader = new AvroColumnReader<Person>(params);
-
-    try{
-      Person readPerson = reader.next();
-      assertThat(readPerson, is(person));
-    }finally{
-      reader.close();
-    }
-  }
-
-  @Test
-  public void toAvroTrevniKeyTargetReadSource() throws Exception {
-    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
-    savedRecord.put("name", "John Doe");
-    savedRecord.put("age", 42);
-    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
-    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
-
-    Pipeline pipeline = new MRPipeline(TrevniKeyPipelineIT.class, tmpDir.getDefaultConfiguration());
-    PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
-        Avros.records(Person.class)));
-    File outputFile = tmpDir.getFile("output");
-    Target trevniFile = new TrevniKeyTarget(outputFile.getAbsolutePath());
-    pipeline.write(genericCollection, trevniFile);
-    pipeline.run();
-
-    Person person = genericCollection.materialize().iterator().next();
-
-    PCollection<Person> retrievedPeople = pipeline.read(new TrevniKeySource<Person>(
-        new Path(outputFile.toURI()), Avros.records(Person.class)));
-
-    Person retrievedPerson = retrievedPeople.materialize().iterator().next();
-
-    assertThat(retrievedPerson, is(person));
-
-    File trvFile = new File(outputFile, "part-m-00000.trv-part-0.trv");
-
-    AvroColumnReader.Params params = new AvroColumnReader.Params(trvFile);
-    params.setSchema(Person.SCHEMA$);
-    params.setModel(SpecificData.get());
-    AvroColumnReader<Person> reader = new AvroColumnReader<Person>(params);
-
-    try{
-      Person readPerson = reader.next();
-      assertThat(readPerson, is(person));
-    }finally{
-      reader.close();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/TaskAttemptContextFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/TaskAttemptContextFactory.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/TaskAttemptContextFactory.java
deleted file mode 100644
index 256fa42..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/TaskAttemptContextFactory.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.hadoop.mapreduce;
-
-import java.lang.reflect.Constructor;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A factory class that allows us to hide the fact that {@code TaskAttemptContext} is a class in
- * Hadoop 1.x.x and an interface in Hadoop 2.x.x.
- */
-@SuppressWarnings("unchecked")
-public class TaskAttemptContextFactory {
-
-  private static final Logger LOG = LoggerFactory.getLogger(TaskAttemptContextFactory.class);
-
-  private static final TaskAttemptContextFactory INSTANCE = new TaskAttemptContextFactory();
-
-  public static TaskAttemptContext create(Configuration conf, TaskAttemptID taskAttemptId) {
-    return INSTANCE.createInternal(conf, taskAttemptId);
-  }
-
-  private Constructor<TaskAttemptContext> taskAttemptConstructor;
-
-  private TaskAttemptContextFactory() {
-    Class<TaskAttemptContext> implClass = TaskAttemptContext.class;
-    if (implClass.isInterface()) {
-      try {
-        implClass = (Class<TaskAttemptContext>) Class.forName(
-            "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
-      } catch (ClassNotFoundException e) {
-        LOG.error("Could not find TaskAttemptContextImpl class, exiting", e);
-      }
-    }
-    try {
-      this.taskAttemptConstructor = implClass.getConstructor(Configuration.class, TaskAttemptID.class);
-    } catch (Exception e) {
-      LOG.error("Could not access TaskAttemptContext constructor, exiting", e);
-    }
-  }
-
-  private TaskAttemptContext createInternal(Configuration conf, TaskAttemptID taskAttemptId) {
-    try {
-      return (TaskAttemptContext) taskAttemptConstructor.newInstance(conf, taskAttemptId);
-    } catch (Exception e) {
-      LOG.error("Could not construct a TaskAttemptContext instance", e);
-      return null;
-    }
-  }
-}
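
With Hadoop 1 support gone this factory has no purpose: in Hadoop 2,
TaskAttemptContext is an interface whose concrete implementation is
org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl, so callers can
construct it directly. A minimal sketch of the replacement pattern (the class
name below is illustrative; the Hadoop 2 constructor signature is real):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

    public class Hadoop2TaskContexts {
      // Direct construction; no Class.forName or Constructor lookup required.
      public static TaskAttemptContext create(Configuration conf, TaskAttemptID id) {
        return new TaskAttemptContextImpl(conf, id);
      }
    }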

http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
index e475f10..2842658 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
@@ -19,7 +19,6 @@ package org.apache.crunch.impl.mr.run;
 
 import java.io.IOException;
 
-import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -27,6 +26,7 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.util.ReflectionUtils;
 
 class CrunchRecordReader<K, V> extends RecordReader<K, V> {
@@ -44,12 +44,12 @@ class CrunchRecordReader<K, V> extends RecordReader<K, V> {
     if (crunchSplit.get() instanceof CombineFileSplit) {
       combineFileSplit = (CombineFileSplit) crunchSplit.get();
     }
-    this.context = context;
     Configuration conf = crunchSplit.getConf();
     if (conf == null) {
       conf = context.getConfiguration();
       crunchSplit.setConf(conf);
     }
+    this.context = new TaskAttemptContextImpl(conf, context.getTaskAttemptID());
     initNextRecordReader();
   }
 
@@ -75,8 +75,7 @@ class CrunchRecordReader<K, V> extends RecordReader<K, V> {
     InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils.newInstance(
         crunchSplit.getInputFormatClass(),
         conf);
-    this.curReader = inputFormat.createRecordReader(getDelegateSplit(),
-        TaskAttemptContextFactory.create(conf, context.getTaskAttemptID()));
+    this.curReader = inputFormat.createRecordReader(getDelegateSplit(), context);
     return true;
   }
 
@@ -137,18 +136,17 @@ class CrunchRecordReader<K, V> extends RecordReader<K, V> {
   @Override
   public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
     this.crunchSplit = (CrunchInputSplit) inputSplit;
-    this.context = context;
     Configuration conf = crunchSplit.getConf();
     if (conf == null) {
       conf = context.getConfiguration();
       crunchSplit.setConf(conf);
     }
+    this.context = new TaskAttemptContextImpl(conf, context.getTaskAttemptID());
     if (crunchSplit.get() instanceof CombineFileSplit) {
       combineFileSplit = (CombineFileSplit) crunchSplit.get();
     }
     if (curReader != null) {
-      curReader.initialize(getDelegateSplit(),
-          TaskAttemptContextFactory.create(conf, context.getTaskAttemptID()));
+      curReader.initialize(getDelegateSplit(), this.context);
     }
   }
 
@@ -159,8 +157,7 @@ class CrunchRecordReader<K, V> extends RecordReader<K, V> {
         return false;
       }
       if (curReader != null) {
-        curReader.initialize(getDelegateSplit(),
-            TaskAttemptContextFactory.create(crunchSplit.getConf(), context.getTaskAttemptID()));
+        curReader.initialize(getDelegateSplit(), context);
       }
     }
     return true;
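
Note the subtle change to initialize(): instead of caching the
framework-supplied context, the reader now rebuilds a TaskAttemptContextImpl
around the split's own Configuration, so the delegate RecordReader sees
per-split settings rather than the job-wide ones. A hedged, self-contained
sketch of that wrapping pattern (class and field names are illustrative, not
Crunch API):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

    abstract class WrappingReader<K, V> extends RecordReader<K, V> {
      protected RecordReader<K, V> delegate; // assigned by a concrete subclass
      protected TaskAttemptContext context;

      @Override
      public void initialize(InputSplit split, TaskAttemptContext base)
          throws IOException, InterruptedException {
        // Rebuild the context around the configuration this split should use;
        // here the base conf is reused, a real wrapper may substitute its own.
        Configuration conf = base.getConfiguration();
        context = new TaskAttemptContextImpl(conf, base.getTaskAttemptID());
        delegate.initialize(split, context);
      }
    }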

http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
index 247ac08..653a401 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
@@ -18,9 +18,7 @@
 package org.apache.crunch.io;
 
 import com.google.common.collect.Sets;
-import java.lang.reflect.Method;
 import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
@@ -33,6 +31,7 @@ import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.base.Joiner;
@@ -211,9 +210,7 @@ public class CrunchOutputs<K, V> {
   }
 
   private static TaskAttemptContext getTaskContext(TaskAttemptContext baseContext, Job job) {
-
     org.apache.hadoop.mapreduce.TaskAttemptID baseTaskId = baseContext.getTaskAttemptID();
-
     // Create a task ID context with our specialized job ID.
     org.apache.hadoop.mapreduce.TaskAttemptID  taskId;
     taskId = new org.apache.hadoop.mapreduce.TaskAttemptID(job.getJobID().getJtIdentifier(),
@@ -221,36 +218,14 @@ public class CrunchOutputs<K, V> {
             baseTaskId.isMap(),
             baseTaskId.getTaskID().getId(),
             baseTaskId.getId());
-
-    return TaskAttemptContextFactory.create(
-            job.getConfiguration(), taskId);
+    return new TaskAttemptContextImpl(job.getConfiguration(), taskId);
   }
 
   private static void setJobID(Job job, JobID jobID, String namedOutput) {
-    Method setJobIDMethod;
-    JobID newJobID = jobID;
-    try {
-      // Hadoop 2
-      setJobIDMethod = Job.class.getMethod("setJobID", JobID.class);
-      // Add the named output to the job ID, since that is used by some output formats
-      // to create temporary outputs.
-      newJobID = jobID == null || jobID.getJtIdentifier().contains(namedOutput) ?
+    JobID newJobID = jobID == null || jobID.getJtIdentifier().contains(namedOutput) ?
           jobID :
           new JobID(jobID.getJtIdentifier() + "_" + namedOutput, jobID.getId());
-    } catch (NoSuchMethodException e) {
-      // Hadoop 1's setJobID method is package private and declared by JobContext
-      try {
-        setJobIDMethod = JobContext.class.getDeclaredMethod("setJobID", JobID.class);
-      } catch (NoSuchMethodException e1) {
-        throw new CrunchRuntimeException(e);
-      }
-      setJobIDMethod.setAccessible(true);
-    }
-    try {
-      setJobIDMethod.invoke(job, newJobID);
-    } catch (Exception e) {
-      throw new CrunchRuntimeException("Could not set job ID to " + jobID, e);
-    }
+    job.setJobID(newJobID);
   }
 
   private static void configureJob(
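
Two simplifications land here: Job.setJobID(JobID) is public API in Hadoop 2,
so the reflective fallback to Hadoop 1's package-private JobContext.setJobID
goes away, and getTaskContext builds its specialized context directly. A sketch
of the resulting pattern (method and class names are illustrative; the
five-argument TaskAttemptID constructor is the same one used above):

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

    public class NamedOutputContexts {
      // Re-key the attempt under the specialized job ID so output formats that
      // derive temporary paths from the job ID keep named outputs separate.
      public static TaskAttemptContext forNamedOutput(TaskAttemptContext base, Job job) {
        TaskAttemptID baseId = base.getTaskAttemptID();
        TaskAttemptID taskId = new TaskAttemptID(
            job.getJobID().getJtIdentifier(), job.getJobID().getId(),
            baseId.isMap(), baseId.getTaskID().getId(), baseId.getId());
        return new TaskAttemptContextImpl(job.getConfiguration(), taskId);
      }
    }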

http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniFileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniFileReaderFactory.java
deleted file mode 100644
index 3caa586..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniFileReaderFactory.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.avro.trevni;
-
-import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.specific.SpecificData;
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.fn.IdentityFn;
-import org.apache.crunch.io.FileReaderFactory;
-import org.apache.crunch.io.impl.AutoClosingIterator;
-import org.apache.crunch.types.avro.AvroType;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.trevni.Input;
-import org.apache.trevni.avro.AvroColumnReader;
-import org.apache.trevni.avro.HadoopInput;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-public class TrevniFileReaderFactory<T> implements FileReaderFactory<T> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(TrevniFileReaderFactory.class);
-  private final AvroType<T> aType;
-  private final MapFn<T, T> mapFn;
-  private final Schema schema;
-
-  public TrevniFileReaderFactory(AvroType<T> atype) {
-    this.aType = atype;
-    schema = atype.getSchema();
-    this.mapFn = (MapFn<T, T>) atype.getInputMapFn();
-  }
-
-  public TrevniFileReaderFactory(Schema schema) {
-    this.aType = null;
-    this.schema = schema;
-    this.mapFn = IdentityFn.<T>getInstance();
-  }
-
-  static <T> AvroColumnReader<T> getReader(Input input, AvroType<T> avroType, Schema schema) {
-    AvroColumnReader.Params params = new AvroColumnReader.Params(input);
-    params.setSchema(schema);
-    if (avroType.hasReflect()) {
-      if (avroType.hasSpecific()) {
-        Avros.checkCombiningSpecificAndReflectionSchemas();
-      }
-      params.setModel(ReflectData.get());
-    } else if (avroType.hasSpecific()) {
-      params.setModel(SpecificData.get());
-    } else {
-      params.setModel(GenericData.get());
-    }
-
-    try {
-      return new AvroColumnReader<T>(params);
-    } catch (IOException e) {
-      throw new CrunchRuntimeException(e);
-    }
-  }
-
-  @Override
-  public Iterator<T> read(FileSystem fs, final Path path) {
-    this.mapFn.initialize();
-    try {
-      HadoopInput input = new HadoopInput(path, fs.getConf());
-      final AvroColumnReader<T> reader = getReader(input, aType, schema);
-      return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() {
-        @Override
-        public boolean hasNext() {
-          return reader.hasNext();
-        }
-
-        @Override
-        public T next() {
-          return mapFn.map(reader.next());
-        }
-      });
-    } catch (IOException e) {
-      LOG.info("Could not read avro file at path: {}", path, e);
-      return Iterators.emptyIterator();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java
deleted file mode 100644
index fb0e8fe..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.avro.trevni;
-
-import java.util.List;
-import org.apache.avro.mapred.AvroJob;
-import org.apache.crunch.impl.mr.run.RuntimeParameters;
-import org.apache.crunch.io.FormatBundle;
-import org.apache.crunch.io.ReadableSource;
-import org.apache.crunch.ReadableData;
-import org.apache.crunch.io.impl.FileSourceImpl;
-import org.apache.crunch.types.avro.AvroType;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.trevni.avro.mapreduce.AvroTrevniKeyInputFormat;
-
-import java.io.IOException;
-
-public class TrevniKeySource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
-
-  private static <S> FormatBundle getBundle(AvroType<S> ptype) {
-    return FormatBundle.forInput(AvroTrevniKeyInputFormat.class)
-        .set(AvroJob.INPUT_IS_REFLECT, String.valueOf(ptype.hasReflect()))
-        .set(AvroJob.INPUT_SCHEMA, ptype.getSchema().toString())
-        .set(RuntimeParameters.DISABLE_COMBINE_FILE, Boolean.FALSE.toString())
-        .set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName());
-  }
-
-  public TrevniKeySource(Path path, AvroType<T> ptype) {
-    super(path, ptype, getBundle(ptype));
-  }
-
-  public TrevniKeySource(List<Path> paths, AvroType<T> ptype) {
-    super(paths, ptype, getBundle(ptype));
-  }
-
-  @Override
-  public String toString() {
-    return "TrevniKey(" + pathsAsString() + ")";
-  }
-
-  @Override
-  public Iterable<T> read(Configuration conf) throws IOException {
-    return read(conf, new TrevniFileReaderFactory<T>((AvroType<T>) ptype));
-  }
-
-  @Override
-  public ReadableData<T> asReadable() {
-    return new TrevniReadableData<T>(paths, (AvroType<T>) ptype);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySourceTarget.java
deleted file mode 100644
index 376d2ba..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySourceTarget.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.avro.trevni;
-
-import org.apache.crunch.io.FileNamingScheme;
-import org.apache.crunch.io.SequentialFileNamingScheme;
-import org.apache.crunch.io.avro.AvroFileTarget;
-import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
-import org.apache.crunch.types.avro.AvroType;
-import org.apache.hadoop.fs.Path;
-
-public class TrevniKeySourceTarget<T> extends ReadableSourcePathTargetImpl<T> {
-  public TrevniKeySourceTarget(Path path, AvroType<T> atype) {
-    this(path, atype, SequentialFileNamingScheme.getInstance());
-  }
-
-  public TrevniKeySourceTarget(Path path, AvroType<T> atype, FileNamingScheme fileNamingScheme) {
-    super(new TrevniKeySource(path, atype), new AvroFileTarget(path), fileNamingScheme);
-  }
-
-  @Override
-  public String toString() {
-    return target.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java
deleted file mode 100644
index 5db11f0..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.avro.trevni;
-
-import org.apache.avro.hadoop.io.AvroKeyComparator;
-import org.apache.avro.hadoop.io.AvroSerialization;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.impl.mr.plan.PlanningParameters;
-import org.apache.crunch.io.CrunchOutputs;
-import org.apache.crunch.io.FileNamingScheme;
-import org.apache.crunch.io.FormatBundle;
-import org.apache.crunch.io.OutputHandler;
-import org.apache.crunch.io.SequentialFileNamingScheme;
-import org.apache.crunch.io.impl.FileTargetImpl;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.avro.AvroType;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.StringUtils;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import static org.apache.crunch.types.avro.Avros.REFLECT_DATA_FACTORY;
-import static org.apache.crunch.types.avro.Avros.REFLECT_DATA_FACTORY_CLASS;
-
-public class TrevniKeyTarget extends FileTargetImpl {
-
-  public TrevniKeyTarget(String path) {
-    this(new Path(path));
-  }
-
-  public TrevniKeyTarget(Path path) {
-    this(path, SequentialFileNamingScheme.getInstance());
-  }
-
-  public TrevniKeyTarget(Path path, FileNamingScheme fileNamingScheme) {
-    super(path, TrevniOutputFormat.class, fileNamingScheme);
-  }
-
-  @Override
-  public String toString() {
-    return "TrevniKey(" + path.toString() + ")";
-  }
-
-  @Override
-  public boolean accept(OutputHandler handler, PType<?> ptype) {
-    if (!(ptype instanceof AvroType)) {
-      return false;
-    }
-    handler.configure(this, ptype);
-    return true;
-  }
-
-  @Override
-  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
-    AvroType<?> atype = (AvroType<?>) ptype;
-    Configuration conf = job.getConfiguration();
-
-    if (null == name) {
-      AvroJob.setOutputKeySchema(job, atype.getSchema());
-      AvroJob.setMapOutputKeySchema(job, atype.getSchema());
-
-      Avros.configureReflectDataFactory(conf);
-      configureForMapReduce(job, AvroKey.class, NullWritable.class, FormatBundle.forOutput(TrevniOutputFormat.class),
-          outputPath, null);
-    } else {
-      FormatBundle<TrevniOutputFormat> bundle = FormatBundle.forOutput(
-          TrevniOutputFormat.class);
-
-      bundle.set("avro.schema.output.key", atype.getSchema().toString());
-      bundle.set("mapred.output.value.groupfn.class", AvroKeyComparator.class.getName());
-      bundle.set("mapred.output.key.comparator.class", AvroKeyComparator.class.getName());
-      bundle.set("avro.serialization.key.writer.schema", atype.getSchema().toString());
-      bundle.set("avro.serialization.key.reader.schema", atype.getSchema().toString());
-
-      //Equivalent to...
-      // AvroSerialization.addToConfiguration(job.getConfiguration());
-      Collection<String> serializations = conf.getStringCollection("io.serializations");
-      if (!serializations.contains(AvroSerialization.class.getName())) {
-        serializations.add(AvroSerialization.class.getName());
-        bundle.set(name, StringUtils.arrayToString(serializations.toArray(new String[serializations.size()])));
-      }
-
-      //The following is equivalent to Avros.configureReflectDataFactory(conf);
-      bundle.set(REFLECT_DATA_FACTORY_CLASS, REFLECT_DATA_FACTORY.getClass().getName());
-
-      //Set output which honors the name.
-      bundle.set("mapred.output.dir", new Path(outputPath, name).toString());
-
-      //Set value which will be ignored but should get past the FileOutputFormat.checkOutputSpecs(..)
-      //which requires the "mapred.output.dir" value to be set.
-      try{
-        FileOutputFormat.setOutputPath(job, outputPath);
-      } catch(Exception ioe){
-          throw new RuntimeException(ioe);
-      }
-
-      CrunchOutputs.addNamedOutput(job, name,
-          bundle,
-          AvroKey.class,
-          NullWritable.class);
-    }
-  }
-
-  @Override
-  protected Path getSourcePattern(final Path workingPath, final int index) {
-    //output directories are typically of the form
-    //out#/part-m-#####/part-m-#####/part-#.trv but we don't want both of those folders because it isn't
-    //readable by the TrevniKeySource.
-    return new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + index + "*/part-*/part-*");
-  }
-
-  @Override
-  protected Path getDestFile(final Configuration conf, final Path src, final Path dir, final boolean mapOnlyJob) throws IOException {
-    Path outputFilename = super.getDestFile(conf, src, dir, true);
-    //make sure the dst file is unique in the case there are multiple part-#.trv files per partition.
-    return new Path(outputFilename.toString()+"-"+src.getName());
-  }
-
-  @Override
-  public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
-    if (ptype instanceof AvroType) {
-      return new TrevniKeySourceTarget(path, (AvroType<T>) ptype);
-    }
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniOutputFormat.java
deleted file mode 100644
index a2b7dc9..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniOutputFormat.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.avro.trevni;
-
-import java.io.IOException;
-
-import org.apache.avro.mapred.AvroKey;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-public class TrevniOutputFormat<T> extends FileOutputFormat<AvroKey<T>, NullWritable> { 
-
-  @Override
-  public RecordWriter<AvroKey<T>, NullWritable> getRecordWriter(TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    return new TrevniRecordWriter<T>(context);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniReadableData.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniReadableData.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniReadableData.java
deleted file mode 100644
index 5a681c4..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniReadableData.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.avro.trevni;
-
-import org.apache.crunch.io.FileReaderFactory;
-import org.apache.crunch.io.impl.ReadableDataImpl;
-import org.apache.crunch.types.avro.AvroType;
-import org.apache.hadoop.fs.Path;
-
-import java.util.List;
-
-public class TrevniReadableData<T> extends ReadableDataImpl<T> {
-  private final AvroType<T> avroType;
-
-  public TrevniReadableData(List<Path> paths, AvroType<T> avroType) {
-    super(paths);
-    this.avroType = avroType;
-  }
-
-  @Override
-  protected FileReaderFactory<T> getFileReaderFactory() {
-    return new TrevniFileReaderFactory<T>(avroType);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniRecordWriter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniRecordWriter.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniRecordWriter.java
deleted file mode 100644
index 74bb796..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniRecordWriter.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.avro.trevni;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Iterator;
-import java.util.Map.Entry;
-
-import org.apache.avro.Schema;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.trevni.ColumnFileMetaData;
-import org.apache.trevni.MetaData;
-import org.apache.trevni.avro.AvroColumnWriter;
-
-/**
- *
- */
-public class TrevniRecordWriter<T> extends RecordWriter<AvroKey<T>, NullWritable> {
-
-  /** trevni file extension */
-  public final static String EXT = ".trv";
-  
-  /** prefix of job configs that we care about */
-  public static final String META_PREFIX = "trevni.meta.";
-  
-  /** Counter that increments as new trevni files are create because the current file 
-   * has exceeded the block size 
-   * */
-  protected int part = 0;
-
-  /** Trevni file writer */
-  protected AvroColumnWriter<T> writer;
-
-  /** This will be a unique directory linked to the task */
-  final Path dirPath;
-  
-  /** HDFS object */
-  final FileSystem fs;
-
-  /** Current configured blocksize */
-  final long blockSize;
-  
-  /** Provided avro schema from the context */
-  protected Schema schema;
-  
-  /** meta data to be stored in the output file.  */
-  protected ColumnFileMetaData meta;
-  
-  public TrevniRecordWriter(TaskAttemptContext context) throws IOException {
-    schema = initSchema(context);
-    meta = filterMetadata(context.getConfiguration());
-    writer = new AvroColumnWriter<T>(schema, meta, ReflectData.get());
-
-    Path outputPath = FileOutputFormat.getOutputPath(context);
-    
-    String dir = FileOutputFormat.getUniqueFile(context, "part", "");
-    dirPath = new Path(outputPath.toString() + "/" + dir);
-    fs = dirPath.getFileSystem(context.getConfiguration());
-    fs.mkdirs(dirPath);
-
-    blockSize = fs.getDefaultBlockSize();
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public void write(AvroKey<T> key, NullWritable value) throws IOException,
-      InterruptedException {
-    writer.write(key.datum());
-    if (writer.sizeEstimate() >= blockSize) // block full
-      flush();
-  }
-
-  /** {@inheritDoc} */
-  protected Schema initSchema(TaskAttemptContext context) {
-    boolean isMapOnly = context.getNumReduceTasks() == 0;
-    return isMapOnly ? AvroJob.getMapOutputKeySchema(context
-        .getConfiguration()) : AvroJob.getOutputKeySchema(context
-        .getConfiguration());
-  }
-  
-  /**
-   * A Trevni flush will close the current file and prep a new writer
-   * @throws IOException
-   */
-  public void flush() throws IOException {
-    OutputStream out = fs.create(new Path(dirPath, "part-" + (part++) + EXT));
-    try {
-      writer.writeTo(out);
-    } finally {
-      out.close();
-    }
-    writer = new AvroColumnWriter<T>(schema, meta, ReflectData.get());
-  }
-  
-  /** {@inheritDoc} */
-  @Override
-  public void close(TaskAttemptContext arg0) throws IOException,
-      InterruptedException {
-    flush();
-  }
-  
-  static ColumnFileMetaData filterMetadata(final Configuration configuration) {
-    final ColumnFileMetaData meta = new ColumnFileMetaData();
-    Iterator<Entry<String, String>> keyIterator = configuration.iterator();
-
-    while (keyIterator.hasNext()) {
-      Entry<String, String> confEntry = keyIterator.next();
-      if (confEntry.getKey().startsWith(META_PREFIX))
-        meta.put(confEntry.getKey().substring(META_PREFIX.length()), confEntry
-            .getValue().getBytes(MetaData.UTF8));
-    }
-
-    return meta;
-  }
-}
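
For reference, the deleted writer implemented size-based rolling: records
accumulate in an in-memory AvroColumnWriter, and once sizeEstimate() crosses
the HDFS block size the buffer is flushed to a new part-N.trv file and a fresh
writer is started. A minimal, generic sketch of that rolling pattern (every
name here is illustrative, not Crunch or Trevni API):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    class RollingWriterSketch {
      private final long threshold;            // e.g. fs.getDefaultBlockSize()
      private ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      private int part = 0;

      RollingWriterSketch(long threshold) { this.threshold = threshold; }

      void write(byte[] record) throws IOException {
        buffer.write(record);
        if (buffer.size() >= threshold) {      // block full: roll the file
          roll();
        }
      }

      private void roll() throws IOException {
        OutputStream out = openPartFile("part-" + (part++));
        try {
          buffer.writeTo(out);
        } finally {
          out.close();
        }
        buffer = new ByteArrayOutputStream();  // start a fresh buffer
      }

      // Stand-in for fs.create(new Path(dirPath, name + EXT)) in the real writer.
      private OutputStream openPartFile(String name) throws IOException {
        return new ByteArrayOutputStream();
      }
    }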

http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-core/src/main/java/org/apache/crunch/io/impl/DefaultFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/DefaultFileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/DefaultFileReaderFactory.java
index 90c15fa..e0e1326 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/DefaultFileReaderFactory.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/DefaultFileReaderFactory.java
@@ -22,7 +22,6 @@ import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.UnmodifiableIterator;
 import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory;
 import org.apache.crunch.io.FileReaderFactory;
 import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.types.PType;
@@ -36,11 +35,11 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.Iterator;
 
 class DefaultFileReaderFactory<T> implements FileReaderFactory<T> {
@@ -62,7 +61,7 @@ class DefaultFileReaderFactory<T> implements FileReaderFactory<T> {
     ptype.initialize(conf);
 
     final InputFormat fmt = ReflectionUtils.newInstance(bundle.getFormatClass(), conf);
-    final TaskAttemptContext ctxt = TaskAttemptContextFactory.create(conf, new TaskAttemptID());
+    final TaskAttemptContext ctxt = new TaskAttemptContextImpl(conf, new TaskAttemptID());
     try {
       Job job = new Job(conf);
       FileInputFormat.addInputPath(job, path);

http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
index 44d3573..cea17e6 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Maps;
 import com.google.common.io.ByteStreams;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.DoFn;
-import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.TaskInputOutputContextFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.mapred.SparkCounter;
@@ -34,6 +33,7 @@ import org.apache.hadoop.mapreduce.StatusReporter;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
 import org.apache.spark.Accumulator;
 import org.apache.spark.SparkFiles;
 import org.apache.spark.broadcast.Broadcast;
@@ -80,7 +80,7 @@ public class SparkRuntimeContext implements Serializable {
         lastTID = null;
       }
       configureLocalFiles();
-      context = TaskInputOutputContextFactory.create(getConfiguration(), attemptID, new SparkReporter(counters));
+      context = new MapContextImpl(getConfiguration(), attemptID, null, null, null, new SparkReporter(counters), null);
     }
     fn.setContext(context);
     fn.initialize();
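
With the TaskInputOutputContextFactory shim gone, the Spark runtime fabricates
a context from Hadoop 2's MapContextImpl directly. Its constructor takes
(Configuration, TaskAttemptID, RecordReader, RecordWriter, OutputCommitter,
StatusReporter, InputSplit); the reader, writer, committer, and split slots can
be left null when the DoFn only needs configuration and counters. A minimal
sketch (class and method names below are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.StatusReporter;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.TaskInputOutputContext;
    import org.apache.hadoop.mapreduce.task.MapContextImpl;

    public class FabricatedContexts {
      // reader, writer, committer and split are null: the context only needs
      // to carry configuration and route status/counter updates.
      public static TaskInputOutputContext<Object, Object, Object, Object> create(
          Configuration conf, TaskAttemptID attemptId, StatusReporter reporter) {
        return new MapContextImpl<Object, Object, Object, Object>(
            conf, attemptId, null, null, null, reporter, null);
      }
    }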

http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 942f203..cce69bf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,10 +89,11 @@ under the License.
     <pkg>org.apache.crunch</pkg>
 
     <!-- Can be overridden by hadoop-2 profile, but these are the default values -->
-    <hadoop.version>1.1.2</hadoop.version>
-    <hbase.version>0.98.1-hadoop1</hbase.version>
-    <hbase.midfix>hadoop1</hbase.midfix>
-    <avro.classifier>hadoop1</avro.classifier>
+    <hadoop.version>2.2.0</hadoop.version>
+    <hbase.version>0.98.1-hadoop2</hbase.version>
+    <commons-lang.version>2.5</commons-lang.version>
+    <hbase.midfix>hadoop2</hbase.midfix>
+    <avro.classifier>hadoop2</avro.classifier>
 
     <scala.base.version>2.10</scala.base.version>
     <scala.version>2.10.4</scala.version>
@@ -224,26 +225,6 @@ under the License.
           </exclusion>
         </exclusions>
       </dependency>
-      <dependency>
-        <groupId>org.apache.avro</groupId>
-        <artifactId>trevni-core</artifactId>
-        <version>${avro.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.avro</groupId>
-        <artifactId>trevni-avro</artifactId>
-        <version>${avro.version}</version>
-        <exclusions>
-          <exclusion>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-core</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>org.apache.avro</groupId>
-            <artifactId>avro-ipc</artifactId>
-          </exclusion>
-        </exclusions>
-      </dependency>
 
       <dependency>
         <groupId>com.twitter</groupId>
@@ -469,64 +450,41 @@ under the License.
         <artifactId>jsr305</artifactId>
         <version>${jsr305.version}</version>
      </dependency>
-    </dependencies>
+     <dependency>
+       <groupId>org.apache.hadoop</groupId>
+       <artifactId>hadoop-common</artifactId>
+       <version>${hadoop.version}</version>
+     </dependency>
+     <dependency>
+       <groupId>org.apache.hadoop</groupId>
+       <artifactId>hadoop-auth</artifactId>
+       <version>${hadoop.version}</version>
+     </dependency>
+     <dependency>
+       <groupId>org.apache.hadoop</groupId>
+       <artifactId>hadoop-hdfs</artifactId>
+       <version>${hadoop.version}</version>
+     </dependency>
+     <dependency>
+       <groupId>org.apache.hadoop</groupId>
+       <artifactId>hadoop-mapreduce-client-core</artifactId>
+       <version>${hadoop.version}</version>
+     </dependency>
+     <dependency>
+       <groupId>org.apache.hadoop</groupId>
+       <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+       <version>${hadoop.version}</version>
+       <type>test-jar</type>
+       <scope>test</scope>
+     </dependency>
+     <dependency>
+       <groupId>org.apache.hadoop</groupId>
+       <artifactId>hadoop-annotations</artifactId>
+       <version>${hadoop.version}</version>
+     </dependency>
+   </dependencies>
   </dependencyManagement>
 
-  <profiles>
-    <profile>
-      <id>hadoop-2</id>
-      <activation>
-        <property>
-          <name>crunch.platform</name>
-          <value>2</value>
-        </property>
-      </activation>
-      <properties>
-        <hadoop.version>2.2.0</hadoop.version>
-        <hbase.version>0.98.1-hadoop2</hbase.version>
-        <commons-lang.version>2.5</commons-lang.version>
-        <hbase.midfix>hadoop2</hbase.midfix>
-        <avro.classifier>hadoop2</avro.classifier>
-      </properties>
-      <dependencyManagement>
-        <dependencies>
-          <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <version>${hadoop.version}</version>
-          </dependency>
-          <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-auth</artifactId>
-            <version>${hadoop.version}</version>
-          </dependency>
-          <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-hdfs</artifactId>
-            <version>${hadoop.version}</version>
-          </dependency>
-          <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-core</artifactId>
-            <version>${hadoop.version}</version>
-          </dependency>
-          <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
-            <version>${hadoop.version}</version>
-            <type>test-jar</type>
-            <scope>test</scope>
-          </dependency>
-          <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-annotations</artifactId>
-            <version>${hadoop.version}</version>
-          </dependency>
-        </dependencies>
-      </dependencyManagement>
-    </profile>
-  </profiles>
-
   <build>
     <plugins>
       <plugin>

