incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [4/28] Rename com.cloudera.crunch -> org.apache.crunch in the Java core
Date Sat, 07 Jul 2012 21:49:06 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/run/RTNode.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/run/RTNode.java b/src/main/java/com/cloudera/crunch/impl/mr/run/RTNode.java
deleted file mode 100644
index 29bc9f9..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/run/RTNode.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.run;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import com.cloudera.crunch.DoFn;
-import com.cloudera.crunch.Emitter;
-import com.cloudera.crunch.impl.mr.emit.IntermediateEmitter;
-import com.cloudera.crunch.impl.mr.emit.MultipleOutputEmitter;
-import com.cloudera.crunch.impl.mr.emit.OutputEmitter;
-import com.cloudera.crunch.types.Converter;
-
-/**
- * A serializable node in a tree of {@link DoFn}s.
- * <p>
- * Each node applies its {@code DoFn} to its input and hands the results to an
- * {@link Emitter} chosen in {@link #initialize}: an output emitter when an output
- * converter is configured, otherwise an emitter that forwards to the child nodes.
- */
-public class RTNode implements Serializable {
-
-  private static final Log LOG = LogFactory.getLog(RTNode.class);
-
-  private final String nodeName;
-  private DoFn<Object, Object> fn;
-  private final List<RTNode> children;
-  // Builds the DoFn input in process(key, value) and processIterable(key, values).
-  private final Converter inputConverter;
-  // When non-null, results go through an output emitter instead of to the children.
-  private final Converter outputConverter;
-  // When non-null, results are written via the multiple-outputs writer under this name;
-  // otherwise via the task context.
-  private final String outputName;
-
-  // Not serialized; created by initialize().
-  private transient Emitter<Object> emitter;
-
-  public RTNode(DoFn<Object, Object> fn, String name, List<RTNode> children,
-      Converter inputConverter, Converter outputConverter, String outputName) {
-    this.fn = fn;
-    this.nodeName = name;
-    this.children = children;
-    this.inputConverter = inputConverter;
-    this.outputConverter = outputConverter;
-    this.outputName = outputName;
-  }
-
-  /**
-   * Sets this node's DoFn context, initializes all children, and creates the emitter.
-   * Calling it again after it has succeeded has no effect.
-   *
-   * @param ctxt the task context supplying the Hadoop context and multiple outputs
-   * @throws CrunchRuntimeException if there is neither an output converter nor any children
-   */
-  public void initialize(CrunchTaskContext ctxt) {
-    if (emitter != null) {
-      // Already initialized
-      return;
-    }
-
-    fn.setContext(ctxt.getContext());
-    for (RTNode child : children) {
-      child.initialize(ctxt);
-    }
-
-    if (outputConverter != null) {
-      if (outputName != null) {
-        this.emitter = new MultipleOutputEmitter(
-            outputConverter, ctxt.getMultipleOutputs(), outputName);
-      } else {
-        this.emitter = new OutputEmitter(
-            outputConverter, ctxt.getContext());
-      }
-    } else if (!children.isEmpty()) {
-      this.emitter = new IntermediateEmitter(children);
-    } else {
-      throw new CrunchRuntimeException("Invalid RTNode config: no emitter for: " + nodeName);
-    }
-  }
-
-  /** Returns true when this node writes through an output converter and has no children. */
-  public boolean isLeafNode() {
-    return outputConverter != null && children.isEmpty();
-  }
-
-  /**
-   * Runs the DoFn on {@code input}. A {@link CrunchRuntimeException} is logged once
-   * (tracked via {@code wasLogged}/{@code markLogged}) and then rethrown.
-   */
-  public void process(Object input) {
-    try {
-      fn.process(input, emitter);
-    } catch (CrunchRuntimeException e) {
-      if (!e.wasLogged()) {
-        // Pass the input itself to %s instead of calling input.toString(): a null input
-        // would otherwise throw a NullPointerException here and hide the real failure.
-        LOG.info(String.format("Crunch exception in '%s' for input: %s",
-            nodeName, input), e);
-        e.markLogged();
-      }
-      throw e;
-    }
-  }
-
-  /** Converts a (key, value) pair with the input converter and processes the result. */
-  public void process(Object key, Object value) {
-    process(inputConverter.convertInput(key, value));
-  }
-
-  /** Converts a key and its grouped values with the input converter and processes the result. */
-  public void processIterable(Object key, Iterable values) {
-    process(inputConverter.convertIterableInput(key, values));
-  }
-
-  /**
-   * Lets the DoFn emit any final output, flushes the emitter, then cleans up every child.
-   * Requires a prior successful {@link #initialize}, since that is what sets the emitter.
-   */
-  public void cleanup() {
-    fn.cleanup(emitter);
-    emitter.flush();
-    for (RTNode child : children) {
-      child.cleanup();
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "RTNode [nodeName=" + nodeName + ", fn=" + fn + ", children="
-        + children + ", inputConverter=" + inputConverter
-        + ", outputConverter=" + outputConverter + ", outputName=" + outputName
-        + "]";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/run/RuntimeParameters.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/run/RuntimeParameters.java b/src/main/java/com/cloudera/crunch/impl/mr/run/RuntimeParameters.java
deleted file mode 100644
index a4a50c4..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/run/RuntimeParameters.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.run;
-
-/**
- * Parameters used during the runtime execution.
- *
- */
-public class RuntimeParameters {
-
-  public static final String AGGREGATOR_BUCKETS = "crunch.aggregator.buckets";
-  
-  public static final String MULTI_INPUTS = "crunch.inputs.dir";
-
-  public static final String DEBUG = "crunch.debug";
-  
-  // Not instantiated
-  private RuntimeParameters() {}
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/run/TaskAttemptContextFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/run/TaskAttemptContextFactory.java b/src/main/java/com/cloudera/crunch/impl/mr/run/TaskAttemptContextFactory.java
deleted file mode 100644
index 671de52..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/run/TaskAttemptContextFactory.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.run;
-
-import java.lang.reflect.Constructor;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-
-/**
- *
- */
-@SuppressWarnings("unchecked")
-public class TaskAttemptContextFactory {
-
-  private static final Log LOG = LogFactory.getLog(TaskAttemptContextFactory.class);
-
-  private static final TaskAttemptContextFactory INSTANCE = new TaskAttemptContextFactory();
-  
-  public static TaskAttemptContext create(Configuration conf, TaskAttemptID taskAttemptId) {
-    return INSTANCE.createInternal(conf, taskAttemptId);
-  }
-  
-  private Constructor taskAttemptConstructor;
-  
-  private TaskAttemptContextFactory() {
-    Class implClass = TaskAttemptContext.class;
-    if (implClass.isInterface()) {
-      try {
-        implClass = Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
-      } catch (ClassNotFoundException e) {
-        LOG.fatal("Could not find TaskAttemptContextImpl class, exiting", e);
-      }
-    }
-    try {
-      this.taskAttemptConstructor = implClass.getConstructor(Configuration.class, TaskAttemptID.class);
-    } catch (Exception e) {
-      LOG.fatal("Could not access TaskAttemptContext constructor, exiting", e);
-    }
-  }
-  
-  private TaskAttemptContext createInternal(Configuration conf, TaskAttemptID taskAttemptId) {
-    try {
-      return (TaskAttemptContext) taskAttemptConstructor.newInstance(conf, taskAttemptId);
-    } catch (Exception e) {
-      LOG.error("Could not construct a TaskAttemptContext instance", e);
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/At.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/At.java b/src/main/java/com/cloudera/crunch/io/At.java
deleted file mode 100644
index c19a77b..0000000
--- a/src/main/java/com/cloudera/crunch/io/At.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.client.Scan;
-
-import com.cloudera.crunch.SourceTarget;
-import com.cloudera.crunch.io.avro.AvroFileSourceTarget;
-import com.cloudera.crunch.io.hbase.HBaseSourceTarget;
-import com.cloudera.crunch.io.seq.SeqFileSourceTarget;
-import com.cloudera.crunch.io.seq.SeqFileTableSourceTarget;
-import com.cloudera.crunch.io.text.TextFileSourceTarget;
-import com.cloudera.crunch.types.PType;
-import com.cloudera.crunch.types.PTypeFamily;
-import com.cloudera.crunch.types.avro.AvroType;
-import com.cloudera.crunch.types.writable.Writables;
-
-/**
- * Static factory methods for creating various {@link SourceTarget} types.
- * <p>
- * Overloads taking a {@code String} path name wrap it in a {@link Path} and delegate
- * to the matching {@code Path} overload.
- */
-public class At {
-
-  /** Avro file source/target at {@code pathName}. */
-  public static <T> AvroFileSourceTarget<T> avroFile(String pathName, AvroType<T> avroType) {
-    Path path = new Path(pathName);
-    return avroFile(path, avroType);
-  }
-
-  /** Avro file source/target at {@code path} using {@code avroType}. */
-  public static <T> AvroFileSourceTarget<T> avroFile(Path path, AvroType<T> avroType) {
-    return new AvroFileSourceTarget<T>(path, avroType);
-  }
-
-  /** HBase source/target over {@code table} using a newly constructed {@link Scan}. */
-  public static HBaseSourceTarget hbaseTable(String table) {
-    Scan defaultScan = new Scan();
-    return hbaseTable(table, defaultScan);
-  }
-
-  /** HBase source/target over {@code table} configured by {@code scan}. */
-  public static HBaseSourceTarget hbaseTable(String table, Scan scan) {
-    return new HBaseSourceTarget(table, scan);
-  }
-
-  /** Sequence file source/target at {@code pathName}. */
-  public static <T> SeqFileSourceTarget<T> sequenceFile(String pathName, PType<T> ptype) {
-    Path path = new Path(pathName);
-    return sequenceFile(path, ptype);
-  }
-
-  /** Sequence file source/target at {@code path} using {@code ptype}. */
-  public static <T> SeqFileSourceTarget<T> sequenceFile(Path path, PType<T> ptype) {
-    return new SeqFileSourceTarget<T>(path, ptype);
-  }
-
-  /** Keyed sequence file source/target at {@code pathName}. */
-  public static <K, V> SeqFileTableSourceTarget<K, V> sequenceFile(String pathName,
-      PType<K> keyType, PType<V> valueType) {
-    Path path = new Path(pathName);
-    return sequenceFile(path, keyType, valueType);
-  }
-
-  /**
-   * Keyed sequence file source/target at {@code path}; the table type is built from the
-   * key type's family.
-   */
-  public static <K, V> SeqFileTableSourceTarget<K, V> sequenceFile(Path path,
-      PType<K> keyType, PType<V> valueType) {
-    PTypeFamily family = keyType.getFamily();
-    return new SeqFileTableSourceTarget<K, V>(path, family.tableOf(keyType, valueType));
-  }
-
-  /** Text file source/target of strings at {@code pathName}. */
-  public static TextFileSourceTarget<String> textFile(String pathName) {
-    Path path = new Path(pathName);
-    return textFile(path);
-  }
-
-  /** Text file source/target of strings at {@code path}, typed with {@code Writables.strings()}. */
-  public static TextFileSourceTarget<String> textFile(Path path) {
-    PType<String> lineType = Writables.strings();
-    return textFile(path, lineType);
-  }
-
-  /** Text file source/target at {@code pathName} using {@code ptype}. */
-  public static <T> TextFileSourceTarget<T> textFile(String pathName, PType<T> ptype) {
-    Path path = new Path(pathName);
-    return textFile(path, ptype);
-  }
-
-  /** Text file source/target at {@code path} using {@code ptype}. */
-  public static <T> TextFileSourceTarget<T> textFile(Path path, PType<T> ptype) {
-    return new TextFileSourceTarget<T>(path, ptype);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/CompositePathIterable.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/CompositePathIterable.java b/src/main/java/com/cloudera/crunch/io/CompositePathIterable.java
deleted file mode 100644
index 774bbac..0000000
--- a/src/main/java/com/cloudera/crunch/io/CompositePathIterable.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-
-import com.google.common.collect.UnmodifiableIterator;
-
-public class CompositePathIterable<T> implements Iterable<T> {
-
-  private final FileStatus[] stati;
-  private final FileSystem fs;
-  private final FileReaderFactory<T> readerFactory;
-
-  private static final PathFilter FILTER = new PathFilter() {
-	@Override
-	public boolean accept(Path path) {
-	  return !path.getName().startsWith("_");
-	}
-  };
-  
-  public static <S> Iterable<S> create(FileSystem fs, Path path, FileReaderFactory<S> readerFactory) throws IOException {
-    
-    if (!fs.exists(path)){
-      throw new IOException("No files found to materialize at: " + path);
-    }
-    
-    FileStatus[] stati = null;
-    try {
-      stati = fs.listStatus(path, FILTER);
-    } catch (FileNotFoundException e) {
-      stati = null;
-    }
-    if (stati == null) {
-      throw new IOException("No files found to materialize at: " + path);
-    }
-
-    if (stati.length == 0) {
-      return Collections.emptyList();
-    } else {
-      return new CompositePathIterable<S>(stati, fs, readerFactory);
-    }
-
-  }
-  
-  private CompositePathIterable(FileStatus[] stati, FileSystem fs, FileReaderFactory<T> readerFactory) {
-	this.stati = stati;
-	this.fs = fs;
-	this.readerFactory = readerFactory;
-  }
-
-  @Override
-  public Iterator<T> iterator() {
-
-	return new UnmodifiableIterator<T>() {
-	  private int index = 0;
-	  private Iterator<T> iter = readerFactory.read(fs, stati[index++].getPath());
-	  
-	  @Override
-	  public boolean hasNext() {
-		if (!iter.hasNext()) {
-		  while (index < stati.length) {
-       	    iter = readerFactory.read(fs, stati[index++].getPath());
-			if (iter.hasNext()) {
-			  return true;
-			}
-		  }
-		  return false;
-		}
-		return true;
-	  }
-
-	  @Override
-	  public T next() {
-		return iter.next();
-	  }
-	};
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/FileReaderFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/FileReaderFactory.java b/src/main/java/com/cloudera/crunch/io/FileReaderFactory.java
deleted file mode 100644
index 85b556e..0000000
--- a/src/main/java/com/cloudera/crunch/io/FileReaderFactory.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io;
-
-import java.util.Iterator;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Produces an {@link Iterator} over the contents of a single path.
- *
- * @param <T> the element type of the returned iterator
- */
-public interface FileReaderFactory<T> {
-
-  /**
-   * Returns an iterator over the contents stored at {@code path}.
-   *
-   * @param fs the file system holding {@code path}
-   * @param path the path to read
-   * @return an iterator over the elements read from {@code path}
-   */
-  Iterator<T> read(FileSystem fs, Path path);
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/From.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/From.java b/src/main/java/com/cloudera/crunch/io/From.java
deleted file mode 100644
index b4d6427..0000000
--- a/src/main/java/com/cloudera/crunch/io/From.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-import com.cloudera.crunch.Source;
-import com.cloudera.crunch.TableSource;
-import com.cloudera.crunch.io.avro.AvroFileSource;
-import com.cloudera.crunch.io.hbase.HBaseSourceTarget;
-import com.cloudera.crunch.io.impl.FileTableSourceImpl;
-import com.cloudera.crunch.io.seq.SeqFileSource;
-import com.cloudera.crunch.io.seq.SeqFileTableSourceTarget;
-import com.cloudera.crunch.io.text.TextFileSource;
-import com.cloudera.crunch.types.PTableType;
-import com.cloudera.crunch.types.PType;
-import com.cloudera.crunch.types.PTypeFamily;
-import com.cloudera.crunch.types.avro.AvroType;
-import com.cloudera.crunch.types.writable.Writables;
-
-/**
- * Static factory methods for creating various {@link Source} types.
- * <p>
- * Overloads taking a {@code String} path name wrap it in a {@link Path} and delegate
- * to the matching {@code Path} overload.
- */
-public class From {
-
-  /** Keyed source at {@code path} read with the given {@link FileInputFormat} subclass. */
-  public static <K, V> TableSource<K, V> formattedFile(String path,
-      Class<? extends FileInputFormat> formatClass, PType<K> keyType, PType<V> valueType) {
-    Path filePath = new Path(path);
-    return formattedFile(filePath, formatClass, keyType, valueType);
-  }
-
-  /**
-   * Keyed source at {@code path} read with {@code formatClass}; the table type is built
-   * from the key type's family.
-   */
-  public static <K, V> TableSource<K, V> formattedFile(Path path,
-      Class<? extends FileInputFormat> formatClass, PType<K> keyType, PType<V> valueType) {
-    PTypeFamily family = keyType.getFamily();
-    PTableType<K, V> tableType = family.tableOf(keyType, valueType);
-    return new FileTableSourceImpl<K, V>(path, tableType, formatClass);
-  }
-
-  /** Avro file source at {@code pathName}. */
-  public static <T> Source<T> avroFile(String pathName, AvroType<T> avroType) {
-    Path path = new Path(pathName);
-    return avroFile(path, avroType);
-  }
-
-  /** Avro file source at {@code path} using {@code avroType}. */
-  public static <T> Source<T> avroFile(Path path, AvroType<T> avroType) {
-    return new AvroFileSource<T>(path, avroType);
-  }
-
-  /** HBase source over {@code table} using a newly constructed {@link Scan}. */
-  public static TableSource<ImmutableBytesWritable, Result> hbaseTable(String table) {
-    Scan defaultScan = new Scan();
-    return hbaseTable(table, defaultScan);
-  }
-
-  /** HBase source over {@code table} configured by {@code scan}. */
-  public static TableSource<ImmutableBytesWritable, Result> hbaseTable(String table, Scan scan) {
-    return new HBaseSourceTarget(table, scan);
-  }
-
-  /** Sequence file source at {@code pathName}. */
-  public static <T> Source<T> sequenceFile(String pathName, PType<T> ptype) {
-    Path path = new Path(pathName);
-    return sequenceFile(path, ptype);
-  }
-
-  /** Sequence file source at {@code path} using {@code ptype}. */
-  public static <T> Source<T> sequenceFile(Path path, PType<T> ptype) {
-    return new SeqFileSource<T>(path, ptype);
-  }
-
-  /** Keyed sequence file source at {@code pathName}. */
-  public static <K, V> TableSource<K, V> sequenceFile(String pathName, PType<K> keyType,
-      PType<V> valueType) {
-    Path path = new Path(pathName);
-    return sequenceFile(path, keyType, valueType);
-  }
-
-  /** Keyed sequence file source at {@code path}; the table type comes from the key type's family. */
-  public static <K, V> TableSource<K, V> sequenceFile(Path path, PType<K> keyType,
-      PType<V> valueType) {
-    PTypeFamily family = keyType.getFamily();
-    PTableType<K, V> tableType = family.tableOf(keyType, valueType);
-    return new SeqFileTableSourceTarget<K, V>(path, tableType);
-  }
-
-  /** Text file source of strings at {@code pathName}. */
-  public static Source<String> textFile(String pathName) {
-    Path path = new Path(pathName);
-    return textFile(path);
-  }
-
-  /** Text file source of strings at {@code path}, typed with {@code Writables.strings()}. */
-  public static Source<String> textFile(Path path) {
-    PType<String> lineType = Writables.strings();
-    return textFile(path, lineType);
-  }
-
-  /** Text file source at {@code pathName} using {@code ptype}. */
-  public static <T> Source<T> textFile(String pathName, PType<T> ptype) {
-    Path path = new Path(pathName);
-    return textFile(path, ptype);
-  }
-
-  /** Text file source at {@code path} using {@code ptype}. */
-  public static <T> Source<T> textFile(Path path, PType<T> ptype) {
-    return new TextFileSource<T>(path, ptype);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/MapReduceTarget.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/MapReduceTarget.java b/src/main/java/com/cloudera/crunch/io/MapReduceTarget.java
deleted file mode 100644
index e0f1ccc..0000000
--- a/src/main/java/com/cloudera/crunch/io/MapReduceTarget.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-
-import com.cloudera.crunch.Target;
-import com.cloudera.crunch.types.PType;
-
-/**
- * A {@link Target} that can configure a Hadoop MapReduce {@link Job} to write to it.
- */
-public interface MapReduceTarget extends Target {
-
-  /**
-   * Configures {@code job} to write output to this target.
-   *
-   * @param job the job being configured
-   * @param ptype the type of the data that will be written
-   * @param outputPath an output path supplied by the caller
-   * @param name the output name; may be {@code null} (presumably selecting the job's
-   *     primary output rather than a named output — confirm against implementations)
-   */
-  void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name);
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/OutputHandler.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/OutputHandler.java b/src/main/java/com/cloudera/crunch/io/OutputHandler.java
deleted file mode 100644
index bfea81c..0000000
--- a/src/main/java/com/cloudera/crunch/io/OutputHandler.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io;
-
-import com.cloudera.crunch.Target;
-import com.cloudera.crunch.types.PType;
-
-/**
- * Configures output for a {@link Target} given the {@link PType} of the data written to it.
- */
-public interface OutputHandler {
-
-  /**
-   * Configures output for {@code target}.
-   *
-   * @param target the target to configure
-   * @param ptype the type of the data written to {@code target}
-   * @return presumably whether {@code target} was handled — TODO confirm against implementations
-   */
-  boolean configure(Target target, PType<?> ptype);
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/PathTarget.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/PathTarget.java b/src/main/java/com/cloudera/crunch/io/PathTarget.java
deleted file mode 100644
index 972c2dc..0000000
--- a/src/main/java/com/cloudera/crunch/io/PathTarget.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io;
-
-import org.apache.hadoop.fs.Path;
-
-/**
- * A {@link MapReduceTarget} associated with a single file system {@link Path}.
- */
-public interface PathTarget extends MapReduceTarget {
-
-  /** Returns the path this target is associated with. */
-  Path getPath();
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/PathTargetImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/PathTargetImpl.java b/src/main/java/com/cloudera/crunch/io/PathTargetImpl.java
deleted file mode 100644
index d40f172..0000000
--- a/src/main/java/com/cloudera/crunch/io/PathTargetImpl.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package com.cloudera.crunch.io;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-import com.cloudera.crunch.types.PType;
-
-public abstract class PathTargetImpl implements PathTarget {
-
-  private final Path path;
-  private final Class<OutputFormat> outputFormatClass;
-  private final Class keyClass;
-  private final Class valueClass;
-  
-  public PathTargetImpl(String path, Class<OutputFormat> outputFormatClass,
-	  Class keyClass, Class valueClass) {
-	this(new Path(path), outputFormatClass, keyClass, valueClass);
-  }
-  
-  public PathTargetImpl(Path path, Class<OutputFormat> outputFormatClass,
-	  Class keyClass, Class valueClass) {
-	this.path = path;
-	this.outputFormatClass = outputFormatClass;
-	this.keyClass = keyClass;
-	this.valueClass = valueClass;
-  }
-  
-  @Override
-  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath,
-	  String name) {
-    try {
-      FileOutputFormat.setOutputPath(job, path);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    if (name == null) {
-      job.setOutputFormatClass(outputFormatClass);
-      job.setOutputKeyClass(keyClass);
-      job.setOutputValueClass(valueClass);
-    } else {
-      CrunchMultipleOutputs.addNamedOutput(job, name, outputFormatClass,
-          keyClass, valueClass);
-    }
-  }
-
-  @Override
-  public Path getPath() {
-	return path;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/ReadableSource.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/ReadableSource.java b/src/main/java/com/cloudera/crunch/io/ReadableSource.java
deleted file mode 100644
index 159449a..0000000
--- a/src/main/java/com/cloudera/crunch/io/ReadableSource.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.cloudera.crunch.Source;
-
-/**
- * A {@link Source} whose contents can also be read directly by the client.
- *
- * @param <T> the type of the elements read
- */
-public interface ReadableSource<T> extends Source<T> {
-
-  /**
-   * Reads the contents of this source.
-   *
-   * @param conf the configuration used to access the data
-   * @return the elements of this source
-   * @throws IOException if the data cannot be read
-   */
-  Iterable<T> read(Configuration conf) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/ReadableSourceTarget.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/ReadableSourceTarget.java b/src/main/java/com/cloudera/crunch/io/ReadableSourceTarget.java
deleted file mode 100644
index 52e9f6f..0000000
--- a/src/main/java/com/cloudera/crunch/io/ReadableSourceTarget.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io;
-
-import com.cloudera.crunch.SourceTarget;
-
-/**
- * A {@link SourceTarget} that is also a {@link ReadableSource}, so the data it holds
- * can be read back into the local client.
- *
- * @param <T> the type of the data read
- */
-public interface ReadableSourceTarget<T> extends ReadableSource<T>, SourceTarget<T> {
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/SourceTargetHelper.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/SourceTargetHelper.java b/src/main/java/com/cloudera/crunch/io/SourceTargetHelper.java
deleted file mode 100644
index 09be419..0000000
--- a/src/main/java/com/cloudera/crunch/io/SourceTargetHelper.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Functions for configuring the inputs/outputs of MapReduce jobs.
- * 
- */
-public class SourceTargetHelper {
-
-	private static final Log LOG = LogFactory.getLog(SourceTargetHelper.class);
-
-	public static long getPathSize(Configuration conf, Path path) throws IOException {
-		return getPathSize(FileSystem.get(conf), path);
-	}
-
-	public static long getPathSize(FileSystem fs, Path path) throws IOException {
-
-		if (!fs.exists(path)) {
-			return -1L;
-		}
-
-		FileStatus[] stati = null;
-		try {
-			stati = fs.listStatus(path);
-			if (stati == null) {
-				throw new FileNotFoundException(path + " doesn't exist");
-			}
-		} catch (FileNotFoundException e) {
-			LOG.warn("Returning 0 for getPathSize on non-existant path '" + path + "'");
-			return 0L;
-		}
-
-		if (stati.length == 0) {
-			return 0L;
-		}
-		long size = 0;
-		for (FileStatus status : stati) {
-			size += status.getLen();
-		}
-		return size;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/To.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/To.java b/src/main/java/com/cloudera/crunch/io/To.java
deleted file mode 100644
index 4634592..0000000
--- a/src/main/java/com/cloudera/crunch/io/To.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-import com.cloudera.crunch.Target;
-import com.cloudera.crunch.io.avro.AvroFileTarget;
-import com.cloudera.crunch.io.hbase.HBaseTarget;
-import com.cloudera.crunch.io.impl.FileTargetImpl;
-import com.cloudera.crunch.io.seq.SeqFileTarget;
-import com.cloudera.crunch.io.text.TextFileTarget;
-
-/**
- * Static factory methods for creating various {@link Target} types.
- *
- */
-public class To {
-  
-  public static Target formattedFile(String pathName, Class<? extends FileOutputFormat> formatClass) {
-	return formattedFile(new Path(pathName), formatClass);
-  }
-  
-  public static Target formattedFile(Path path, Class<? extends FileOutputFormat> formatClass) {
-	return new FileTargetImpl(path, formatClass);
-  }
-  
-  public static Target avroFile(String pathName) {
-	return avroFile(new Path(pathName));
-  }
-  
-  public static Target avroFile(Path path) {
-	return new AvroFileTarget(path);
-  }
-  
-  public static Target hbaseTable(String table) {
-	return new HBaseTarget(table);
-  }
-  
-  public static Target sequenceFile(String pathName) {
-	return sequenceFile(new Path(pathName));
-  }
-  
-  public static Target sequenceFile(Path path) {
-	return new SeqFileTarget(path);
-  }
-  
-  public static Target textFile(String pathName) {
-	return textFile(new Path(pathName));
-  }
-  
-  public static Target textFile(Path path) {
-	return new TextFileTarget(path);
-  }  
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/avro/AvroFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/avro/AvroFileReaderFactory.java b/src/main/java/com/cloudera/crunch/io/avro/AvroFileReaderFactory.java
deleted file mode 100644
index a3d673c..0000000
--- a/src/main/java/com/cloudera/crunch/io/avro/AvroFileReaderFactory.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io.avro;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.mapred.FsInput;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.cloudera.crunch.MapFn;
-import com.cloudera.crunch.io.FileReaderFactory;
-import com.cloudera.crunch.types.avro.AvroType;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
-
-public class AvroFileReaderFactory<T> implements FileReaderFactory<T> {
-
-	private static final Log LOG = LogFactory
-			.getLog(AvroFileReaderFactory.class);
-
-	private final DatumReader<T> recordReader;
-	private final MapFn<T, T> mapFn;
-	private final Configuration conf;
-
-	public AvroFileReaderFactory(AvroType<T> atype, Configuration conf) {
-		this.recordReader = createDatumReader(atype);
-		this.mapFn = (MapFn<T, T>) atype.getInputMapFn();
-		this.conf = conf;
-	}
-
-	private DatumReader<T> createDatumReader(AvroType<T> avroType) {
-		if (avroType.isSpecific()) {
-			return new SpecificDatumReader<T>(avroType.getSchema());
-		} else if (avroType.isGeneric()) {
-			return new GenericDatumReader<T>(avroType.getSchema());
-		} else {
-			return new ReflectDatumReader<T>(avroType.getSchema());
-		}
-	}
-
-	@Override
-	public Iterator<T> read(FileSystem fs, final Path path) {
-		this.mapFn.setConfigurationForTest(conf);
-		this.mapFn.initialize();
-		try {
-			FsInput fsi = new FsInput(path, fs.getConf());
-			final DataFileReader<T> reader = new DataFileReader<T>(fsi,
-					recordReader);
-			return new UnmodifiableIterator<T>() {
-				@Override
-				public boolean hasNext() {
-					return reader.hasNext();
-				}
-
-				@Override
-				public T next() {
-					return mapFn.map(reader.next());
-				}
-			};
-		} catch (IOException e) {
-			LOG.info("Could not read avro file at path: " + path, e);
-			return Iterators.emptyIterator();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java b/src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
deleted file mode 100644
index 4debfeb..0000000
--- a/src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.mapred.AvroJob;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.cloudera.crunch.io.CompositePathIterable;
-import com.cloudera.crunch.io.ReadableSource;
-import com.cloudera.crunch.io.impl.FileSourceImpl;
-import com.cloudera.crunch.io.impl.InputBundle;
-import com.cloudera.crunch.types.avro.AvroInputFormat;
-import com.cloudera.crunch.types.avro.AvroType;
-import com.cloudera.crunch.types.avro.Avros;
-
-public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
-
-  public AvroFileSource(Path path, AvroType<T> ptype) {
-    super(path, ptype, new InputBundle(AvroInputFormat.class)
-        .set(AvroJob.INPUT_IS_REFLECT, String.valueOf(!ptype.isSpecific()))
-        .set(AvroJob.INPUT_SCHEMA, ptype.getSchema().toString())
-        .set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName()));
-  }
-
-  @Override
-  public String toString() {
-    return "Avro(" + path.toString() + ")";
-  }
-
-  @Override
-  public Iterable<T> read(Configuration conf) throws IOException {
-    FileSystem fs = FileSystem.get(path.toUri(), conf);
-    return CompositePathIterable.create(fs, path, new AvroFileReaderFactory<T>(
-        (AvroType<T>) ptype, conf));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/avro/AvroFileSourceTarget.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/avro/AvroFileSourceTarget.java b/src/main/java/com/cloudera/crunch/io/avro/AvroFileSourceTarget.java
deleted file mode 100644
index 4d64a7f..0000000
--- a/src/main/java/com/cloudera/crunch/io/avro/AvroFileSourceTarget.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io.avro;
-
-import org.apache.hadoop.fs.Path;
-
-import com.cloudera.crunch.io.impl.ReadableSourcePathTargetImpl;
-import com.cloudera.crunch.types.avro.AvroType;
-
-public class AvroFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> {
-  public AvroFileSourceTarget(Path path, AvroType<T> atype) {
-	super(new AvroFileSource<T>(path, atype), new AvroFileTarget(path));
-  }
-  
-  @Override
-  public String toString() {
-    return target.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/avro/AvroFileTarget.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/avro/AvroFileTarget.java b/src/main/java/com/cloudera/crunch/io/avro/AvroFileTarget.java
deleted file mode 100644
index 2569735..0000000
--- a/src/main/java/com/cloudera/crunch/io/avro/AvroFileTarget.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io.avro;
-
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
-
-import com.cloudera.crunch.SourceTarget;
-import com.cloudera.crunch.io.OutputHandler;
-import com.cloudera.crunch.io.impl.FileTargetImpl;
-import com.cloudera.crunch.types.PType;
-import com.cloudera.crunch.types.avro.AvroOutputFormat;
-import com.cloudera.crunch.types.avro.AvroType;
-import com.cloudera.crunch.types.avro.Avros;
-
-public class AvroFileTarget extends FileTargetImpl {
-  public AvroFileTarget(String path) {
-    this(new Path(path));
-  }
-  
-  public AvroFileTarget(Path path) {
-    super(path, AvroOutputFormat.class);
-  }
-    
-  @Override
-  public String toString() {
-    return "Avro(" + path.toString() + ")";
-  }
-  
-  @Override
-  public boolean accept(OutputHandler handler, PType<?> ptype) {
-    if (!(ptype instanceof AvroType)) {
-      return false;
-    }
-    handler.configure(this, ptype);
-    return true;
-  }
-
-  @Override
-  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath,
-      String name) {
-    AvroType<?> atype = (AvroType<?>) ptype;
-    Configuration conf = job.getConfiguration();
-    String schemaParam = null;
-    if (name == null) {
-      schemaParam = "avro.output.schema";
-    } else {
-      schemaParam = "avro.output.schema." + name;
-    }
-    String outputSchema = conf.get(schemaParam);
-    if (outputSchema == null) {
-      conf.set(schemaParam, atype.getSchema().toString());
-    } else if (!outputSchema.equals(atype.getSchema().toString())) {
-      throw new IllegalStateException("Avro targets must use the same output schema");
-    }
-    Avros.configureReflectDataFactory(conf);
-    configureForMapReduce(job, AvroWrapper.class, NullWritable.class,
-        outputPath, name);
-  }
-
-  @Override
-  public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
-    if (ptype instanceof AvroType) {
-      return new AvroFileSourceTarget<T>(path, (AvroType<T>) ptype);
-    }
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/hbase/HBaseSourceTarget.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/hbase/HBaseSourceTarget.java b/src/main/java/com/cloudera/crunch/io/hbase/HBaseSourceTarget.java
deleted file mode 100644
index 85b763e..0000000
--- a/src/main/java/com/cloudera/crunch/io/hbase/HBaseSourceTarget.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io.hbase;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.util.Base64;
-import org.apache.hadoop.mapreduce.Job;
-
-import com.cloudera.crunch.Pair;
-import com.cloudera.crunch.SourceTarget;
-import com.cloudera.crunch.TableSource;
-import com.cloudera.crunch.impl.mr.run.CrunchMapper;
-import com.cloudera.crunch.types.PTableType;
-import com.cloudera.crunch.types.PType;
-import com.cloudera.crunch.types.writable.Writables;
-
-public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair<ImmutableBytesWritable, Result>>,
-    TableSource<ImmutableBytesWritable, Result> {
-
-  private static final PTableType<ImmutableBytesWritable, Result> PTYPE = Writables.tableOf(
-      Writables.writables(ImmutableBytesWritable.class), Writables.writables(Result.class));
-  
-  protected Scan scan;
-  
-  public HBaseSourceTarget(String table, Scan scan) {
-    super(table);
-    this.scan = scan;
-  }
-
-  @Override
-  public PType<Pair<ImmutableBytesWritable, Result>> getType() {
-    return PTYPE;
-  }
-
-  @Override
-  public PTableType<ImmutableBytesWritable, Result> getTableType() {
-    return PTYPE;
-  }
-  
-  @Override
-  public boolean equals(Object other) {
-    if (other == null || !(other instanceof HBaseSourceTarget)) {
-      return false;
-    }
-    HBaseSourceTarget o = (HBaseSourceTarget) other;
-    // XXX scan does not have equals method
-    return table.equals(o.table) && scan.equals(o.scan);
-  }
-  
-  @Override
-  public int hashCode() {
-    return new HashCodeBuilder().append(table).append(scan).toHashCode();
-  }
-  
-  @Override
-  public String toString() {
-    return "HBaseTable(" + table + ")";
-  }
-  
-  @Override
-  public void configureSource(Job job, int inputId) throws IOException {
-    Configuration conf = job.getConfiguration();
-    job.setInputFormatClass(TableInputFormat.class);
-    job.setMapperClass(CrunchMapper.class);
-    HBaseConfiguration.addHbaseResources(conf);
-    conf.set(TableInputFormat.INPUT_TABLE, table);
-    conf.set(TableInputFormat.SCAN, convertScanToString(scan));
-    TableMapReduceUtil.addDependencyJars(job);
-  }
-  
-  static String convertScanToString(Scan scan) throws IOException {
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    DataOutputStream dos = new DataOutputStream(out);
-    scan.write(dos);
-    return Base64.encodeBytes(out.toByteArray());
-  }
-
-  @Override
-  public long getSize(Configuration conf) {
-    // TODO something smarter here.
-    return 1000L * 1000L * 1000L;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/hbase/HBaseTarget.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/hbase/HBaseTarget.java b/src/main/java/com/cloudera/crunch/io/hbase/HBaseTarget.java
deleted file mode 100644
index 4e43b20..0000000
--- a/src/main/java/com/cloudera/crunch/io/hbase/HBaseTarget.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io.hbase;
-
-import java.io.IOException;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
-import org.apache.hadoop.mapreduce.Job;
-
-import com.cloudera.crunch.SourceTarget;
-import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException;
-import com.cloudera.crunch.io.MapReduceTarget;
-import com.cloudera.crunch.io.OutputHandler;
-import com.cloudera.crunch.types.PType;
-
-public class HBaseTarget implements MapReduceTarget {
-
-  protected String table;
-  
-  public HBaseTarget(String table) {
-    this.table = table;
-  }
-  
-  @Override
-  public boolean equals(Object other) {
-    if(this == other)
-      return true;
-    if(other == null)
-      return false;
-    if(!other.getClass().equals(getClass()))
-      return false;
-    HBaseTarget o = (HBaseTarget)other;
-    return table.equals(o.table);
-  }
-  
-  @Override
-  public int hashCode() {
-    HashCodeBuilder hcb = new HashCodeBuilder();
-    return hcb.append(table).toHashCode();
-  }
-  
-  @Override
-  public String toString() {
-    return "HBaseTable(" + table + ")";
-  }
-
-  @Override
-  public boolean accept(OutputHandler handler, PType<?> ptype) {
-    if(Put.class.equals(ptype.getTypeClass())) {
-      handler.configure(this, ptype);
-      return true;      
-    }
-    return false;
-  }
-
-  @Override
-  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
-    Configuration conf = job.getConfiguration();
-    HBaseConfiguration.addHbaseResources(conf);
-    job.setOutputFormatClass(TableOutputFormat.class);
-    conf.set(TableOutputFormat.OUTPUT_TABLE, table);
-    try {
-      TableMapReduceUtil.addDependencyJars(job);      
-    } catch (IOException e) {
-      throw new CrunchRuntimeException(e);
-    }
-  }
-
-  @Override
-  public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/impl/FileSourceImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/impl/FileSourceImpl.java b/src/main/java/com/cloudera/crunch/io/impl/FileSourceImpl.java
deleted file mode 100644
index 49d985d..0000000
--- a/src/main/java/com/cloudera/crunch/io/impl/FileSourceImpl.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io.impl;
-
-import java.io.IOException;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-import com.cloudera.crunch.Source;
-import com.cloudera.crunch.impl.mr.run.CrunchInputs;
-import com.cloudera.crunch.io.SourceTargetHelper;
-import com.cloudera.crunch.types.PType;
-
-public abstract class FileSourceImpl<T> implements Source<T> {
-
-  private static final Log LOG = LogFactory.getLog(FileSourceImpl.class);
-  
-  protected final Path path;
-  protected final PType<T> ptype;
-  protected final InputBundle inputBundle;
-  
-  public FileSourceImpl(Path path, PType<T> ptype, Class<? extends InputFormat> inputFormatClass) {
-	this.path = path;
-	this.ptype = ptype;
-	this.inputBundle = new InputBundle(inputFormatClass);
-  }
-
-  public FileSourceImpl(Path path, PType<T> ptype, InputBundle inputBundle) {
-    this.path = path;
-    this.ptype = ptype;
-    this.inputBundle = inputBundle;
-  }
-
-  @Override
-  public void configureSource(Job job, int inputId) throws IOException {
-	if (inputId == -1) {
-      FileInputFormat.addInputPath(job, path);
-      job.setInputFormatClass(inputBundle.getInputFormatClass());
-      inputBundle.configure(job.getConfiguration());
-    } else {
-      CrunchInputs.addInputPath(job, path, inputBundle, inputId);
-    }
-  }
-
-  @Override
-  public PType<T> getType() {
-    return ptype;
-  }
-  
-  @Override
-  public long getSize(Configuration configuration) {
-	try {
-	  return SourceTargetHelper.getPathSize(configuration, path);
-	} catch (IOException e) {
-	  LOG.warn(String.format("Exception thrown looking up size of: %s", path), e);
-	  throw new IllegalStateException("Failed to get the file size of:"+ path, e);
-	}
-  }
-
-
-  @Override
-  public boolean equals(Object other) {
-    if (other == null || !getClass().equals(other.getClass())) {
-      return false;
-    }
-    FileSourceImpl o = (FileSourceImpl) other;
-    return ptype.equals(o.ptype) && path.equals(o.path) &&
-        inputBundle.equals(o.inputBundle);
-  }
-  
-  @Override
-  public int hashCode() {
-    return new HashCodeBuilder().append(ptype).append(path)
-        .append(inputBundle).toHashCode();
-  }
-  
-  @Override
-  public String toString() {
-	return new StringBuilder().append(inputBundle.getName())
-	    .append("(").append(path).append(")").toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/impl/FileTableSourceImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/impl/FileTableSourceImpl.java b/src/main/java/com/cloudera/crunch/io/impl/FileTableSourceImpl.java
deleted file mode 100644
index e44d04d..0000000
--- a/src/main/java/com/cloudera/crunch/io/impl/FileTableSourceImpl.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io.impl;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-import com.cloudera.crunch.Pair;
-import com.cloudera.crunch.TableSource;
-import com.cloudera.crunch.types.PTableType;
-
-public class FileTableSourceImpl<K, V> extends FileSourceImpl<Pair<K, V>>
-	implements TableSource<K, V> {
-
-  public FileTableSourceImpl(Path path, PTableType<K, V> tableType,
-	  Class<? extends FileInputFormat> formatClass) {
-	super(path, tableType, formatClass);
-  }
-  
-  @Override
-  public PTableType<K, V> getTableType() {
-	return (PTableType<K, V>) getType();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/impl/FileTargetImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/impl/FileTargetImpl.java b/src/main/java/com/cloudera/crunch/io/impl/FileTargetImpl.java
deleted file mode 100644
index 6cab95f..0000000
--- a/src/main/java/com/cloudera/crunch/io/impl/FileTargetImpl.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io.impl;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-import com.cloudera.crunch.SourceTarget;
-import com.cloudera.crunch.io.OutputHandler;
-import com.cloudera.crunch.io.PathTarget;
-import com.cloudera.crunch.types.Converter;
-import com.cloudera.crunch.types.PType;
-
-public class FileTargetImpl implements PathTarget {
-
-  protected final Path path;
-  private final Class<? extends FileOutputFormat> outputFormatClass;
-  
-  public FileTargetImpl(Path path, Class<? extends FileOutputFormat> outputFormatClass) {
-	this.path = path;
-	this.outputFormatClass = outputFormatClass;
-  }
-  
-  @Override
-  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath,
-	  String name) {
-    Converter converter = ptype.getConverter();
-    Class keyClass = converter.getKeyClass();
-    Class valueClass = converter.getValueClass();
-    configureForMapReduce(job, keyClass, valueClass, outputPath, name);
-  }
-
-  protected void configureForMapReduce(Job job, Class keyClass, Class valueClass,
-	  Path outputPath, String name) {
-    try {
-      FileOutputFormat.setOutputPath(job, outputPath);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    if (name == null) {
-      job.setOutputFormatClass(outputFormatClass);
-      job.setOutputKeyClass(keyClass);
-      job.setOutputValueClass(valueClass);
-    } else {
-      CrunchMultipleOutputs.addNamedOutput(job, name, outputFormatClass,
-          keyClass, valueClass);
-    }	
-  }
-  
-  @Override
-  public boolean accept(OutputHandler handler, PType<?> ptype) {
-    handler.configure(this, ptype);
-    return true;
-  }
-  
-  @Override
-  public Path getPath() {
-	return path;
-  }
-  
-  @Override
-  public boolean equals(Object other) {
-    if (other == null || !getClass().equals(other.getClass())) {
-      return false;
-    }
-    FileTargetImpl o = (FileTargetImpl) other;
-    return path.equals(o.path);
-  }
-  
-  @Override
-  public int hashCode() {
-    return new HashCodeBuilder().append(path).toHashCode();
-  }
-  
-  @Override
-  public String toString() {
-	return new StringBuilder().append(outputFormatClass.getSimpleName())
-	    .append("(").append(path).append(")").toString();
-  }
-
-  @Override
-  public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
-	// By default, assume that we cannot do this.
-	return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/impl/InputBundle.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/impl/InputBundle.java b/src/main/java/com/cloudera/crunch/io/impl/InputBundle.java
deleted file mode 100644
index cfc39e6..0000000
--- a/src/main/java/com/cloudera/crunch/io/impl/InputBundle.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io.impl;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.util.Map;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputFormat;
-
-import com.google.common.collect.Maps;
-
-/**
- * A combination of an InputFormat and any configuration information that
- * InputFormat needs to run properly. InputBundles allow us to let different
- * InputFormats pretend as if they are the only InputFormat that exists in
- * a particular MapReduce job.
- */
-public class InputBundle implements Serializable {
-
-  private Class<? extends InputFormat> inputFormatClass;
-  private Map<String, String> extraConf;
-  
-  public static InputBundle fromSerialized(String serialized) {
-    ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serialized));
-    try {
-      ObjectInputStream ois = new ObjectInputStream(bais);
-      InputBundle bundle = (InputBundle) ois.readObject();
-      ois.close();
-      return bundle;
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    } catch (ClassNotFoundException e) {
-      throw new RuntimeException(e);
-    }
-  }
-  
-  public InputBundle(Class<? extends InputFormat> inputFormatClass) {
-    this.inputFormatClass = inputFormatClass;
-    this.extraConf = Maps.newHashMap();
-  }
-  
-  public InputBundle set(String key, String value) {
-    this.extraConf.put(key, value);
-    return this;
-  }
-  
-  public Class<? extends InputFormat> getInputFormatClass() {
-    return inputFormatClass;
-  }
-  
-  public Map<String, String> getExtraConfiguration() {
-    return extraConf;
-  }
-  
-  public Configuration configure(Configuration conf) {
-    for (Map.Entry<String, String> e : extraConf.entrySet()) {
-      conf.set(e.getKey(), e.getValue());
-    }
-    return conf;
-  }
-  
-  public String serialize() {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    try {
-      ObjectOutputStream oos = new ObjectOutputStream(baos);
-      oos.writeObject(this);
-      oos.close();
-      return Base64.encodeBase64String(baos.toByteArray());
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-  
-  public String getName() {
-    return inputFormatClass.getSimpleName();
-  }
-  
-  @Override
-  public int hashCode() {
-    return new HashCodeBuilder().append(inputFormatClass).append(extraConf).toHashCode();
-  }
-  
-  @Override
-  public boolean equals(Object other) {
-    if (other == null || !(other instanceof InputBundle)) {
-      return false;
-    }
-    InputBundle oib = (InputBundle) other;
-    return inputFormatClass.equals(oib.inputFormatClass) && extraConf.equals(oib.extraConf);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/impl/ReadableSourcePathTargetImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/impl/ReadableSourcePathTargetImpl.java b/src/main/java/com/cloudera/crunch/io/impl/ReadableSourcePathTargetImpl.java
deleted file mode 100644
index 805b627..0000000
--- a/src/main/java/com/cloudera/crunch/io/impl/ReadableSourcePathTargetImpl.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io.impl;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.cloudera.crunch.io.PathTarget;
-import com.cloudera.crunch.io.ReadableSource;
-import com.cloudera.crunch.io.ReadableSourceTarget;
-
-/**
- * A {@link SourcePathTargetImpl} whose source is a {@link ReadableSource}, so
- * the combined object can also be read back directly.
- */
-public class ReadableSourcePathTargetImpl<T> extends SourcePathTargetImpl<T>
-    implements ReadableSourceTarget<T> {
-
-  public ReadableSourcePathTargetImpl(ReadableSource<T> source, PathTarget target) {
-    super(source, target);
-  }
-
-  /** Reads through the ReadableSource handed to the constructor. */
-  @Override
-  public Iterable<T> read(Configuration conf) throws IOException {
-    ReadableSource<T> readable = (ReadableSource<T>) source;
-    return readable.read(conf);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/impl/ReadableSourceTargetImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/impl/ReadableSourceTargetImpl.java b/src/main/java/com/cloudera/crunch/io/impl/ReadableSourceTargetImpl.java
deleted file mode 100644
index fd16bbc..0000000
--- a/src/main/java/com/cloudera/crunch/io/impl/ReadableSourceTargetImpl.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io.impl;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.cloudera.crunch.Target;
-import com.cloudera.crunch.io.ReadableSource;
-import com.cloudera.crunch.io.ReadableSourceTarget;
-
-/**
- * A {@link SourceTargetImpl} built on a {@link ReadableSource}, so it can be
- * read back directly in addition to acting as a target.
- */
-public class ReadableSourceTargetImpl<T> extends SourceTargetImpl<T> implements
-    ReadableSourceTarget<T> {
-
-  public ReadableSourceTargetImpl(ReadableSource<T> source, Target target) {
-    super(source, target);
-  }
-
-  /** Delegates to the ReadableSource supplied at construction. */
-  @Override
-  public Iterable<T> read(Configuration conf) throws IOException {
-    ReadableSource<T> readable = (ReadableSource<T>) source;
-    return readable.read(conf);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/impl/SourcePathTargetImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/impl/SourcePathTargetImpl.java b/src/main/java/com/cloudera/crunch/io/impl/SourcePathTargetImpl.java
deleted file mode 100644
index e16374e..0000000
--- a/src/main/java/com/cloudera/crunch/io/impl/SourcePathTargetImpl.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io.impl;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-
-import com.cloudera.crunch.Source;
-import com.cloudera.crunch.io.PathTarget;
-import com.cloudera.crunch.types.PType;
-
-/**
- * A {@link SourceTargetImpl} whose target is a {@link PathTarget}; the
- * path-specific calls are forwarded to that target.
- */
-public class SourcePathTargetImpl<T> extends SourceTargetImpl<T> implements
-    PathTarget {
-
-  public SourcePathTargetImpl(Source<T> source, PathTarget target) {
-    super(source, target);
-  }
-
-  @Override
-  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath,
-      String name) {
-    pathTarget().configureForMapReduce(job, ptype, outputPath, name);
-  }
-
-  @Override
-  public Path getPath() {
-    return pathTarget().getPath();
-  }
-
-  /** The wrapped target, viewed as the PathTarget the constructor required. */
-  private PathTarget pathTarget() {
-    return (PathTarget) target;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/impl/SourceTargetImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/impl/SourceTargetImpl.java b/src/main/java/com/cloudera/crunch/io/impl/SourceTargetImpl.java
deleted file mode 100644
index b647efa..0000000
--- a/src/main/java/com/cloudera/crunch/io/impl/SourceTargetImpl.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io.impl;
-
-import java.io.IOException;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-
-import com.cloudera.crunch.Source;
-import com.cloudera.crunch.SourceTarget;
-import com.cloudera.crunch.Target;
-import com.cloudera.crunch.io.OutputHandler;
-import com.cloudera.crunch.types.PType;
-
-/**
- * Joins a {@link Source} and a {@link Target} into a single {@link SourceTarget}.
- * Type, input-configuration and size calls go to {@code source}; output
- * handling and {@code asSourceTarget} go to {@code target}.
- */
-public class SourceTargetImpl<T> implements SourceTarget<T> {
-
-  protected final Source<T> source;
-  protected final Target target;
-
-  public SourceTargetImpl(Source<T> source, Target target) {
-    this.source = source;
-    this.target = target;
-  }
-
-  // ---- Source side --------------------------------------------------------
-
-  @Override
-  public PType<T> getType() {
-    return source.getType();
-  }
-
-  @Override
-  public void configureSource(Job job, int inputId) throws IOException {
-    source.configureSource(job, inputId);
-  }
-
-  @Override
-  public long getSize(Configuration configuration) {
-    return source.getSize(configuration);
-  }
-
-  // ---- Target side --------------------------------------------------------
-
-  @Override
-  public boolean accept(OutputHandler handler, PType<?> ptype) {
-    return target.accept(handler, ptype);
-  }
-
-  @Override
-  public <S> SourceTarget<S> asSourceTarget(PType<S> ptype) {
-    return target.asSourceTarget(ptype);
-  }
-
-  // ---- Object contract ----------------------------------------------------
-
-  /** Equal when the runtime classes match and both source and target are equal. */
-  @Override
-  public boolean equals(Object other) {
-    if (other != null && getClass().equals(other.getClass())) {
-      SourceTargetImpl that = (SourceTargetImpl) other;
-      return source.equals(that.source) && target.equals(that.target);
-    }
-    return false;
-  }
-
-  @Override
-  public int hashCode() {
-    HashCodeBuilder builder = new HashCodeBuilder();
-    builder.append(source).append(target);
-    return builder.toHashCode();
-  }
-
-  /** Describes itself by its source. */
-  @Override
-  public String toString() {
-    return source.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/impl/TableSourcePathTargetImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/impl/TableSourcePathTargetImpl.java b/src/main/java/com/cloudera/crunch/io/impl/TableSourcePathTargetImpl.java
deleted file mode 100644
index 69e31ea..0000000
--- a/src/main/java/com/cloudera/crunch/io/impl/TableSourcePathTargetImpl.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io.impl;
-
-import com.cloudera.crunch.Pair;
-import com.cloudera.crunch.TableSource;
-import com.cloudera.crunch.io.PathTarget;
-import com.cloudera.crunch.types.PTableType;
-
-public class TableSourcePathTargetImpl<K, V> extends SourcePathTargetImpl<Pair<K, V>>
-	implements TableSource<K, V> {
-
-  public TableSourcePathTargetImpl(TableSource<K, V> source, PathTarget target) {
-	super(source, target);
-  }
-  
-  @Override
-  public PTableType<K, V> getTableType() {
-	return ((TableSource<K, V>) source).getTableType();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/impl/TableSourceTargetImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/impl/TableSourceTargetImpl.java b/src/main/java/com/cloudera/crunch/io/impl/TableSourceTargetImpl.java
deleted file mode 100644
index 757f648..0000000
--- a/src/main/java/com/cloudera/crunch/io/impl/TableSourceTargetImpl.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io.impl;
-
-import com.cloudera.crunch.Pair;
-import com.cloudera.crunch.TableSource;
-import com.cloudera.crunch.Target;
-import com.cloudera.crunch.types.PTableType;
-
-public class TableSourceTargetImpl<K, V> extends SourceTargetImpl<Pair<K, V>>
-	implements TableSource<K, V> {
-
-  public TableSourceTargetImpl(TableSource<K, V> source, Target target) {
-	super(source, target);
-  }
-  
-  @Override
-  public PTableType<K, V> getTableType() {
-	return ((TableSource<K, V>) source).getTableType();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/seq/SeqFileHelper.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/seq/SeqFileHelper.java b/src/main/java/com/cloudera/crunch/io/seq/SeqFileHelper.java
deleted file mode 100644
index 6c94ab6..0000000
--- a/src/main/java/com/cloudera/crunch/io/seq/SeqFileHelper.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io.seq;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import com.cloudera.crunch.MapFn;
-import com.cloudera.crunch.types.PType;
-import com.cloudera.crunch.types.writable.WritableType;
-
-public class SeqFileHelper {
-  static <T> Writable newInstance(PType<T> ptype, Configuration conf) {
-	return (Writable) ReflectionUtils.newInstance(
-	    ((WritableType) ptype).getSerializationClass(), conf); 
-  }
-  
-  static <T> MapFn<Object, T> getInputMapFn(PType<T> ptype) {
-	return ptype.getInputMapFn();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/seq/SeqFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/seq/SeqFileReaderFactory.java b/src/main/java/com/cloudera/crunch/io/seq/SeqFileReaderFactory.java
deleted file mode 100644
index 4e67a94..0000000
--- a/src/main/java/com/cloudera/crunch/io/seq/SeqFileReaderFactory.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io.seq;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
-
-import com.cloudera.crunch.MapFn;
-import com.cloudera.crunch.io.FileReaderFactory;
-import com.cloudera.crunch.types.PType;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
-
-public class SeqFileReaderFactory<T> implements FileReaderFactory<T> {
-
-  private static final Log LOG = LogFactory.getLog(SeqFileReaderFactory.class);
-  
-  private final MapFn<Object, T> mapFn;
-  private final Writable key;
-  private final Writable value;
-  private final Configuration conf;
-
-  public SeqFileReaderFactory(PType<T> ptype, Configuration conf) {
-	this.mapFn = SeqFileHelper.getInputMapFn(ptype);
-	this.key = NullWritable.get();
-	this.value = SeqFileHelper.newInstance(ptype, conf);
-	this.conf = conf;
-  }
-  
-  @Override
-  public Iterator<T> read(FileSystem fs, final Path path) {
-    mapFn.setConfigurationForTest(conf);
-    mapFn.initialize();
-	try {
-	  final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
-	  return new UnmodifiableIterator<T>() {
-	    boolean nextChecked = false;
-	    boolean hasNext = false;
-
-	    @Override
-		public boolean hasNext() {
-		  if (nextChecked == true) {
-		    return hasNext;
-		  }
-		  try {
-			hasNext = reader.next(key, value);
-			nextChecked = true;
-			return hasNext;
-		  } catch (IOException e) {
-			LOG.info("Error reading from path: " + path, e);
-			return false;
-		  }
-		}
-
-		@Override
-		public T next() {
-		  if (!nextChecked && !hasNext()) {
-		    return null;
-		  }
-		  nextChecked = false;
-		  return mapFn.map(value);
-		}
-	  };
-	} catch (IOException e) {
-	  LOG.info("Could not read seqfile at path: " + path, e);
-	  return Iterators.emptyIterator();
-	}
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java b/src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
deleted file mode 100644
index 462ef93..0000000
--- a/src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io.seq;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-
-import com.cloudera.crunch.io.CompositePathIterable;
-import com.cloudera.crunch.io.ReadableSource;
-import com.cloudera.crunch.io.impl.FileSourceImpl;
-import com.cloudera.crunch.types.PType;
-
-/** A readable source over the SequenceFile data stored at a path. */
-public class SeqFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
-
-  public SeqFileSource(Path path, PType<T> ptype) {
-    super(path, ptype, SequenceFileInputFormat.class);
-  }
-
-  /**
-   * Reads the data at {@code path} through {@link CompositePathIterable},
-   * using a {@link SeqFileReaderFactory} to decode it.
-   */
-  @Override
-  public Iterable<T> read(Configuration conf) throws IOException {
-    FileSystem fileSystem = FileSystem.get(path.toUri(), conf);
-    SeqFileReaderFactory<T> readerFactory = new SeqFileReaderFactory<T>(ptype, conf);
-    return CompositePathIterable.create(fileSystem, path, readerFactory);
-  }
-
-  @Override
-  public String toString() {
-    return "SeqFile(" + path.toString() + ")";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/io/seq/SeqFileSourceTarget.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/seq/SeqFileSourceTarget.java b/src/main/java/com/cloudera/crunch/io/seq/SeqFileSourceTarget.java
deleted file mode 100644
index 4f91167..0000000
--- a/src/main/java/com/cloudera/crunch/io/seq/SeqFileSourceTarget.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io.seq;
-
-import org.apache.hadoop.fs.Path;
-
-import com.cloudera.crunch.io.impl.ReadableSourcePathTargetImpl;
-import com.cloudera.crunch.types.PType;
-
-/**
- * Pairs a {@link SeqFileSource} and a {@code SeqFileTarget} on the same path,
- * giving one object that can be both read and used as an output target.
- */
-public class SeqFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> {
-
-  /** Convenience overload that parses {@code path} into a Hadoop Path. */
-  public SeqFileSourceTarget(String path, PType<T> ptype) {
-    this(new Path(path), ptype);
-  }
-
-  public SeqFileSourceTarget(Path path, PType<T> ptype) {
-    super(new SeqFileSource<T>(path, ptype), new SeqFileTarget(path));
-  }
-
-  /** Describes itself by its target rather than its source. */
-  @Override
-  public String toString() {
-    return target.toString();
-  }
-}


Mime
View raw message