incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [12/28] Rename com.cloudera.crunch -> org.apache.crunch in the Java core
Date Sat, 07 Jul 2012 21:49:07 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/From.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/From.java b/src/main/java/org/apache/crunch/io/From.java
new file mode 100644
index 0000000..175c316
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/From.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+import org.apache.crunch.Source;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.io.avro.AvroFileSource;
+import org.apache.crunch.io.hbase.HBaseSourceTarget;
+import org.apache.crunch.io.impl.FileTableSourceImpl;
+import org.apache.crunch.io.seq.SeqFileSource;
+import org.apache.crunch.io.seq.SeqFileTableSourceTarget;
+import org.apache.crunch.io.text.TextFileSource;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.writable.Writables;
+
+/**
+ * Static factory methods for creating various {@link Source} types.
+ *
+ * <p>The overloads that take a {@code String} location wrap it in a
+ * {@link Path} and delegate to the matching {@code Path} overload.
+ */
+public class From {
+
+  /** Same as the {@link Path} overload, with the location given as a string. */
+  public static <K, V> TableSource<K, V> formattedFile(String path,
+      Class<? extends FileInputFormat> formatClass, PType<K> keyType, PType<V> valueType) {
+    Path inputPath = new Path(path);
+    return formattedFile(inputPath, formatClass, keyType, valueType);
+  }
+
+  /**
+   * Creates a {@link FileTableSourceImpl} over {@code path} that uses the given
+   * input format. The table type is built from the key type's family.
+   */
+  public static <K, V> TableSource<K, V> formattedFile(Path path,
+      Class<? extends FileInputFormat> formatClass, PType<K> keyType, PType<V> valueType) {
+    return new FileTableSourceImpl<K, V>(
+        path, keyType.getFamily().tableOf(keyType, valueType), formatClass);
+  }
+
+  /** Same as the {@link Path} overload, with the location given as a string. */
+  public static <T> Source<T> avroFile(String pathName, AvroType<T> avroType) {
+    Path inputPath = new Path(pathName);
+    return avroFile(inputPath, avroType);
+  }
+
+  /** Creates an {@link AvroFileSource} over {@code path} for the given Avro type. */
+  public static <T> Source<T> avroFile(Path path, AvroType<T> avroType) {
+    return new AvroFileSource<T>(path, avroType);
+  }
+
+  /** Creates an HBase table source that uses a default-constructed {@link Scan}. */
+  public static TableSource<ImmutableBytesWritable, Result> hbaseTable(String table) {
+    Scan defaultScan = new Scan();
+    return hbaseTable(table, defaultScan);
+  }
+
+  /** Creates an {@link HBaseSourceTarget} for {@code table} using the given scan. */
+  public static TableSource<ImmutableBytesWritable, Result> hbaseTable(String table, Scan scan) {
+    return new HBaseSourceTarget(table, scan);
+  }
+
+  /** Same as the {@link Path} overload, with the location given as a string. */
+  public static <T> Source<T> sequenceFile(String pathName, PType<T> ptype) {
+    Path inputPath = new Path(pathName);
+    return sequenceFile(inputPath, ptype);
+  }
+
+  /** Creates a {@link SeqFileSource} over {@code path} for the given type. */
+  public static <T> Source<T> sequenceFile(Path path, PType<T> ptype) {
+    return new SeqFileSource<T>(path, ptype);
+  }
+
+  /** Same as the {@link Path} overload, with the location given as a string. */
+  public static <K, V> TableSource<K, V> sequenceFile(String pathName, PType<K> keyType,
+      PType<V> valueType) {
+    Path inputPath = new Path(pathName);
+    return sequenceFile(inputPath, keyType, valueType);
+  }
+
+  /**
+   * Creates a {@link SeqFileTableSourceTarget} over {@code path}. The table
+   * type is built from the key type's family.
+   */
+  public static <K, V> TableSource<K, V> sequenceFile(Path path, PType<K> keyType,
+      PType<V> valueType) {
+    PTableType<K, V> tableType = keyType.getFamily().tableOf(keyType, valueType);
+    return new SeqFileTableSourceTarget<K, V>(path, tableType);
+  }
+
+  /** Same as the {@link Path} overload, with the location given as a string. */
+  public static Source<String> textFile(String pathName) {
+    Path inputPath = new Path(pathName);
+    return textFile(inputPath);
+  }
+
+  /** Creates a text file source typed with {@link Writables#strings()}. */
+  public static Source<String> textFile(Path path) {
+    PType<String> stringType = Writables.strings();
+    return textFile(path, stringType);
+  }
+
+  /** Same as the {@link Path} overload, with the location given as a string. */
+  public static <T> Source<T> textFile(String pathName, PType<T> ptype) {
+    Path inputPath = new Path(pathName);
+    return textFile(inputPath, ptype);
+  }
+
+  /** Creates a {@link TextFileSource} over {@code path} for the given type. */
+  public static <T> Source<T> textFile(Path path, PType<T> ptype) {
+    return new TextFileSource<T>(path, ptype);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/MapReduceTarget.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/MapReduceTarget.java b/src/main/java/org/apache/crunch/io/MapReduceTarget.java
new file mode 100644
index 0000000..09df684
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/MapReduceTarget.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+import org.apache.crunch.Target;
+import org.apache.crunch.types.PType;
+
+/**
+ * A {@link Target} that can configure a MapReduce {@link Job} to write to it.
+ */
+public interface MapReduceTarget extends Target {
+  /**
+   * Configures {@code job} so that it writes output for this target.
+   *
+   * @param job the MapReduce job to configure
+   * @param ptype the type of the data being written
+   * @param outputPath the path supplied by the caller for the job's output
+   *        (NOTE(review): presumably a working directory chosen by the
+   *        planner; confirm against the callers)
+   * @param name the name of the output (NOTE(review): the implementations in
+   *        this package appear to treat {@code null} as the job's primary
+   *        output and anything else as a named output; confirm)
+   */
+  void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name);
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/OutputHandler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/OutputHandler.java b/src/main/java/org/apache/crunch/io/OutputHandler.java
new file mode 100644
index 0000000..01d7f99
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/OutputHandler.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.crunch.Target;
+import org.apache.crunch.types.PType;
+
+/**
+ * Callback through which a {@link Target} is registered for output together
+ * with the {@link PType} of the data it will receive.
+ */
+public interface OutputHandler {
+  /**
+   * Offers {@code target} to this handler for data of type {@code ptype}.
+   *
+   * @param target the target to configure
+   * @param ptype the type of the data to be written to the target
+   * @return NOTE(review): presumably {@code true} when the handler accepted
+   *         the target; the meaning is not visible here, confirm against
+   *         the implementations
+   */
+  boolean configure(Target target, PType<?> ptype);
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/PathTarget.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/PathTarget.java b/src/main/java/org/apache/crunch/io/PathTarget.java
new file mode 100644
index 0000000..884ba43
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/PathTarget.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A {@link MapReduceTarget} that is associated with a file system
+ * {@link Path}.
+ */
+public interface PathTarget extends MapReduceTarget {
+  /** Returns the path associated with this target. */
+  Path getPath();
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/PathTargetImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/PathTargetImpl.java b/src/main/java/org/apache/crunch/io/PathTargetImpl.java
new file mode 100644
index 0000000..050171f
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/PathTargetImpl.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import org.apache.crunch.types.PType;
+
+/**
+ * Skeleton {@link PathTarget} that stores a target {@link Path} together with
+ * the output format and key/value classes used to configure a job.
+ */
+public abstract class PathTargetImpl implements PathTarget {
+
+  private final Path path;
+  private final Class<OutputFormat> outputFormatClass;
+  private final Class keyClass;
+  private final Class valueClass;
+
+  /** Convenience constructor that wraps {@code path} in a {@link Path}. */
+  public PathTargetImpl(String path, Class<OutputFormat> outputFormatClass,
+      Class keyClass, Class valueClass) {
+    this(new Path(path), outputFormatClass, keyClass, valueClass);
+  }
+
+  /** Stores the path, output format class and key/value classes as given. */
+  public PathTargetImpl(Path path, Class<OutputFormat> outputFormatClass,
+      Class keyClass, Class valueClass) {
+    this.path = path;
+    this.outputFormatClass = outputFormatClass;
+    this.keyClass = keyClass;
+    this.valueClass = valueClass;
+  }
+
+  /**
+   * Points the job's output at this target's path, then registers the output
+   * format and key/value classes. With a {@code null} name they are set
+   * directly on the job; otherwise a named output is added through
+   * {@link CrunchMultipleOutputs}.
+   */
+  @Override
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath,
+      String name) {
+    // NOTE(review): the job is pointed at this target's own path; the
+    // outputPath argument is not read anywhere in this method. Confirm that
+    // this is intended.
+    try {
+      FileOutputFormat.setOutputPath(job, path);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    if (name != null) {
+      CrunchMultipleOutputs.addNamedOutput(job, name, outputFormatClass,
+          keyClass, valueClass);
+      return;
+    }
+
+    job.setOutputFormatClass(outputFormatClass);
+    job.setOutputKeyClass(keyClass);
+    job.setOutputValueClass(valueClass);
+  }
+
+  /** Returns the path this target was constructed with. */
+  @Override
+  public Path getPath() {
+    return path;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/ReadableSource.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/ReadableSource.java b/src/main/java/org/apache/crunch/io/ReadableSource.java
new file mode 100644
index 0000000..0ecbec0
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/ReadableSource.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.crunch.Source;
+
+/**
+ * A {@link Source} whose contents can also be read directly as an
+ * {@link Iterable}.
+ *
+ * @param <T> the type of the elements read
+ */
+public interface ReadableSource<T> extends Source<T> {
+  /**
+   * Reads the contents of this source.
+   *
+   * @param conf the configuration to use while reading
+   * @return the elements of this source
+   * @throws IOException if the underlying data cannot be read
+   */
+  Iterable<T> read(Configuration conf) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java b/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
new file mode 100644
index 0000000..112508f
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.crunch.SourceTarget;
+
+/**
+ * An interface that indicates that a {@code SourceTarget} instance can be
+ * read into the local client.
+ *
+ * <p>It declares no methods of its own; it only combines
+ * {@link ReadableSource} and {@link SourceTarget}.
+ *
+ * @param <T> The type of data read.
+ */
+public interface ReadableSourceTarget<T> extends ReadableSource<T>, SourceTarget<T> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/SourceTargetHelper.java b/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
new file mode 100644
index 0000000..522bd42
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Functions for configuring the inputs/outputs of MapReduce jobs.
+ */
+public class SourceTargetHelper {
+
+  private static final Log LOG = LogFactory.getLog(SourceTargetHelper.class);
+
+  /**
+   * Returns the size in bytes of {@code path}, as defined by
+   * {@link #getPathSize(FileSystem, Path)}.
+   *
+   * <p>The file system is resolved from the path itself rather than from the
+   * configuration's default file system, so a fully qualified path that lives
+   * on a different file system is sized against the correct one. Unqualified
+   * paths still resolve to the default file system.
+   *
+   * @param conf the configuration used to look up the file system
+   * @param path the file or directory to measure
+   * @return the size in bytes, {@code -1} if the path does not exist
+   * @throws IOException if the file system cannot be obtained or queried
+   */
+  public static long getPathSize(Configuration conf, Path path) throws IOException {
+    return getPathSize(path.getFileSystem(conf), path);
+  }
+
+  /**
+   * Sums the lengths of the entries that {@code fs.listStatus(path)} reports.
+   *
+   * <p>Only the listed entries are summed; the listing is not followed
+   * recursively by this method.
+   *
+   * @param fs the file system that owns {@code path}
+   * @param path the file or directory to measure
+   * @return {@code -1} if the path does not exist; {@code 0} if the listing
+   *         is empty or the path disappears between the existence check and
+   *         the listing; otherwise the summed length in bytes
+   * @throws IOException if the file system cannot be queried
+   */
+  public static long getPathSize(FileSystem fs, Path path) throws IOException {
+    if (!fs.exists(path)) {
+      return -1L;
+    }
+
+    // Depending on the implementation, listStatus reports a missing path
+    // either by returning null or by throwing; both are handled the same way.
+    FileStatus[] stati;
+    try {
+      stati = fs.listStatus(path);
+    } catch (FileNotFoundException e) {
+      stati = null;
+    }
+    if (stati == null) {
+      LOG.warn("Returning 0 for getPathSize on non-existant path '" + path + "'");
+      return 0L;
+    }
+
+    long size = 0;
+    for (FileStatus status : stati) {
+      size += status.getLen();
+    }
+    return size;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/To.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/To.java b/src/main/java/org/apache/crunch/io/To.java
new file mode 100644
index 0000000..afe3655
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/To.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import org.apache.crunch.Target;
+import org.apache.crunch.io.avro.AvroFileTarget;
+import org.apache.crunch.io.hbase.HBaseTarget;
+import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.crunch.io.seq.SeqFileTarget;
+import org.apache.crunch.io.text.TextFileTarget;
+
+/**
+ * Static factory methods for creating various {@link Target} types.
+ *
+ */
+public class To {
+  
+  public static Target formattedFile(String pathName, Class<? extends FileOutputFormat> formatClass) {
+	return formattedFile(new Path(pathName), formatClass);
+  }
+  
+  public static Target formattedFile(Path path, Class<? extends FileOutputFormat> formatClass) {
+	return new FileTargetImpl(path, formatClass);
+  }
+  
+  public static Target avroFile(String pathName) {
+	return avroFile(new Path(pathName));
+  }
+  
+  public static Target avroFile(Path path) {
+	return new AvroFileTarget(path);
+  }
+  
+  public static Target hbaseTable(String table) {
+	return new HBaseTarget(table);
+  }
+  
+  public static Target sequenceFile(String pathName) {
+	return sequenceFile(new Path(pathName));
+  }
+  
+  public static Target sequenceFile(Path path) {
+	return new SeqFileTarget(path);
+  }
+  
+  public static Target textFile(String pathName) {
+	return textFile(new Path(pathName));
+  }
+  
+  public static Target textFile(Path path) {
+	return new TextFileTarget(path);
+  }  
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java b/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
new file mode 100644
index 0000000..c92c3ee
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.types.avro.AvroType;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.UnmodifiableIterator;
+
+public class AvroFileReaderFactory<T> implements FileReaderFactory<T> {
+
+	private static final Log LOG = LogFactory
+			.getLog(AvroFileReaderFactory.class);
+
+	private final DatumReader<T> recordReader;
+	private final MapFn<T, T> mapFn;
+	private final Configuration conf;
+
+	public AvroFileReaderFactory(AvroType<T> atype, Configuration conf) {
+		this.recordReader = createDatumReader(atype);
+		this.mapFn = (MapFn<T, T>) atype.getInputMapFn();
+		this.conf = conf;
+	}
+
+	private DatumReader<T> createDatumReader(AvroType<T> avroType) {
+		if (avroType.isSpecific()) {
+			return new SpecificDatumReader<T>(avroType.getSchema());
+		} else if (avroType.isGeneric()) {
+			return new GenericDatumReader<T>(avroType.getSchema());
+		} else {
+			return new ReflectDatumReader<T>(avroType.getSchema());
+		}
+	}
+
+	@Override
+	public Iterator<T> read(FileSystem fs, final Path path) {
+		this.mapFn.setConfigurationForTest(conf);
+		this.mapFn.initialize();
+		try {
+			FsInput fsi = new FsInput(path, fs.getConf());
+			final DataFileReader<T> reader = new DataFileReader<T>(fsi,
+					recordReader);
+			return new UnmodifiableIterator<T>() {
+				@Override
+				public boolean hasNext() {
+					return reader.hasNext();
+				}
+
+				@Override
+				public T next() {
+					return mapFn.map(reader.next());
+				}
+			};
+		} catch (IOException e) {
+			LOG.info("Could not read avro file at path: " + path, e);
+			return Iterators.emptyIterator();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java b/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
new file mode 100644
index 0000000..3cbe924
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.mapred.AvroJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.crunch.io.CompositePathIterable;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.impl.FileSourceImpl;
+import org.apache.crunch.io.impl.InputBundle;
+import org.apache.crunch.types.avro.AvroInputFormat;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+
+/**
+ * A file source for Avro data that can also be read directly through
+ * {@link ReadableSource#read(Configuration)}.
+ *
+ * @param <T> the type of the records read
+ */
+public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
+
+  /**
+   * Builds the source with an {@link InputBundle} for {@link AvroInputFormat}
+   * that carries the reflect flag, the input schema and the reflect data
+   * factory class name.
+   */
+  public AvroFileSource(Path path, AvroType<T> ptype) {
+    super(
+        path,
+        ptype,
+        new InputBundle(AvroInputFormat.class)
+            .set(AvroJob.INPUT_IS_REFLECT, String.valueOf(!ptype.isSpecific()))
+            .set(AvroJob.INPUT_SCHEMA, ptype.getSchema().toString())
+            .set(Avros.REFLECT_DATA_FACTORY_CLASS,
+                Avros.REFLECT_DATA_FACTORY.getClass().getName()));
+  }
+
+  /** Returns {@code Avro(<path>)}. */
+  @Override
+  public String toString() {
+    StringBuilder description = new StringBuilder("Avro(");
+    description.append(path.toString());
+    description.append(")");
+    return description.toString();
+  }
+
+  /**
+   * Reads the records under this source's path using an
+   * {@link AvroFileReaderFactory}; the file system is resolved from the
+   * path's URI.
+   */
+  @Override
+  public Iterable<T> read(Configuration conf) throws IOException {
+    FileSystem fileSystem = FileSystem.get(path.toUri(), conf);
+    AvroType<T> avroType = (AvroType<T>) ptype;
+    AvroFileReaderFactory<T> readerFactory = new AvroFileReaderFactory<T>(avroType, conf);
+    return CompositePathIterable.create(fileSystem, path, readerFactory);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java b/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
new file mode 100644
index 0000000..e1ff540
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import org.apache.hadoop.fs.Path;
+
+import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
+import org.apache.crunch.types.avro.AvroType;
+
+/**
+ * Pairs an {@link AvroFileSource} and an {@link AvroFileTarget} that share
+ * the same path.
+ *
+ * @param <T> the type of the records read and written
+ */
+public class AvroFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> {
+  /**
+   * @param path the location used for both the source and the target
+   * @param atype the Avro type of the records
+   */
+  public AvroFileSourceTarget(Path path, AvroType<T> atype) {
+	super(new AvroFileSource<T>(path, atype), new AvroFileTarget(path));
+  }
+  
+  /** Delegates to the string form of the target. */
+  @Override
+  public String toString() {
+    return target.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java b/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
new file mode 100644
index 0000000..f0340a3
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.avro.AvroOutputFormat;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+
+/**
+ * A file target that writes Avro data using {@link AvroOutputFormat}.
+ */
+public class AvroFileTarget extends FileTargetImpl {
+
+  /** Convenience constructor that wraps {@code path} in a {@link Path}. */
+  public AvroFileTarget(String path) {
+    this(new Path(path));
+  }
+
+  /** Creates a target at {@code path} backed by {@link AvroOutputFormat}. */
+  public AvroFileTarget(Path path) {
+    super(path, AvroOutputFormat.class);
+  }
+
+  /** Returns {@code Avro(<path>)}. */
+  @Override
+  public String toString() {
+    StringBuilder description = new StringBuilder("Avro(");
+    description.append(path.toString());
+    description.append(")");
+    return description.toString();
+  }
+
+  /**
+   * Accepts only {@link AvroType}s. For those, the handler is asked to
+   * configure this target and {@code true} is returned; any other type is
+   * rejected with {@code false} and the handler is not called.
+   */
+  @Override
+  public boolean accept(OutputHandler handler, PType<?> ptype) {
+    if (ptype instanceof AvroType) {
+      handler.configure(this, ptype);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Records the output schema in the job configuration and then delegates to
+   * the inherited configuration with {@link AvroWrapper} keys and
+   * {@link NullWritable} values.
+   *
+   * <p>The schema is stored under {@code avro.output.schema} when
+   * {@code name} is {@code null}, and under {@code avro.output.schema.<name>}
+   * otherwise.
+   *
+   * @throws IllegalStateException if a different schema is already stored
+   *         under the same key
+   */
+  @Override
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath,
+      String name) {
+    AvroType<?> atype = (AvroType<?>) ptype;
+    Configuration conf = job.getConfiguration();
+
+    String schemaParam =
+        (name == null) ? "avro.output.schema" : "avro.output.schema." + name;
+    String existingSchema = conf.get(schemaParam);
+    String targetSchema = atype.getSchema().toString();
+    if (existingSchema == null) {
+      conf.set(schemaParam, targetSchema);
+    } else if (!existingSchema.equals(targetSchema)) {
+      throw new IllegalStateException("Avro targets must use the same output schema");
+    }
+
+    Avros.configureReflectDataFactory(conf);
+    configureForMapReduce(job, AvroWrapper.class, NullWritable.class,
+        outputPath, name);
+  }
+
+  /**
+   * Returns an {@link AvroFileSourceTarget} over this target's path when
+   * {@code ptype} is an {@link AvroType}, and {@code null} otherwise.
+   */
+  @Override
+  public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
+    if (!(ptype instanceof AvroType)) {
+      return null;
+    }
+    AvroType<T> avroType = (AvroType<T>) ptype;
+    return new AvroFileSourceTarget<T>(path, avroType);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
new file mode 100644
index 0000000..0df84e5
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hbase;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.mapreduce.Job;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.impl.mr.run.CrunchMapper;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.writable.Writables;
+
+/** An HBase table exposed both as a scan-driven table source and as a write target. */
+public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair<ImmutableBytesWritable, Result>>,
+    TableSource<ImmutableBytesWritable, Result> {
+
+  private static final PTableType<ImmutableBytesWritable, Result> PTYPE = Writables.tableOf(
+      Writables.writables(ImmutableBytesWritable.class), Writables.writables(Result.class));
+
+  protected Scan scan;
+
+  public HBaseSourceTarget(String table, Scan scan) {
+    super(table);
+    this.scan = scan;
+  }
+
+  @Override
+  public PType<Pair<ImmutableBytesWritable, Result>> getType() {
+    return PTYPE;
+  }
+
+  @Override
+  public PTableType<ImmutableBytesWritable, Result> getTableType() {
+    return PTYPE;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof HBaseSourceTarget)) {
+      return false;
+    }
+    HBaseSourceTarget that = (HBaseSourceTarget) other;
+    // XXX Scan does not define equals(), so the scan check below is presumably by identity.
+    return table.equals(that.table) && scan.equals(that.scan);
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(table).append(scan).toHashCode();
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder("HBaseTable(").append(table).append(")").toString();
+  }
+
+  @Override
+  public void configureSource(Job job, int inputId) throws IOException {
+    job.setInputFormatClass(TableInputFormat.class);
+    job.setMapperClass(CrunchMapper.class);
+    Configuration jobConf = job.getConfiguration();
+    HBaseConfiguration.addHbaseResources(jobConf);
+    jobConf.set(TableInputFormat.INPUT_TABLE, table);
+    jobConf.set(TableInputFormat.SCAN, convertScanToString(scan)); // scan is stored as Base64 text
+    TableMapReduceUtil.addDependencyJars(job);
+  }
+
+  static String convertScanToString(Scan scan) throws IOException {
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    scan.write(new DataOutputStream(buffer));
+    return Base64.encodeBytes(buffer.toByteArray());
+  }
+
+  @Override
+  public long getSize(Configuration conf) {
+    // TODO something smarter here; for now every table reports the same fixed size.
+    return 1000000000L;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
new file mode 100644
index 0000000..5e0b1c9
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hbase;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
+import org.apache.crunch.io.MapReduceTarget;
+import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.types.PType;
+
+/** A target that writes its output to the named HBase table through TableOutputFormat. */
+public class HBaseTarget implements MapReduceTarget {
+
+  protected String table;
+
+  public HBaseTarget(String table) {
+    this.table = table;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    // Exact class match required, so a subclass instance never equals a plain HBaseTarget.
+    if (other == null || !getClass().equals(other.getClass())) {
+      return false;
+    }
+    return table.equals(((HBaseTarget) other).table);
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(table).toHashCode();
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder("HBaseTable(").append(table).append(")").toString();
+  }
+
+  @Override
+  public boolean accept(OutputHandler handler, PType<?> ptype) {
+    if (!Put.class.equals(ptype.getTypeClass())) {
+      return false; // only PTypes whose type class is Put are accepted
+    }
+    handler.configure(this, ptype);
+    return true;
+  }
+
+  @Override
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
+    Configuration jobConf = job.getConfiguration();
+    HBaseConfiguration.addHbaseResources(jobConf);
+    job.setOutputFormatClass(TableOutputFormat.class);
+    jobConf.set(TableOutputFormat.OUTPUT_TABLE, table);
+    try {
+      TableMapReduceUtil.addDependencyJars(job);
+    } catch (IOException e) { // rethrown unchecked: this method declares no checked exceptions
+      throw new CrunchRuntimeException(e);
+    }
+  }
+
+  @Override
+  public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
+    return null; // a plain HBaseTarget offers no SourceTarget view
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
new file mode 100644
index 0000000..3cbe70d
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+import org.apache.crunch.Source;
+import org.apache.crunch.impl.mr.run.CrunchInputs;
+import org.apache.crunch.io.SourceTargetHelper;
+import org.apache.crunch.types.PType;
+
+public abstract class FileSourceImpl<T> implements Source<T> {
+
+  private static final Log LOG = LogFactory.getLog(FileSourceImpl.class);
+  
+  protected final Path path;
+  protected final PType<T> ptype;
+  protected final InputBundle inputBundle;
+  
+  public FileSourceImpl(Path path, PType<T> ptype, Class<? extends InputFormat> inputFormatClass) {
+	this.path = path;
+	this.ptype = ptype;
+	this.inputBundle = new InputBundle(inputFormatClass);
+  }
+
+  public FileSourceImpl(Path path, PType<T> ptype, InputBundle inputBundle) {
+    this.path = path;
+    this.ptype = ptype;
+    this.inputBundle = inputBundle;
+  }
+
+  @Override
+  public void configureSource(Job job, int inputId) throws IOException {
+	if (inputId == -1) {
+      FileInputFormat.addInputPath(job, path);
+      job.setInputFormatClass(inputBundle.getInputFormatClass());
+      inputBundle.configure(job.getConfiguration());
+    } else {
+      CrunchInputs.addInputPath(job, path, inputBundle, inputId);
+    }
+  }
+
+  @Override
+  public PType<T> getType() {
+    return ptype;
+  }
+  
+  @Override
+  public long getSize(Configuration configuration) {
+	try {
+	  return SourceTargetHelper.getPathSize(configuration, path);
+	} catch (IOException e) {
+	  LOG.warn(String.format("Exception thrown looking up size of: %s", path), e);
+	  throw new IllegalStateException("Failed to get the file size of:"+ path, e);
+	}
+  }
+
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null || !getClass().equals(other.getClass())) {
+      return false;
+    }
+    FileSourceImpl o = (FileSourceImpl) other;
+    return ptype.equals(o.ptype) && path.equals(o.path) &&
+        inputBundle.equals(o.inputBundle);
+  }
+  
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(ptype).append(path)
+        .append(inputBundle).toHashCode();
+  }
+  
+  @Override
+  public String toString() {
+	return new StringBuilder().append(inputBundle.getName())
+	    .append("(").append(path).append(")").toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java b/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
new file mode 100644
index 0000000..2bf22af
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.types.PTableType;
+
+public class FileTableSourceImpl<K, V> extends FileSourceImpl<Pair<K, V>>
+	implements TableSource<K, V> {
+
+  public FileTableSourceImpl(Path path, PTableType<K, V> tableType,
+	  Class<? extends FileInputFormat> formatClass) {
+	super(path, tableType, formatClass);
+  }
+  
+  @Override
+  public PTableType<K, V> getTableType() {
+	return (PTableType<K, V>) getType();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
new file mode 100644
index 0000000..334cac3
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.io.PathTarget;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PType;
+
+/**
+ * A target that writes to a file system path using a given FileOutputFormat class.
+ */
+public class FileTargetImpl implements PathTarget {
+
+  protected final Path path;
+  private final Class<? extends FileOutputFormat> outputFormatClass;
+
+  public FileTargetImpl(Path path, Class<? extends FileOutputFormat> outputFormatClass) {
+    this.path = path;
+    this.outputFormatClass = outputFormatClass;
+  }
+
+  @Override
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
+    // The key and value classes to write are taken from the PType's converter.
+    Converter converter = ptype.getConverter();
+    configureForMapReduce(job, converter.getKeyClass(), converter.getValueClass(), outputPath, name);
+  }
+
+  protected void configureForMapReduce(Job job, Class keyClass, Class valueClass,
+      Path outputPath, String name) {
+    try {
+      FileOutputFormat.setOutputPath(job, outputPath);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    if (name != null) {
+      // Named output: register it through CrunchMultipleOutputs under that name.
+      CrunchMultipleOutputs.addNamedOutput(job, name, outputFormatClass, keyClass, valueClass);
+      return;
+    }
+    // No name: set the output format and key/value classes directly on the job.
+    job.setOutputFormatClass(outputFormatClass);
+    job.setOutputKeyClass(keyClass);
+    job.setOutputValueClass(valueClass);
+  }
+
+  @Override
+  public boolean accept(OutputHandler handler, PType<?> ptype) {
+    handler.configure(this, ptype); // every PType is accepted
+    return true;
+  }
+
+  @Override
+  public Path getPath() {
+    return path;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null || !getClass().equals(other.getClass())) {
+      return false;
+    }
+    return path.equals(((FileTargetImpl) other).path);
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(path).toHashCode();
+  }
+
+  @Override
+  public String toString() {
+    return outputFormatClass.getSimpleName() + "(" + path + ")";
+  }
+
+  @Override
+  public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
+    // By default, assume this target cannot be turned into a SourceTarget.
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/impl/InputBundle.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/impl/InputBundle.java b/src/main/java/org/apache/crunch/io/impl/InputBundle.java
new file mode 100644
index 0000000..73b1cfd
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/impl/InputBundle.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
+
+import com.google.common.collect.Maps;
+
+/**
+ * A combination of an InputFormat and any configuration information that
+ * InputFormat needs to run properly. InputBundles allow us to let different
+ * InputFormats pretend as if they are the only InputFormat that exists in
+ * a particular MapReduce job.
+ */
+public class InputBundle implements Serializable {
+
+  private Class<? extends InputFormat> inputFormatClass;
+  private Map<String, String> extraConf;
+  
+  public static InputBundle fromSerialized(String serialized) {
+    ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serialized));
+    try {
+      ObjectInputStream ois = new ObjectInputStream(bais);
+      InputBundle bundle = (InputBundle) ois.readObject();
+      ois.close();
+      return bundle;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+  }
+  
+  public InputBundle(Class<? extends InputFormat> inputFormatClass) {
+    this.inputFormatClass = inputFormatClass;
+    this.extraConf = Maps.newHashMap();
+  }
+  
+  public InputBundle set(String key, String value) {
+    this.extraConf.put(key, value);
+    return this;
+  }
+  
+  public Class<? extends InputFormat> getInputFormatClass() {
+    return inputFormatClass;
+  }
+  
+  public Map<String, String> getExtraConfiguration() {
+    return extraConf;
+  }
+  
+  public Configuration configure(Configuration conf) {
+    for (Map.Entry<String, String> e : extraConf.entrySet()) {
+      conf.set(e.getKey(), e.getValue());
+    }
+    return conf;
+  }
+  
+  public String serialize() {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try {
+      ObjectOutputStream oos = new ObjectOutputStream(baos);
+      oos.writeObject(this);
+      oos.close();
+      return Base64.encodeBase64String(baos.toByteArray());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+  
+  public String getName() {
+    return inputFormatClass.getSimpleName();
+  }
+  
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(inputFormatClass).append(extraConf).toHashCode();
+  }
+  
+  @Override
+  public boolean equals(Object other) {
+    if (other == null || !(other instanceof InputBundle)) {
+      return false;
+    }
+    InputBundle oib = (InputBundle) other;
+    return inputFormatClass.equals(oib.inputFormatClass) && extraConf.equals(oib.extraConf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java b/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java
new file mode 100644
index 0000000..8f7104e
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.crunch.io.PathTarget;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.ReadableSourceTarget;
+
+public class ReadableSourcePathTargetImpl<T> extends SourcePathTargetImpl<T>
+	implements ReadableSourceTarget<T> {
+
+  public ReadableSourcePathTargetImpl(ReadableSource<T> source, PathTarget target) {
+	super(source, target);
+  }
+  
+  @Override
+  public Iterable<T> read(Configuration conf) throws IOException {
+	return ((ReadableSource<T>) source).read(conf);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java b/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java
new file mode 100644
index 0000000..ac5c371
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.crunch.Target;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.ReadableSourceTarget;
+
+/** A {@link SourceTargetImpl} whose source can also be read directly via {@link #read}. */
+public class ReadableSourceTargetImpl<T> extends SourceTargetImpl<T> implements ReadableSourceTarget<T> {
+  public ReadableSourceTargetImpl(ReadableSource<T> source, Target target) {
+    super(source, target);
+  }
+
+  @Override
+  public Iterable<T> read(Configuration conf) throws IOException {
+    ReadableSource<T> readable = (ReadableSource<T>) source; // the constructor requires a ReadableSource
+    return readable.read(conf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java b/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
new file mode 100644
index 0000000..cb7e730
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+import org.apache.crunch.Source;
+import org.apache.crunch.io.PathTarget;
+import org.apache.crunch.types.PType;
+
+public class SourcePathTargetImpl<T> extends SourceTargetImpl<T> implements
+	PathTarget {
+
+  public SourcePathTargetImpl(Source<T> source, PathTarget target) {
+	super(source, target);
+  }
+  
+  @Override
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath,
+	  String name) {
+	((PathTarget) target).configureForMapReduce(job, ptype, outputPath, name);
+  }
+
+  @Override
+  public Path getPath() {
+	return ((PathTarget) target).getPath();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java b/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
new file mode 100644
index 0000000..4dc432c
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+
+import org.apache.crunch.Source;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.Target;
+import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.types.PType;
+
+public class SourceTargetImpl<T> implements SourceTarget<T> {
+
+  protected final Source<T> source;
+  protected final Target target;
+  
+  public SourceTargetImpl(Source<T> source, Target target) {
+	this.source = source;
+	this.target = target;
+  }
+  
+  @Override
+  public PType<T> getType() {
+	return source.getType();
+  }
+
+  @Override
+  public void configureSource(Job job, int inputId) throws IOException {
+	source.configureSource(job, inputId);
+  }
+
+  @Override
+  public long getSize(Configuration configuration) {
+	return source.getSize(configuration);
+  }
+
+  @Override
+  public boolean accept(OutputHandler handler, PType<?> ptype) {
+	return target.accept(handler, ptype);
+  }
+
+  @Override
+  public <S> SourceTarget<S> asSourceTarget(PType<S> ptype) {
+	return target.asSourceTarget(ptype);
+  }
+  
+  @Override
+  public boolean equals(Object other) {
+	if (other == null || !(other.getClass().equals(getClass()))) {
+	  return false;
+	}
+	SourceTargetImpl sti = (SourceTargetImpl) other;
+	return source.equals(sti.source) && target.equals(sti.target);
+  }
+  
+  @Override
+  public int hashCode() {
+	return new HashCodeBuilder().append(source).append(target).toHashCode();
+  }
+  
+  @Override
+  public String toString() {
+	return source.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java b/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java
new file mode 100644
index 0000000..2abf963
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.io.PathTarget;
+import org.apache.crunch.types.PTableType;
+
+public class TableSourcePathTargetImpl<K, V> extends SourcePathTargetImpl<Pair<K, V>>
+	implements TableSource<K, V> {
+
+  public TableSourcePathTargetImpl(TableSource<K, V> source, PathTarget target) {
+	super(source, target);
+  }
+  
+  @Override
+  public PTableType<K, V> getTableType() {
+	return ((TableSource<K, V>) source).getTableType();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/impl/TableSourceTargetImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/impl/TableSourceTargetImpl.java b/src/main/java/org/apache/crunch/io/impl/TableSourceTargetImpl.java
new file mode 100644
index 0000000..d674dbc
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/impl/TableSourceTargetImpl.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.Target;
+import org.apache.crunch.types.PTableType;
+
+public class TableSourceTargetImpl<K, V> extends SourceTargetImpl<Pair<K, V>>
+	implements TableSource<K, V> {
+
+  public TableSourceTargetImpl(TableSource<K, V> source, Target target) {
+	super(source, target);
+  }
+  
+  @Override
+  public PTableType<K, V> getTableType() {
+	return ((TableSource<K, V>) source).getTableType();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/seq/SeqFileHelper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/seq/SeqFileHelper.java b/src/main/java/org/apache/crunch/io/seq/SeqFileHelper.java
new file mode 100644
index 0000000..ad5de2f
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/seq/SeqFileHelper.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.seq;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.writable.WritableType;
+
+public class SeqFileHelper {
+  static <T> Writable newInstance(PType<T> ptype, Configuration conf) {
+	return (Writable) ReflectionUtils.newInstance(
+	    ((WritableType) ptype).getSerializationClass(), conf); 
+  }
+  
+  static <T> MapFn<Object, T> getInputMapFn(PType<T> ptype) {
+	return ptype.getInputMapFn();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java b/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
new file mode 100644
index 0000000..7db7ce5
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.seq;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.types.PType;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.UnmodifiableIterator;
+
+public class SeqFileReaderFactory<T> implements FileReaderFactory<T> {
+
+  private static final Log LOG = LogFactory.getLog(SeqFileReaderFactory.class);
+  
+  private final MapFn<Object, T> mapFn;
+  private final Writable key;
+  private final Writable value;
+  private final Configuration conf;
+
+  public SeqFileReaderFactory(PType<T> ptype, Configuration conf) {
+	this.mapFn = SeqFileHelper.getInputMapFn(ptype);
+	this.key = NullWritable.get();
+	this.value = SeqFileHelper.newInstance(ptype, conf);
+	this.conf = conf;
+  }
+  
+  @Override
+  public Iterator<T> read(FileSystem fs, final Path path) {
+    mapFn.setConfigurationForTest(conf);
+    mapFn.initialize();
+	try {
+	  final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+	  return new UnmodifiableIterator<T>() {
+	    boolean nextChecked = false;
+	    boolean hasNext = false;
+
+	    @Override
+		public boolean hasNext() {
+		  if (nextChecked == true) {
+		    return hasNext;
+		  }
+		  try {
+			hasNext = reader.next(key, value);
+			nextChecked = true;
+			return hasNext;
+		  } catch (IOException e) {
+			LOG.info("Error reading from path: " + path, e);
+			return false;
+		  }
+		}
+
+		@Override
+		public T next() {
+		  if (!nextChecked && !hasNext()) {
+		    return null;
+		  }
+		  nextChecked = false;
+		  return mapFn.map(value);
+		}
+	  };
+	} catch (IOException e) {
+	  LOG.info("Could not read seqfile at path: " + path, e);
+	  return Iterators.emptyIterator();
+	}
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java b/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
new file mode 100644
index 0000000..9885dbf
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.seq;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+
+import org.apache.crunch.io.CompositePathIterable;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.impl.FileSourceImpl;
+import org.apache.crunch.types.PType;
+
+/**
+ * A {@link FileSourceImpl} configured with {@link SequenceFileInputFormat}
+ * whose contents can also be read directly via {@link #read(Configuration)}.
+ */
+public class SeqFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
+
+  /**
+   * @param path location of the sequence file(s)
+   * @param ptype type of the records, passed to the superclass and to the reader factory
+   */
+  public SeqFileSource(Path path, PType<T> ptype) {
+    super(path, ptype, SequenceFileInputFormat.class);
+  }
+
+  /**
+   * Builds an iterable over this source's path using a
+   * {@link SeqFileReaderFactory} and the file system resolved from the path's URI.
+   *
+   * @throws IOException if the file system cannot be obtained
+   */
+  @Override
+  public Iterable<T> read(Configuration conf) throws IOException {
+    final FileSystem fileSystem = FileSystem.get(path.toUri(), conf);
+    final SeqFileReaderFactory<T> readerFactory = new SeqFileReaderFactory<T>(ptype, conf);
+    return CompositePathIterable.create(fileSystem, path, readerFactory);
+  }
+
+  @Override
+  public String toString() {
+    final String location = path.toString();
+    return "SeqFile(" + location + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java b/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java
new file mode 100644
index 0000000..4420c7f
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.seq;
+
+import org.apache.hadoop.fs.Path;
+
+import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
+import org.apache.crunch.types.PType;
+
+/**
+ * Pairs a {@link SeqFileSource} and a {@link SeqFileTarget} that both point
+ * at the same path.
+ */
+public class SeqFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> {
+
+  /**
+   * @param path location shared by the source and the target
+   * @param ptype record type handed to the {@link SeqFileSource}
+   */
+  public SeqFileSourceTarget(Path path, PType<T> ptype) {
+    super(new SeqFileSource<T>(path, ptype), new SeqFileTarget(path));
+  }
+
+  /**
+   * Convenience overload that wraps {@code path} in a {@link Path}.
+   */
+  public SeqFileSourceTarget(String path, PType<T> ptype) {
+    this(new Path(path), ptype);
+  }
+
+  /**
+   * Delegates to the target's string form.
+   */
+  @Override
+  public String toString() {
+    return target.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java b/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
new file mode 100644
index 0000000..4b32272
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.seq;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.UnmodifiableIterator;
+
+public class SeqFileTableReaderFactory<K, V> implements FileReaderFactory<Pair<K, V>> {
+
+  private static final Log LOG = LogFactory.getLog(SeqFileTableReaderFactory.class);
+  
+  private final MapFn<Object, K> keyMapFn;
+  private final MapFn<Object, V> valueMapFn;
+  private final Writable key;
+  private final Writable value;
+  private final Configuration conf;
+
+  public SeqFileTableReaderFactory(PTableType<K, V> tableType, Configuration conf) {
+	PType<K> keyType = tableType.getKeyType();
+	PType<V> valueType = tableType.getValueType();
+	this.keyMapFn = SeqFileHelper.getInputMapFn(keyType);
+	this.valueMapFn = SeqFileHelper.getInputMapFn(valueType);
+	this.key = SeqFileHelper.newInstance(keyType, conf);
+	this.value = SeqFileHelper.newInstance(valueType, conf);
+	this.conf = conf;
+  }
+  
+  @Override
+  public Iterator<Pair<K, V>> read(FileSystem fs, final Path path) {
+    keyMapFn.setConfigurationForTest(conf);
+    keyMapFn.initialize();
+    valueMapFn.setConfigurationForTest(conf);
+    valueMapFn.initialize();
+	try {
+	  final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+	  return new UnmodifiableIterator<Pair<K, V>>() {
+	    boolean nextChecked = false;
+	    boolean hasNext = false;
+
+		@Override
+		public boolean hasNext() {
+          if (nextChecked == true) {
+            return hasNext;
+          }
+          try {
+            hasNext = reader.next(key, value);
+            nextChecked = true;
+            return hasNext;
+          } catch (IOException e) {
+            LOG.info("Error reading from path: " + path, e);
+            return false;
+          }
+        }
+
+		@Override
+		public Pair<K, V> next() {
+		  if (!nextChecked && !hasNext()) {
+            return null;
+          }
+          nextChecked = false;
+		  return Pair.of(keyMapFn.map(key), valueMapFn.map(value));
+		}
+	  };
+	} catch (IOException e) {
+	  LOG.info("Could not read seqfile at path: " + path, e);
+	  return Iterators.emptyIterator();
+	}
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java b/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
new file mode 100644
index 0000000..aff9ac6
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.seq;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.io.CompositePathIterable;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.impl.FileTableSourceImpl;
+import org.apache.crunch.types.PTableType;
+
+/**
+ *
+ */
+public class SeqFileTableSource<K, V> extends FileTableSourceImpl<K, V> implements ReadableSource<Pair<K, V>> {
+
+  public SeqFileTableSource(String path, PTableType<K, V> ptype) {
+    this(new Path(path), ptype);
+  }
+  
+  public SeqFileTableSource(Path path, PTableType<K, V> ptype) {
+    super(path, ptype, SequenceFileInputFormat.class);
+  }
+
+  @Override
+  public Iterable<Pair<K, V>> read(Configuration conf) throws IOException {
+    FileSystem fs = FileSystem.get(path.toUri(), conf);
+    return CompositePathIterable.create(fs, path, 
+        new SeqFileTableReaderFactory<K, V>((PTableType<K, V>) ptype, conf));
+  }
+
+  @Override
+  public String toString() {
+    return "SeqFile(" + path.toString() + ")";
+  }
+}
+ 
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java b/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java
new file mode 100644
index 0000000..b583255
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.seq;
+
+import org.apache.hadoop.fs.Path;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
+import org.apache.crunch.types.PTableType;
+
+public class SeqFileTableSourceTarget<K, V> extends ReadableSourcePathTargetImpl<Pair<K,V>> implements TableSource<K, V> {
+  private final PTableType<K, V> tableType;
+  
+  public SeqFileTableSourceTarget(String path, PTableType<K, V> tableType) {
+    this(new Path(path), tableType);
+  }
+  
+  public SeqFileTableSourceTarget(Path path, PTableType<K, V> tableType) {
+    super(new SeqFileTableSource<K, V>(path, tableType), new SeqFileTarget(path));
+    this.tableType = tableType;
+  }
+  
+  @Override
+  public PTableType<K, V> getTableType() {
+    return tableType;
+  }
+  
+  @Override
+  public String toString() {
+    return target.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java b/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java
new file mode 100644
index 0000000..6a17302
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.seq;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+
+/**
+ * A {@link FileTargetImpl} configured with {@link SequenceFileOutputFormat}.
+ */
+public class SeqFileTarget extends FileTargetImpl {
+
+  /**
+   * Convenience overload that wraps {@code path} in a {@link Path}.
+   */
+  public SeqFileTarget(String path) {
+    this(new Path(path));
+  }
+
+  /**
+   * @param path location handed to the superclass together with
+   *        {@link SequenceFileOutputFormat}
+   */
+  public SeqFileTarget(Path path) {
+    super(path, SequenceFileOutputFormat.class);
+  }
+
+  @Override
+  public String toString() {
+    return "SeqFile(" + path.toString() + ")";
+  }
+
+  /**
+   * Creates a source/target over this target's path for the given type.
+   *
+   * @param ptype record type; a {@link PTableType} yields a
+   *        {@link SeqFileTableSourceTarget}, anything else a
+   *        {@link SeqFileSourceTarget}
+   * @return the matching source/target
+   */
+  @Override
+  public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
+    if (ptype instanceof PTableType) {
+      // The key/value type arguments of the table type cannot be named from
+      // T here, so the raw types and the unchecked conversion are confined
+      // to this single declaration.
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      SourceTarget<T> tableSourceTarget =
+          new SeqFileTableSourceTarget(path, (PTableType) ptype);
+      return tableSourceTarget;
+    }
+    return new SeqFileSourceTarget<T>(path, ptype);
+  }
+}


Mime
View raw message