incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject [5/10] Format all sources according to formatting profile
Date Sat, 14 Jul 2012 18:14:55 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/run/TaskAttemptContextFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/TaskAttemptContextFactory.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/TaskAttemptContextFactory.java
index 2cfa615..b196f89 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/TaskAttemptContextFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/TaskAttemptContextFactory.java
@@ -34,13 +34,13 @@ public class TaskAttemptContextFactory {
   private static final Log LOG = LogFactory.getLog(TaskAttemptContextFactory.class);
 
   private static final TaskAttemptContextFactory INSTANCE = new TaskAttemptContextFactory();
-  
+
   public static TaskAttemptContext create(Configuration conf, TaskAttemptID taskAttemptId) {
     return INSTANCE.createInternal(conf, taskAttemptId);
   }
-  
+
   private Constructor taskAttemptConstructor;
-  
+
   private TaskAttemptContextFactory() {
     Class implClass = TaskAttemptContext.class;
     if (implClass.isInterface()) {
@@ -56,7 +56,7 @@ public class TaskAttemptContextFactory {
       LOG.fatal("Could not access TaskAttemptContext constructor, exiting", e);
     }
   }
-  
+
   private TaskAttemptContext createInternal(Configuration conf, TaskAttemptID taskAttemptId) {
     try {
       return (TaskAttemptContext) taskAttemptConstructor.newInstance(conf, taskAttemptId);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/At.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/At.java b/crunch/src/main/java/org/apache/crunch/io/At.java
index 2f5fe8b..2d787e3 100644
--- a/crunch/src/main/java/org/apache/crunch/io/At.java
+++ b/crunch/src/main/java/org/apache/crunch/io/At.java
@@ -17,9 +17,6 @@
  */
 package org.apache.crunch.io;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.client.Scan;
-
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.io.avro.AvroFileSourceTarget;
 import org.apache.crunch.io.hbase.HBaseSourceTarget;
@@ -30,59 +27,59 @@ import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Scan;
 
 /**
  * Static factory methods for creating various {@link SourceTarget} types.
- *
+ * 
  */
 public class At {
   public static <T> AvroFileSourceTarget<T> avroFile(String pathName, AvroType<T> avroType) {
-	return avroFile(new Path(pathName), avroType);
+    return avroFile(new Path(pathName), avroType);
   }
-  
+
   public static <T> AvroFileSourceTarget<T> avroFile(Path path, AvroType<T> avroType) {
-	return new AvroFileSourceTarget<T>(path, avroType);
+    return new AvroFileSourceTarget<T>(path, avroType);
   }
-  
+
   public static HBaseSourceTarget hbaseTable(String table) {
-	return hbaseTable(table, new Scan());
+    return hbaseTable(table, new Scan());
   }
-  
+
   public static HBaseSourceTarget hbaseTable(String table, Scan scan) {
-	return new HBaseSourceTarget(table, scan);
+    return new HBaseSourceTarget(table, scan);
   }
-  
+
   public static <T> SeqFileSourceTarget<T> sequenceFile(String pathName, PType<T> ptype) {
-	return sequenceFile(new Path(pathName), ptype);
+    return sequenceFile(new Path(pathName), ptype);
   }
-  
+
   public static <T> SeqFileSourceTarget<T> sequenceFile(Path path, PType<T> ptype) {
-	return new SeqFileSourceTarget<T>(path, ptype);
+    return new SeqFileSourceTarget<T>(path, ptype);
   }
-  
-  public static <K, V> SeqFileTableSourceTarget<K, V> sequenceFile(String pathName, PType<K> keyType,
-      PType<V> valueType) {
-	return sequenceFile(new Path(pathName), keyType, valueType);
+
+  public static <K, V> SeqFileTableSourceTarget<K, V> sequenceFile(String pathName, PType<K> keyType, PType<V> valueType) {
+    return sequenceFile(new Path(pathName), keyType, valueType);
   }
-  
-  public static <K, V> SeqFileTableSourceTarget<K, V> sequenceFile(Path path, PType<K> keyType,
-      PType<V> valueType) {
-	PTypeFamily ptf = keyType.getFamily();
-	return new SeqFileTableSourceTarget<K, V>(path, ptf.tableOf(keyType, valueType));
+
+  public static <K, V> SeqFileTableSourceTarget<K, V> sequenceFile(Path path, PType<K> keyType, PType<V> valueType) {
+    PTypeFamily ptf = keyType.getFamily();
+    return new SeqFileTableSourceTarget<K, V>(path, ptf.tableOf(keyType, valueType));
   }
-  
+
   public static TextFileSourceTarget<String> textFile(String pathName) {
-	return textFile(new Path(pathName));
+    return textFile(new Path(pathName));
   }
-  
+
   public static TextFileSourceTarget<String> textFile(Path path) {
-	return textFile(path, Writables.strings());
+    return textFile(path, Writables.strings());
   }
-  
+
   public static <T> TextFileSourceTarget<T> textFile(String pathName, PType<T> ptype) {
     return textFile(new Path(pathName), ptype);
   }
-  
+
   public static <T> TextFileSourceTarget<T> textFile(Path path, PType<T> ptype) {
     return new TextFileSourceTarget<T>(path, ptype);
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/CompositePathIterable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/CompositePathIterable.java b/crunch/src/main/java/org/apache/crunch/io/CompositePathIterable.java
index 0e4014a..a4723e9 100644
--- a/crunch/src/main/java/org/apache/crunch/io/CompositePathIterable.java
+++ b/crunch/src/main/java/org/apache/crunch/io/CompositePathIterable.java
@@ -36,18 +36,18 @@ public class CompositePathIterable<T> implements Iterable<T> {
   private final FileReaderFactory<T> readerFactory;
 
   private static final PathFilter FILTER = new PathFilter() {
-	@Override
-	public boolean accept(Path path) {
-	  return !path.getName().startsWith("_");
-	}
+    @Override
+    public boolean accept(Path path) {
+      return !path.getName().startsWith("_");
+    }
   };
-  
+
   public static <S> Iterable<S> create(FileSystem fs, Path path, FileReaderFactory<S> readerFactory) throws IOException {
-    
-    if (!fs.exists(path)){
+
+    if (!fs.exists(path)) {
       throw new IOException("No files found to materialize at: " + path);
     }
-    
+
     FileStatus[] stati = null;
     try {
       stati = fs.listStatus(path, FILTER);
@@ -65,38 +65,38 @@ public class CompositePathIterable<T> implements Iterable<T> {
     }
 
   }
-  
+
   private CompositePathIterable(FileStatus[] stati, FileSystem fs, FileReaderFactory<T> readerFactory) {
-	this.stati = stati;
-	this.fs = fs;
-	this.readerFactory = readerFactory;
+    this.stati = stati;
+    this.fs = fs;
+    this.readerFactory = readerFactory;
   }
 
   @Override
   public Iterator<T> iterator() {
 
-	return new UnmodifiableIterator<T>() {
-	  private int index = 0;
-	  private Iterator<T> iter = readerFactory.read(fs, stati[index++].getPath());
-	  
-	  @Override
-	  public boolean hasNext() {
-		if (!iter.hasNext()) {
-		  while (index < stati.length) {
-       	    iter = readerFactory.read(fs, stati[index++].getPath());
-			if (iter.hasNext()) {
-			  return true;
-			}
-		  }
-		  return false;
-		}
-		return true;
-	  }
-
-	  @Override
-	  public T next() {
-		return iter.next();
-	  }
-	};
+    return new UnmodifiableIterator<T>() {
+      private int index = 0;
+      private Iterator<T> iter = readerFactory.read(fs, stati[index++].getPath());
+
+      @Override
+      public boolean hasNext() {
+        if (!iter.hasNext()) {
+          while (index < stati.length) {
+            iter = readerFactory.read(fs, stati[index++].getPath());
+            if (iter.hasNext()) {
+              return true;
+            }
+          }
+          return false;
+        }
+        return true;
+      }
+
+      @Override
+      public T next() {
+        return iter.next();
+      }
+    };
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/From.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/From.java b/crunch/src/main/java/org/apache/crunch/io/From.java
index 175c316..c7ae022 100644
--- a/crunch/src/main/java/org/apache/crunch/io/From.java
+++ b/crunch/src/main/java/org/apache/crunch/io/From.java
@@ -17,12 +17,6 @@
  */
 package org.apache.crunch.io;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
 import org.apache.crunch.Source;
 import org.apache.crunch.TableSource;
 import org.apache.crunch.io.avro.AvroFileSource;
@@ -36,71 +30,74 @@ import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
 /**
  * Static factory methods for creating various {@link Source} types.
- *
+ * 
  */
 public class From {
 
-  public static <K, V> TableSource<K, V> formattedFile(String path,
-      Class<? extends FileInputFormat> formatClass, PType<K> keyType, PType<V> valueType) {
-	return formattedFile(new Path(path), formatClass, keyType, valueType);
+  public static <K, V> TableSource<K, V> formattedFile(String path, Class<? extends FileInputFormat> formatClass,
+      PType<K> keyType, PType<V> valueType) {
+    return formattedFile(new Path(path), formatClass, keyType, valueType);
   }
 
-  public static <K, V> TableSource<K, V> formattedFile(Path path,
-      Class<? extends FileInputFormat> formatClass, PType<K> keyType, PType<V> valueType) {
-	PTableType<K, V> tableType = keyType.getFamily().tableOf(keyType, valueType);
-    return new FileTableSourceImpl<K, V>(path, tableType, formatClass);                                             	
+  public static <K, V> TableSource<K, V> formattedFile(Path path, Class<? extends FileInputFormat> formatClass,
+      PType<K> keyType, PType<V> valueType) {
+    PTableType<K, V> tableType = keyType.getFamily().tableOf(keyType, valueType);
+    return new FileTableSourceImpl<K, V>(path, tableType, formatClass);
   }
 
   public static <T> Source<T> avroFile(String pathName, AvroType<T> avroType) {
-	return avroFile(new Path(pathName), avroType);
+    return avroFile(new Path(pathName), avroType);
   }
-  
+
   public static <T> Source<T> avroFile(Path path, AvroType<T> avroType) {
-	return new AvroFileSource<T>(path, avroType);
+    return new AvroFileSource<T>(path, avroType);
   }
-  
+
   public static TableSource<ImmutableBytesWritable, Result> hbaseTable(String table) {
-	return hbaseTable(table, new Scan());
+    return hbaseTable(table, new Scan());
   }
-  
+
   public static TableSource<ImmutableBytesWritable, Result> hbaseTable(String table, Scan scan) {
-	return new HBaseSourceTarget(table, scan);
+    return new HBaseSourceTarget(table, scan);
   }
-  
+
   public static <T> Source<T> sequenceFile(String pathName, PType<T> ptype) {
-	return sequenceFile(new Path(pathName), ptype);
+    return sequenceFile(new Path(pathName), ptype);
   }
-  
+
   public static <T> Source<T> sequenceFile(Path path, PType<T> ptype) {
-	return new SeqFileSource<T>(path, ptype);
+    return new SeqFileSource<T>(path, ptype);
   }
-  
-  public static <K, V> TableSource<K, V> sequenceFile(String pathName, PType<K> keyType,
-      PType<V> valueType) {
-	return sequenceFile(new Path(pathName), keyType, valueType);
+
+  public static <K, V> TableSource<K, V> sequenceFile(String pathName, PType<K> keyType, PType<V> valueType) {
+    return sequenceFile(new Path(pathName), keyType, valueType);
   }
-  
-  public static <K, V> TableSource<K, V> sequenceFile(Path path, PType<K> keyType,
-      PType<V> valueType) {
-	PTypeFamily ptf = keyType.getFamily();
-	return new SeqFileTableSourceTarget<K, V>(path, ptf.tableOf(keyType, valueType));
+
+  public static <K, V> TableSource<K, V> sequenceFile(Path path, PType<K> keyType, PType<V> valueType) {
+    PTypeFamily ptf = keyType.getFamily();
+    return new SeqFileTableSourceTarget<K, V>(path, ptf.tableOf(keyType, valueType));
   }
-  
+
   public static Source<String> textFile(String pathName) {
-	return textFile(new Path(pathName));
+    return textFile(new Path(pathName));
   }
-  
+
   public static Source<String> textFile(Path path) {
-	return textFile(path, Writables.strings());
+    return textFile(path, Writables.strings());
   }
-  
+
   public static <T> Source<T> textFile(String pathName, PType<T> ptype) {
     return textFile(new Path(pathName), ptype);
   }
-  
+
   public static <T> Source<T> textFile(Path path, PType<T> ptype) {
     return new TextFileSource<T>(path, ptype);
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/MapReduceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/MapReduceTarget.java b/crunch/src/main/java/org/apache/crunch/io/MapReduceTarget.java
index 09df684..b484103 100644
--- a/crunch/src/main/java/org/apache/crunch/io/MapReduceTarget.java
+++ b/crunch/src/main/java/org/apache/crunch/io/MapReduceTarget.java
@@ -17,11 +17,10 @@
  */
 package org.apache.crunch.io;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-
 import org.apache.crunch.Target;
 import org.apache.crunch.types.PType;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
 
 public interface MapReduceTarget extends Target {
   void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/PathTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/PathTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/PathTargetImpl.java
index 050171f..14d5a66 100644
--- a/crunch/src/main/java/org/apache/crunch/io/PathTargetImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/PathTargetImpl.java
@@ -17,37 +17,33 @@
  */
 package org.apache.crunch.io;
 
+import org.apache.crunch.types.PType;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
-import org.apache.crunch.types.PType;
-
 public abstract class PathTargetImpl implements PathTarget {
 
   private final Path path;
   private final Class<OutputFormat> outputFormatClass;
   private final Class keyClass;
   private final Class valueClass;
-  
-  public PathTargetImpl(String path, Class<OutputFormat> outputFormatClass,
-	  Class keyClass, Class valueClass) {
-	this(new Path(path), outputFormatClass, keyClass, valueClass);
+
+  public PathTargetImpl(String path, Class<OutputFormat> outputFormatClass, Class keyClass, Class valueClass) {
+    this(new Path(path), outputFormatClass, keyClass, valueClass);
   }
-  
-  public PathTargetImpl(Path path, Class<OutputFormat> outputFormatClass,
-	  Class keyClass, Class valueClass) {
-	this.path = path;
-	this.outputFormatClass = outputFormatClass;
-	this.keyClass = keyClass;
-	this.valueClass = valueClass;
+
+  public PathTargetImpl(Path path, Class<OutputFormat> outputFormatClass, Class keyClass, Class valueClass) {
+    this.path = path;
+    this.outputFormatClass = outputFormatClass;
+    this.keyClass = keyClass;
+    this.valueClass = valueClass;
   }
-  
+
   @Override
-  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath,
-	  String name) {
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
     try {
       FileOutputFormat.setOutputPath(job, path);
     } catch (Exception e) {
@@ -58,13 +54,12 @@ public abstract class PathTargetImpl implements PathTarget {
       job.setOutputKeyClass(keyClass);
       job.setOutputValueClass(valueClass);
     } else {
-      CrunchMultipleOutputs.addNamedOutput(job, name, outputFormatClass,
-          keyClass, valueClass);
+      CrunchMultipleOutputs.addNamedOutput(job, name, outputFormatClass, keyClass, valueClass);
     }
   }
 
   @Override
   public Path getPath() {
-	return path;
+    return path;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java b/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java
index 0ecbec0..73a13a3 100644
--- a/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java
+++ b/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java
@@ -19,9 +19,8 @@ package org.apache.crunch.io;
 
 import java.io.IOException;
 
-import org.apache.hadoop.conf.Configuration;
-
 import org.apache.crunch.Source;
+import org.apache.hadoop.conf.Configuration;
 
 public interface ReadableSource<T> extends Source<T> {
   Iterable<T> read(Configuration conf) throws IOException;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
index 112508f..95c90aa 100644
--- a/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
+++ b/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
@@ -20,10 +20,11 @@ package org.apache.crunch.io;
 import org.apache.crunch.SourceTarget;
 
 /**
- * An interface that indicates that a {@code SourceTarget} instance can be
- * read into the local client.
- *
- * @param <T> The type of data read.
+ * An interface that indicates that a {@code SourceTarget} instance can be read
+ * into the local client.
+ * 
+ * @param <T>
+ *          The type of data read.
  */
 public interface ReadableSourceTarget<T> extends ReadableSource<T>, SourceTarget<T> {
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java b/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
index 522bd42..f52bfe7 100644
--- a/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
+++ b/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
@@ -33,36 +33,36 @@ import org.apache.hadoop.fs.Path;
  */
 public class SourceTargetHelper {
 
-	private static final Log LOG = LogFactory.getLog(SourceTargetHelper.class);
+  private static final Log LOG = LogFactory.getLog(SourceTargetHelper.class);
 
-	public static long getPathSize(Configuration conf, Path path) throws IOException {
-		return getPathSize(FileSystem.get(conf), path);
-	}
+  public static long getPathSize(Configuration conf, Path path) throws IOException {
+    return getPathSize(FileSystem.get(conf), path);
+  }
 
-	public static long getPathSize(FileSystem fs, Path path) throws IOException {
+  public static long getPathSize(FileSystem fs, Path path) throws IOException {
 
-		if (!fs.exists(path)) {
-			return -1L;
-		}
+    if (!fs.exists(path)) {
+      return -1L;
+    }
 
-		FileStatus[] stati = null;
-		try {
-			stati = fs.listStatus(path);
-			if (stati == null) {
-				throw new FileNotFoundException(path + " doesn't exist");
-			}
-		} catch (FileNotFoundException e) {
-			LOG.warn("Returning 0 for getPathSize on non-existant path '" + path + "'");
-			return 0L;
-		}
+    FileStatus[] stati = null;
+    try {
+      stati = fs.listStatus(path);
+      if (stati == null) {
+        throw new FileNotFoundException(path + " doesn't exist");
+      }
+    } catch (FileNotFoundException e) {
+      LOG.warn("Returning 0 for getPathSize on non-existant path '" + path + "'");
+      return 0L;
+    }
 
-		if (stati.length == 0) {
-			return 0L;
-		}
-		long size = 0;
-		for (FileStatus status : stati) {
-			size += status.getLen();
-		}
-		return size;
-	}
+    if (stati.length == 0) {
+      return 0L;
+    }
+    long size = 0;
+    for (FileStatus status : stati) {
+      size += status.getLen();
+    }
+    return size;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/To.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/To.java b/crunch/src/main/java/org/apache/crunch/io/To.java
index afe3655..3190c64 100644
--- a/crunch/src/main/java/org/apache/crunch/io/To.java
+++ b/crunch/src/main/java/org/apache/crunch/io/To.java
@@ -17,56 +17,55 @@
  */
 package org.apache.crunch.io;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
 import org.apache.crunch.Target;
 import org.apache.crunch.io.avro.AvroFileTarget;
 import org.apache.crunch.io.hbase.HBaseTarget;
 import org.apache.crunch.io.impl.FileTargetImpl;
 import org.apache.crunch.io.seq.SeqFileTarget;
 import org.apache.crunch.io.text.TextFileTarget;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 /**
  * Static factory methods for creating various {@link Target} types.
- *
+ * 
  */
 public class To {
-  
+
   public static Target formattedFile(String pathName, Class<? extends FileOutputFormat> formatClass) {
-	return formattedFile(new Path(pathName), formatClass);
+    return formattedFile(new Path(pathName), formatClass);
   }
-  
+
   public static Target formattedFile(Path path, Class<? extends FileOutputFormat> formatClass) {
-	return new FileTargetImpl(path, formatClass);
+    return new FileTargetImpl(path, formatClass);
   }
-  
+
   public static Target avroFile(String pathName) {
-	return avroFile(new Path(pathName));
+    return avroFile(new Path(pathName));
   }
-  
+
   public static Target avroFile(Path path) {
-	return new AvroFileTarget(path);
+    return new AvroFileTarget(path);
   }
-  
+
   public static Target hbaseTable(String table) {
-	return new HBaseTarget(table);
+    return new HBaseTarget(table);
   }
-  
+
   public static Target sequenceFile(String pathName) {
-	return sequenceFile(new Path(pathName));
+    return sequenceFile(new Path(pathName));
   }
-  
+
   public static Target sequenceFile(Path path) {
-	return new SeqFileTarget(path);
+    return new SeqFileTarget(path);
   }
-  
+
   public static Target textFile(String pathName) {
-	return textFile(new Path(pathName));
+    return textFile(new Path(pathName));
   }
-  
+
   public static Target textFile(Path path) {
-	return new TextFileTarget(path);
-  }  
+    return new TextFileTarget(path);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
index b2689dd..8bfb4dc 100644
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
+++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
@@ -48,7 +48,6 @@ public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSour
   @Override
   public Iterable<T> read(Configuration conf) throws IOException {
     FileSystem fs = FileSystem.get(path.toUri(), conf);
-    return CompositePathIterable.create(fs, path, new AvroFileReaderFactory<T>((AvroType<T>) ptype,
-        conf));
+    return CompositePathIterable.create(fs, path, new AvroFileReaderFactory<T>((AvroType<T>) ptype, conf));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
index e1ff540..8b6208d 100644
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
+++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
@@ -17,16 +17,15 @@
  */
 package org.apache.crunch.io.avro;
 
-import org.apache.hadoop.fs.Path;
-
 import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
 import org.apache.crunch.types.avro.AvroType;
+import org.apache.hadoop.fs.Path;
 
 public class AvroFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> {
   public AvroFileSourceTarget(Path path, AvroType<T> atype) {
-	super(new AvroFileSource<T>(path, atype), new AvroFileTarget(path));
+    super(new AvroFileSource<T>(path, atype), new AvroFileTarget(path));
   }
-  
+
   @Override
   public String toString() {
     return target.toString();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
index f0340a3..cc513c7 100644
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
+++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
@@ -18,11 +18,6 @@
 package org.apache.crunch.io.avro;
 
 import org.apache.avro.mapred.AvroWrapper;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
-
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.io.OutputHandler;
 import org.apache.crunch.io.impl.FileTargetImpl;
@@ -30,21 +25,25 @@ import org.apache.crunch.types.PType;
 import org.apache.crunch.types.avro.AvroOutputFormat;
 import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
 
 public class AvroFileTarget extends FileTargetImpl {
   public AvroFileTarget(String path) {
     this(new Path(path));
   }
-  
+
   public AvroFileTarget(Path path) {
     super(path, AvroOutputFormat.class);
   }
-    
+
   @Override
   public String toString() {
     return "Avro(" + path.toString() + ")";
   }
-  
+
   @Override
   public boolean accept(OutputHandler handler, PType<?> ptype) {
     if (!(ptype instanceof AvroType)) {
@@ -55,8 +54,7 @@ public class AvroFileTarget extends FileTargetImpl {
   }
 
   @Override
-  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath,
-      String name) {
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
     AvroType<?> atype = (AvroType<?>) ptype;
     Configuration conf = job.getConfiguration();
     String schemaParam = null;
@@ -72,8 +70,7 @@ public class AvroFileTarget extends FileTargetImpl {
       throw new IllegalStateException("Avro targets must use the same output schema");
     }
     Avros.configureReflectDataFactory(conf);
-    configureForMapReduce(job, AvroWrapper.class, NullWritable.class,
-        outputPath, name);
+    configureForMapReduce(job, AvroWrapper.class, NullWritable.class, outputPath, name);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
index 0df84e5..fcb9de1 100644
--- a/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
+++ b/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
@@ -22,6 +22,13 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.crunch.Pair;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.impl.mr.run.CrunchMapper;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Result;
@@ -32,22 +39,14 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.mapreduce.Job;
 
-import org.apache.crunch.Pair;
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.TableSource;
-import org.apache.crunch.impl.mr.run.CrunchMapper;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.writable.Writables;
-
 public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair<ImmutableBytesWritable, Result>>,
     TableSource<ImmutableBytesWritable, Result> {
 
   private static final PTableType<ImmutableBytesWritable, Result> PTYPE = Writables.tableOf(
       Writables.writables(ImmutableBytesWritable.class), Writables.writables(Result.class));
-  
+
   protected Scan scan;
-  
+
   public HBaseSourceTarget(String table, Scan scan) {
     super(table);
     this.scan = scan;
@@ -62,7 +61,7 @@ public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair<
   public PTableType<ImmutableBytesWritable, Result> getTableType() {
     return PTYPE;
   }
-  
+
   @Override
   public boolean equals(Object other) {
     if (other == null || !(other instanceof HBaseSourceTarget)) {
@@ -72,17 +71,17 @@ public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair<
     // XXX scan does not have equals method
     return table.equals(o.table) && scan.equals(o.scan);
   }
-  
+
   @Override
   public int hashCode() {
     return new HashCodeBuilder().append(table).append(scan).toHashCode();
   }
-  
+
   @Override
   public String toString() {
     return "HBaseTable(" + table + ")";
   }
-  
+
   @Override
   public void configureSource(Job job, int inputId) throws IOException {
     Configuration conf = job.getConfiguration();
@@ -93,7 +92,7 @@ public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair<
     conf.set(TableInputFormat.SCAN, convertScanToString(scan));
     TableMapReduceUtil.addDependencyJars(job);
   }
-  
+
   static String convertScanToString(Scan scan) throws IOException {
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     DataOutputStream dos = new DataOutputStream(out);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
index 5e0b1c9..050cff1 100644
--- a/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
+++ b/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
@@ -20,6 +20,11 @@ package org.apache.crunch.io.hbase;
 import java.io.IOException;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
+import org.apache.crunch.io.MapReduceTarget;
+import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -28,38 +33,32 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 import org.apache.hadoop.mapreduce.Job;
 
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
-import org.apache.crunch.io.MapReduceTarget;
-import org.apache.crunch.io.OutputHandler;
-import org.apache.crunch.types.PType;
-
 public class HBaseTarget implements MapReduceTarget {
 
   protected String table;
-  
+
   public HBaseTarget(String table) {
     this.table = table;
   }
-  
+
   @Override
   public boolean equals(Object other) {
-    if(this == other)
+    if (this == other)
       return true;
-    if(other == null)
+    if (other == null)
       return false;
-    if(!other.getClass().equals(getClass()))
+    if (!other.getClass().equals(getClass()))
       return false;
-    HBaseTarget o = (HBaseTarget)other;
+    HBaseTarget o = (HBaseTarget) other;
     return table.equals(o.table);
   }
-  
+
   @Override
   public int hashCode() {
     HashCodeBuilder hcb = new HashCodeBuilder();
     return hcb.append(table).toHashCode();
   }
-  
+
   @Override
   public String toString() {
     return "HBaseTable(" + table + ")";
@@ -67,9 +66,9 @@ public class HBaseTarget implements MapReduceTarget {
 
   @Override
   public boolean accept(OutputHandler handler, PType<?> ptype) {
-    if(Put.class.equals(ptype.getTypeClass())) {
+    if (Put.class.equals(ptype.getTypeClass())) {
       handler.configure(this, ptype);
-      return true;      
+      return true;
     }
     return false;
   }
@@ -81,7 +80,7 @@ public class HBaseTarget implements MapReduceTarget {
     job.setOutputFormatClass(TableOutputFormat.class);
     conf.set(TableOutputFormat.OUTPUT_TABLE, table);
     try {
-      TableMapReduceUtil.addDependencyJars(job);      
+      TableMapReduceUtil.addDependencyJars(job);
     } catch (IOException e) {
       throw new CrunchRuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
index 3cbe70d..d3e9c6f 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
@@ -22,29 +22,28 @@ import java.io.IOException;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.Source;
+import org.apache.crunch.impl.mr.run.CrunchInputs;
+import org.apache.crunch.io.SourceTargetHelper;
+import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
-import org.apache.crunch.Source;
-import org.apache.crunch.impl.mr.run.CrunchInputs;
-import org.apache.crunch.io.SourceTargetHelper;
-import org.apache.crunch.types.PType;
-
 public abstract class FileSourceImpl<T> implements Source<T> {
 
   private static final Log LOG = LogFactory.getLog(FileSourceImpl.class);
-  
+
   protected final Path path;
   protected final PType<T> ptype;
   protected final InputBundle inputBundle;
-  
+
   public FileSourceImpl(Path path, PType<T> ptype, Class<? extends InputFormat> inputFormatClass) {
-	this.path = path;
-	this.ptype = ptype;
-	this.inputBundle = new InputBundle(inputFormatClass);
+    this.path = path;
+    this.ptype = ptype;
+    this.inputBundle = new InputBundle(inputFormatClass);
   }
 
   public FileSourceImpl(Path path, PType<T> ptype, InputBundle inputBundle) {
@@ -55,7 +54,7 @@ public abstract class FileSourceImpl<T> implements Source<T> {
 
   @Override
   public void configureSource(Job job, int inputId) throws IOException {
-	if (inputId == -1) {
+    if (inputId == -1) {
       FileInputFormat.addInputPath(job, path);
       job.setInputFormatClass(inputBundle.getInputFormatClass());
       inputBundle.configure(job.getConfiguration());
@@ -68,37 +67,33 @@ public abstract class FileSourceImpl<T> implements Source<T> {
   public PType<T> getType() {
     return ptype;
   }
-  
+
   @Override
   public long getSize(Configuration configuration) {
-	try {
-	  return SourceTargetHelper.getPathSize(configuration, path);
-	} catch (IOException e) {
-	  LOG.warn(String.format("Exception thrown looking up size of: %s", path), e);
-	  throw new IllegalStateException("Failed to get the file size of:"+ path, e);
-	}
+    try {
+      return SourceTargetHelper.getPathSize(configuration, path);
+    } catch (IOException e) {
+      LOG.warn(String.format("Exception thrown looking up size of: %s", path), e);
+      throw new IllegalStateException("Failed to get the file size of:" + path, e);
+    }
   }
 
-
   @Override
   public boolean equals(Object other) {
     if (other == null || !getClass().equals(other.getClass())) {
       return false;
     }
     FileSourceImpl o = (FileSourceImpl) other;
-    return ptype.equals(o.ptype) && path.equals(o.path) &&
-        inputBundle.equals(o.inputBundle);
+    return ptype.equals(o.ptype) && path.equals(o.path) && inputBundle.equals(o.inputBundle);
   }
-  
+
   @Override
   public int hashCode() {
-    return new HashCodeBuilder().append(ptype).append(path)
-        .append(inputBundle).toHashCode();
+    return new HashCodeBuilder().append(ptype).append(path).append(inputBundle).toHashCode();
   }
-  
+
   @Override
   public String toString() {
-	return new StringBuilder().append(inputBundle.getName())
-	    .append("(").append(path).append(")").toString();
+    return new StringBuilder().append(inputBundle.getName()).append("(").append(path).append(")").toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
index 2bf22af..f6e8f1d 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
@@ -17,23 +17,20 @@
  */
 package org.apache.crunch.io.impl;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
 import org.apache.crunch.Pair;
 import org.apache.crunch.TableSource;
 import org.apache.crunch.types.PTableType;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
-public class FileTableSourceImpl<K, V> extends FileSourceImpl<Pair<K, V>>
-	implements TableSource<K, V> {
+public class FileTableSourceImpl<K, V> extends FileSourceImpl<Pair<K, V>> implements TableSource<K, V> {
 
-  public FileTableSourceImpl(Path path, PTableType<K, V> tableType,
-	  Class<? extends FileInputFormat> formatClass) {
-	super(path, tableType, formatClass);
+  public FileTableSourceImpl(Path path, PTableType<K, V> tableType, Class<? extends FileInputFormat> formatClass) {
+    super(path, tableType, formatClass);
   }
-  
+
   @Override
   public PTableType<K, V> getTableType() {
-	return (PTableType<K, V>) getType();
+    return (PTableType<K, V>) getType();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index 334cac3..196e38c 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -18,38 +18,35 @@
 package org.apache.crunch.io.impl;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.io.OutputHandler;
 import org.apache.crunch.io.PathTarget;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PType;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 public class FileTargetImpl implements PathTarget {
 
   protected final Path path;
   private final Class<? extends FileOutputFormat> outputFormatClass;
-  
+
   public FileTargetImpl(Path path, Class<? extends FileOutputFormat> outputFormatClass) {
-	this.path = path;
-	this.outputFormatClass = outputFormatClass;
+    this.path = path;
+    this.outputFormatClass = outputFormatClass;
   }
-  
+
   @Override
-  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath,
-	  String name) {
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
     Converter converter = ptype.getConverter();
     Class keyClass = converter.getKeyClass();
     Class valueClass = converter.getValueClass();
     configureForMapReduce(job, keyClass, valueClass, outputPath, name);
   }
 
-  protected void configureForMapReduce(Job job, Class keyClass, Class valueClass,
-	  Path outputPath, String name) {
+  protected void configureForMapReduce(Job job, Class keyClass, Class valueClass, Path outputPath, String name) {
     try {
       FileOutputFormat.setOutputPath(job, outputPath);
     } catch (Exception e) {
@@ -60,22 +57,21 @@ public class FileTargetImpl implements PathTarget {
       job.setOutputKeyClass(keyClass);
       job.setOutputValueClass(valueClass);
     } else {
-      CrunchMultipleOutputs.addNamedOutput(job, name, outputFormatClass,
-          keyClass, valueClass);
-    }	
+      CrunchMultipleOutputs.addNamedOutput(job, name, outputFormatClass, keyClass, valueClass);
+    }
   }
-  
+
   @Override
   public boolean accept(OutputHandler handler, PType<?> ptype) {
     handler.configure(this, ptype);
     return true;
   }
-  
+
   @Override
   public Path getPath() {
-	return path;
+    return path;
   }
-  
+
   @Override
   public boolean equals(Object other) {
     if (other == null || !getClass().equals(other.getClass())) {
@@ -84,21 +80,21 @@ public class FileTargetImpl implements PathTarget {
     FileTargetImpl o = (FileTargetImpl) other;
     return path.equals(o.path);
   }
-  
+
   @Override
   public int hashCode() {
     return new HashCodeBuilder().append(path).toHashCode();
   }
-  
+
   @Override
   public String toString() {
-	return new StringBuilder().append(outputFormatClass.getSimpleName())
-	    .append("(").append(path).append(")").toString();
+    return new StringBuilder().append(outputFormatClass.getSimpleName()).append("(").append(path).append(")")
+        .toString();
   }
 
   @Override
   public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
-	// By default, assume that we cannot do this.
-	return null;
+    // By default, assume that we cannot do this.
+    return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/impl/InputBundle.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/InputBundle.java b/crunch/src/main/java/org/apache/crunch/io/impl/InputBundle.java
index 73b1cfd..f92e70a 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/InputBundle.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/InputBundle.java
@@ -35,14 +35,14 @@ import com.google.common.collect.Maps;
 /**
  * A combination of an InputFormat and any configuration information that
  * InputFormat needs to run properly. InputBundles allow us to let different
- * InputFormats pretend as if they are the only InputFormat that exists in
- * a particular MapReduce job.
+ * InputFormats pretend as if they are the only InputFormat that exists in a
+ * particular MapReduce job.
  */
 public class InputBundle implements Serializable {
 
   private Class<? extends InputFormat> inputFormatClass;
   private Map<String, String> extraConf;
-  
+
   public static InputBundle fromSerialized(String serialized) {
     ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serialized));
     try {
@@ -56,32 +56,32 @@ public class InputBundle implements Serializable {
       throw new RuntimeException(e);
     }
   }
-  
+
   public InputBundle(Class<? extends InputFormat> inputFormatClass) {
     this.inputFormatClass = inputFormatClass;
     this.extraConf = Maps.newHashMap();
   }
-  
+
   public InputBundle set(String key, String value) {
     this.extraConf.put(key, value);
     return this;
   }
-  
+
   public Class<? extends InputFormat> getInputFormatClass() {
     return inputFormatClass;
   }
-  
+
   public Map<String, String> getExtraConfiguration() {
     return extraConf;
   }
-  
+
   public Configuration configure(Configuration conf) {
     for (Map.Entry<String, String> e : extraConf.entrySet()) {
       conf.set(e.getKey(), e.getValue());
     }
     return conf;
   }
-  
+
   public String serialize() {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     try {
@@ -93,16 +93,16 @@ public class InputBundle implements Serializable {
       throw new RuntimeException(e);
     }
   }
-  
+
   public String getName() {
     return inputFormatClass.getSimpleName();
   }
-  
+
   @Override
   public int hashCode() {
     return new HashCodeBuilder().append(inputFormatClass).append(extraConf).toHashCode();
   }
-  
+
   @Override
   public boolean equals(Object other) {
     if (other == null || !(other instanceof InputBundle)) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java
index 8f7104e..465797a 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java
@@ -19,22 +19,20 @@ package org.apache.crunch.io.impl;
 
 import java.io.IOException;
 
-import org.apache.hadoop.conf.Configuration;
-
 import org.apache.crunch.io.PathTarget;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.hadoop.conf.Configuration;
 
-public class ReadableSourcePathTargetImpl<T> extends SourcePathTargetImpl<T>
-	implements ReadableSourceTarget<T> {
+public class ReadableSourcePathTargetImpl<T> extends SourcePathTargetImpl<T> implements ReadableSourceTarget<T> {
 
   public ReadableSourcePathTargetImpl(ReadableSource<T> source, PathTarget target) {
-	super(source, target);
+    super(source, target);
   }
-  
+
   @Override
   public Iterable<T> read(Configuration conf) throws IOException {
-	return ((ReadableSource<T>) source).read(conf);
+    return ((ReadableSource<T>) source).read(conf);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java
index ac5c371..f435b3b 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java
@@ -19,21 +19,19 @@ package org.apache.crunch.io.impl;
 
 import java.io.IOException;
 
-import org.apache.hadoop.conf.Configuration;
-
 import org.apache.crunch.Target;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.hadoop.conf.Configuration;
 
-public class ReadableSourceTargetImpl<T> extends SourceTargetImpl<T> implements
-	ReadableSourceTarget<T> {
+public class ReadableSourceTargetImpl<T> extends SourceTargetImpl<T> implements ReadableSourceTarget<T> {
 
   public ReadableSourceTargetImpl(ReadableSource<T> source, Target target) {
-	super(source, target);
+    super(source, target);
   }
-  
+
   @Override
   public Iterable<T> read(Configuration conf) throws IOException {
-	return ((ReadableSource<T>) source).read(conf);
+    return ((ReadableSource<T>) source).read(conf);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
index cb7e730..87f4901 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
@@ -17,28 +17,25 @@
  */
 package org.apache.crunch.io.impl;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-
 import org.apache.crunch.Source;
 import org.apache.crunch.io.PathTarget;
 import org.apache.crunch.types.PType;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
 
-public class SourcePathTargetImpl<T> extends SourceTargetImpl<T> implements
-	PathTarget {
+public class SourcePathTargetImpl<T> extends SourceTargetImpl<T> implements PathTarget {
 
   public SourcePathTargetImpl(Source<T> source, PathTarget target) {
-	super(source, target);
+    super(source, target);
   }
-  
+
   @Override
-  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath,
-	  String name) {
-	((PathTarget) target).configureForMapReduce(job, ptype, outputPath, name);
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
+    ((PathTarget) target).configureForMapReduce(job, ptype, outputPath, name);
   }
 
   @Override
   public Path getPath() {
-	return ((PathTarget) target).getPath();
+    return ((PathTarget) target).getPath();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
index 4dc432c..27a2d9c 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
@@ -20,66 +20,65 @@ package org.apache.crunch.io.impl;
 import java.io.IOException;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-
 import org.apache.crunch.Source;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
 import org.apache.crunch.io.OutputHandler;
 import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
 
 public class SourceTargetImpl<T> implements SourceTarget<T> {
 
   protected final Source<T> source;
   protected final Target target;
-  
+
   public SourceTargetImpl(Source<T> source, Target target) {
-	this.source = source;
-	this.target = target;
+    this.source = source;
+    this.target = target;
   }
-  
+
   @Override
   public PType<T> getType() {
-	return source.getType();
+    return source.getType();
   }
 
   @Override
   public void configureSource(Job job, int inputId) throws IOException {
-	source.configureSource(job, inputId);
+    source.configureSource(job, inputId);
   }
 
   @Override
   public long getSize(Configuration configuration) {
-	return source.getSize(configuration);
+    return source.getSize(configuration);
   }
 
   @Override
   public boolean accept(OutputHandler handler, PType<?> ptype) {
-	return target.accept(handler, ptype);
+    return target.accept(handler, ptype);
   }
 
   @Override
   public <S> SourceTarget<S> asSourceTarget(PType<S> ptype) {
-	return target.asSourceTarget(ptype);
+    return target.asSourceTarget(ptype);
   }
-  
+
   @Override
   public boolean equals(Object other) {
-	if (other == null || !(other.getClass().equals(getClass()))) {
-	  return false;
-	}
-	SourceTargetImpl sti = (SourceTargetImpl) other;
-	return source.equals(sti.source) && target.equals(sti.target);
+    if (other == null || !(other.getClass().equals(getClass()))) {
+      return false;
+    }
+    SourceTargetImpl sti = (SourceTargetImpl) other;
+    return source.equals(sti.source) && target.equals(sti.target);
   }
-  
+
   @Override
   public int hashCode() {
-	return new HashCodeBuilder().append(source).append(target).toHashCode();
+    return new HashCodeBuilder().append(source).append(target).toHashCode();
   }
-  
+
   @Override
   public String toString() {
-	return source.toString();
+    return source.toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java
index 2abf963..eb500dd 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java
@@ -22,15 +22,14 @@ import org.apache.crunch.TableSource;
 import org.apache.crunch.io.PathTarget;
 import org.apache.crunch.types.PTableType;
 
-public class TableSourcePathTargetImpl<K, V> extends SourcePathTargetImpl<Pair<K, V>>
-	implements TableSource<K, V> {
+public class TableSourcePathTargetImpl<K, V> extends SourcePathTargetImpl<Pair<K, V>> implements TableSource<K, V> {
 
   public TableSourcePathTargetImpl(TableSource<K, V> source, PathTarget target) {
-	super(source, target);
+    super(source, target);
   }
-  
+
   @Override
   public PTableType<K, V> getTableType() {
-	return ((TableSource<K, V>) source).getTableType();
+    return ((TableSource<K, V>) source).getTableType();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/impl/TableSourceTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/TableSourceTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/TableSourceTargetImpl.java
index d674dbc..965b0f9 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/TableSourceTargetImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/TableSourceTargetImpl.java
@@ -22,15 +22,14 @@ import org.apache.crunch.TableSource;
 import org.apache.crunch.Target;
 import org.apache.crunch.types.PTableType;
 
-public class TableSourceTargetImpl<K, V> extends SourceTargetImpl<Pair<K, V>>
-	implements TableSource<K, V> {
+public class TableSourceTargetImpl<K, V> extends SourceTargetImpl<Pair<K, V>> implements TableSource<K, V> {
 
   public TableSourceTargetImpl(TableSource<K, V> source, Target target) {
-	super(source, target);
+    super(source, target);
   }
-  
+
   @Override
   public PTableType<K, V> getTableType() {
-	return ((TableSource<K, V>) source).getTableType();
+    return ((TableSource<K, V>) source).getTableType();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileHelper.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileHelper.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileHelper.java
index ad5de2f..6f598f0 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileHelper.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileHelper.java
@@ -17,21 +17,19 @@
  */
 package org.apache.crunch.io.seq;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-
 import org.apache.crunch.MapFn;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.writable.WritableType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
 
 public class SeqFileHelper {
   static <T> Writable newInstance(PType<T> ptype, Configuration conf) {
-	return (Writable) ReflectionUtils.newInstance(
-	    ((WritableType) ptype).getSerializationClass(), conf); 
+    return (Writable) ReflectionUtils.newInstance(((WritableType) ptype).getSerializationClass(), conf);
   }
-  
+
   static <T> MapFn<Object, T> getInputMapFn(PType<T> ptype) {
-	return ptype.getInputMapFn();
+    return ptype.getInputMapFn();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
index 7db7ce5..47163e1 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
@@ -22,6 +22,9 @@ import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -29,66 +32,63 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 
-import org.apache.crunch.MapFn;
-import org.apache.crunch.io.FileReaderFactory;
-import org.apache.crunch.types.PType;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.UnmodifiableIterator;
 
 public class SeqFileReaderFactory<T> implements FileReaderFactory<T> {
 
   private static final Log LOG = LogFactory.getLog(SeqFileReaderFactory.class);
-  
+
   private final MapFn<Object, T> mapFn;
   private final Writable key;
   private final Writable value;
   private final Configuration conf;
 
   public SeqFileReaderFactory(PType<T> ptype, Configuration conf) {
-	this.mapFn = SeqFileHelper.getInputMapFn(ptype);
-	this.key = NullWritable.get();
-	this.value = SeqFileHelper.newInstance(ptype, conf);
-	this.conf = conf;
+    this.mapFn = SeqFileHelper.getInputMapFn(ptype);
+    this.key = NullWritable.get();
+    this.value = SeqFileHelper.newInstance(ptype, conf);
+    this.conf = conf;
   }
-  
+
   @Override
   public Iterator<T> read(FileSystem fs, final Path path) {
     mapFn.setConfigurationForTest(conf);
     mapFn.initialize();
-	try {
-	  final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
-	  return new UnmodifiableIterator<T>() {
-	    boolean nextChecked = false;
-	    boolean hasNext = false;
+    try {
+      final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+      return new UnmodifiableIterator<T>() {
+        boolean nextChecked = false;
+        boolean hasNext = false;
 
-	    @Override
-		public boolean hasNext() {
-		  if (nextChecked == true) {
-		    return hasNext;
-		  }
-		  try {
-			hasNext = reader.next(key, value);
-			nextChecked = true;
-			return hasNext;
-		  } catch (IOException e) {
-			LOG.info("Error reading from path: " + path, e);
-			return false;
-		  }
-		}
+        @Override
+        public boolean hasNext() {
+          if (nextChecked == true) {
+            return hasNext;
+          }
+          try {
+            hasNext = reader.next(key, value);
+            nextChecked = true;
+            return hasNext;
+          } catch (IOException e) {
+            LOG.info("Error reading from path: " + path, e);
+            return false;
+          }
+        }
 
-		@Override
-		public T next() {
-		  if (!nextChecked && !hasNext()) {
-		    return null;
-		  }
-		  nextChecked = false;
-		  return mapFn.map(value);
-		}
-	  };
-	} catch (IOException e) {
-	  LOG.info("Could not read seqfile at path: " + path, e);
-	  return Iterators.emptyIterator();
-	}
+        @Override
+        public T next() {
+          if (!nextChecked && !hasNext()) {
+            return null;
+          }
+          nextChecked = false;
+          return mapFn.map(value);
+        }
+      };
+    } catch (IOException e) {
+      LOG.info("Could not read seqfile at path: " + path, e);
+      return Iterators.emptyIterator();
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
index 9885dbf..6f0ad05 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
@@ -19,15 +19,14 @@ package org.apache.crunch.io.seq;
 
 import java.io.IOException;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-
 import org.apache.crunch.io.CompositePathIterable;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.impl.FileSourceImpl;
 import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 
 public class SeqFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java
index 4420c7f..f532472 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java
@@ -17,20 +17,19 @@
  */
 package org.apache.crunch.io.seq;
 
-import org.apache.hadoop.fs.Path;
-
 import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
 import org.apache.crunch.types.PType;
+import org.apache.hadoop.fs.Path;
 
 public class SeqFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> {
   public SeqFileSourceTarget(String path, PType<T> ptype) {
     this(new Path(path), ptype);
   }
-  
+
   public SeqFileSourceTarget(Path path, PType<T> ptype) {
     super(new SeqFileSource<T>(path, ptype), new SeqFileTarget(path));
   }
-  
+
   @Override
   public String toString() {
     return target.toString();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
index 4b32272..038142a 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
@@ -22,24 +22,24 @@ import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.io.FileReaderFactory;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.UnmodifiableIterator;
 
 public class SeqFileTableReaderFactory<K, V> implements FileReaderFactory<Pair<K, V>> {
 
   private static final Log LOG = LogFactory.getLog(SeqFileTableReaderFactory.class);
-  
+
   private final MapFn<Object, K> keyMapFn;
   private final MapFn<Object, V> valueMapFn;
   private final Writable key;
@@ -47,29 +47,29 @@ public class SeqFileTableReaderFactory<K, V> implements FileReaderFactory<Pair<K
   private final Configuration conf;
 
   public SeqFileTableReaderFactory(PTableType<K, V> tableType, Configuration conf) {
-	PType<K> keyType = tableType.getKeyType();
-	PType<V> valueType = tableType.getValueType();
-	this.keyMapFn = SeqFileHelper.getInputMapFn(keyType);
-	this.valueMapFn = SeqFileHelper.getInputMapFn(valueType);
-	this.key = SeqFileHelper.newInstance(keyType, conf);
-	this.value = SeqFileHelper.newInstance(valueType, conf);
-	this.conf = conf;
+    PType<K> keyType = tableType.getKeyType();
+    PType<V> valueType = tableType.getValueType();
+    this.keyMapFn = SeqFileHelper.getInputMapFn(keyType);
+    this.valueMapFn = SeqFileHelper.getInputMapFn(valueType);
+    this.key = SeqFileHelper.newInstance(keyType, conf);
+    this.value = SeqFileHelper.newInstance(valueType, conf);
+    this.conf = conf;
   }
-  
+
   @Override
   public Iterator<Pair<K, V>> read(FileSystem fs, final Path path) {
     keyMapFn.setConfigurationForTest(conf);
     keyMapFn.initialize();
     valueMapFn.setConfigurationForTest(conf);
     valueMapFn.initialize();
-	try {
-	  final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
-	  return new UnmodifiableIterator<Pair<K, V>>() {
-	    boolean nextChecked = false;
-	    boolean hasNext = false;
+    try {
+      final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+      return new UnmodifiableIterator<Pair<K, V>>() {
+        boolean nextChecked = false;
+        boolean hasNext = false;
 
-		@Override
-		public boolean hasNext() {
+        @Override
+        public boolean hasNext() {
           if (nextChecked == true) {
             return hasNext;
           }
@@ -83,18 +83,18 @@ public class SeqFileTableReaderFactory<K, V> implements FileReaderFactory<Pair<K
           }
         }
 
-		@Override
-		public Pair<K, V> next() {
-		  if (!nextChecked && !hasNext()) {
+        @Override
+        public Pair<K, V> next() {
+          if (!nextChecked && !hasNext()) {
             return null;
           }
           nextChecked = false;
-		  return Pair.of(keyMapFn.map(key), valueMapFn.map(value));
-		}
-	  };
-	} catch (IOException e) {
-	  LOG.info("Could not read seqfile at path: " + path, e);
-	  return Iterators.emptyIterator();
-	}
+          return Pair.of(keyMapFn.map(key), valueMapFn.map(value));
+        }
+      };
+    } catch (IOException e) {
+      LOG.info("Could not read seqfile at path: " + path, e);
+      return Iterators.emptyIterator();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
index aff9ac6..a846d66 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
@@ -19,16 +19,15 @@ package org.apache.crunch.io.seq;
 
 import java.io.IOException;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-
 import org.apache.crunch.Pair;
 import org.apache.crunch.io.CompositePathIterable;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.impl.FileTableSourceImpl;
 import org.apache.crunch.types.PTableType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 
 /**
  *
@@ -38,7 +37,7 @@ public class SeqFileTableSource<K, V> extends FileTableSourceImpl<K, V> implemen
   public SeqFileTableSource(String path, PTableType<K, V> ptype) {
     this(new Path(path), ptype);
   }
-  
+
   public SeqFileTableSource(Path path, PTableType<K, V> ptype) {
     super(path, ptype, SequenceFileInputFormat.class);
   }
@@ -46,8 +45,7 @@ public class SeqFileTableSource<K, V> extends FileTableSourceImpl<K, V> implemen
   @Override
   public Iterable<Pair<K, V>> read(Configuration conf) throws IOException {
     FileSystem fs = FileSystem.get(path.toUri(), conf);
-    return CompositePathIterable.create(fs, path, 
-        new SeqFileTableReaderFactory<K, V>((PTableType<K, V>) ptype, conf));
+    return CompositePathIterable.create(fs, path, new SeqFileTableReaderFactory<K, V>((PTableType<K, V>) ptype, conf));
   }
 
   @Override
@@ -55,4 +53,3 @@ public class SeqFileTableSource<K, V> extends FileTableSourceImpl<K, V> implemen
     return "SeqFile(" + path.toString() + ")";
   }
 }
- 
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java
index b583255..a1fccb5 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java
@@ -17,30 +17,30 @@
  */
 package org.apache.crunch.io.seq;
 
-import org.apache.hadoop.fs.Path;
-
 import org.apache.crunch.Pair;
 import org.apache.crunch.TableSource;
 import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
 import org.apache.crunch.types.PTableType;
+import org.apache.hadoop.fs.Path;
 
-public class SeqFileTableSourceTarget<K, V> extends ReadableSourcePathTargetImpl<Pair<K,V>> implements TableSource<K, V> {
+public class SeqFileTableSourceTarget<K, V> extends ReadableSourcePathTargetImpl<Pair<K, V>> implements
+    TableSource<K, V> {
   private final PTableType<K, V> tableType;
-  
+
   public SeqFileTableSourceTarget(String path, PTableType<K, V> tableType) {
     this(new Path(path), tableType);
   }
-  
+
   public SeqFileTableSourceTarget(Path path, PTableType<K, V> tableType) {
     super(new SeqFileTableSource<K, V>(path, tableType), new SeqFileTarget(path));
     this.tableType = tableType;
   }
-  
+
   @Override
   public PTableType<K, V> getTableType() {
     return tableType;
   }
-  
+
   @Override
   public String toString() {
     return target.toString();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java
index 6a17302..c03543a 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java
@@ -17,19 +17,18 @@
  */
 package org.apache.crunch.io.seq;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.io.impl.FileTargetImpl;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 
 public class SeqFileTarget extends FileTargetImpl {
   public SeqFileTarget(String path) {
     this(new Path(path));
   }
-  
+
   public SeqFileTarget(Path path) {
     super(path, SequenceFileOutputFormat.class);
   }
@@ -38,7 +37,7 @@ public class SeqFileTarget extends FileTargetImpl {
   public String toString() {
     return "SeqFile(" + path.toString() + ")";
   }
-  
+
   @Override
   public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
     if (ptype instanceof PTableType) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/io/text/BZip2TextInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/BZip2TextInputFormat.java b/crunch/src/main/java/org/apache/crunch/io/text/BZip2TextInputFormat.java
index 34f86e2..1618c5a 100644
--- a/crunch/src/main/java/org/apache/crunch/io/text/BZip2TextInputFormat.java
+++ b/crunch/src/main/java/org/apache/crunch/io/text/BZip2TextInputFormat.java
@@ -59,12 +59,11 @@ public class BZip2TextInputFormat extends FileInputFormat<LongWritable, Text> {
     // and the next character was not Line Feed ('\n')
     private boolean CRFollowedByNonLF = false;
 
-    // in the case where a Carriage Return ('\r') was not followed by a 
+    // in the case where a Carriage Return ('\r') was not followed by a
     // Line Feed ('\n'), this variable will hold that non Line Feed character
     // that was read from the underlying stream.
     private byte nonLFChar;
 
-
     /**
      * Provide a bridge to get the bytes from the ByteArrayOutputStream without
      * creating a new byte array.
@@ -110,15 +109,14 @@ public class BZip2TextInputFormat extends FileInputFormat<LongWritable, Text> {
     }
 
     /*
-     * LineRecordReader.readLine() is depricated in HAdoop 0.17. So it is added here
-     * locally.
+     * LineRecordReader.readLine() is depricated in HAdoop 0.17. So it is added
+     * here locally.
      */
-    private long readLine(InputStream in, 
-        OutputStream out) throws IOException {
+    private long readLine(InputStream in, OutputStream out) throws IOException {
       long bytes = 0;
       while (true) {
         int b = -1;
-        if(CRFollowedByNonLF) {
+        if (CRFollowedByNonLF) {
           // In the previous call, a Carriage Return ('\r') was followed
           // by a non Line Feed ('\n') character - in that call we would
           // have not returned the non Line Feed character but would have
@@ -134,13 +132,13 @@ public class BZip2TextInputFormat extends FileInputFormat<LongWritable, Text> {
         }
         bytes += 1;
 
-        byte c = (byte)b;
+        byte c = (byte) b;
         if (c == '\n') {
           break;
         }
 
         if (c == '\r') {
-          byte nextC = (byte)in.read();
+          byte nextC = (byte) in.read();
           if (nextC != '\n') {
             CRFollowedByNonLF = true;
             nonLFChar = nextC;
@@ -158,14 +156,13 @@ public class BZip2TextInputFormat extends FileInputFormat<LongWritable, Text> {
     }
 
     /** Read a line. */
-    public  boolean next(LongWritable key, Text value)
-        throws IOException {
+    public boolean next(LongWritable key, Text value) throws IOException {
       if (pos > end)
         return false;
 
       key.set(pos); // key is position
       buffer.reset();
-      // long bytesRead = LineRecordReader.readLine(in, buffer); 
+      // long bytesRead = LineRecordReader.readLine(in, buffer);
       long bytesRead = readLine(in, buffer);
       if (bytesRead == 0) {
         return false;
@@ -173,7 +170,7 @@ public class BZip2TextInputFormat extends FileInputFormat<LongWritable, Text> {
       pos = in.getPos();
       // if we have read ahead because we encountered a carriage return
       // char followed by a non line feed char, decrement the pos
-      if(CRFollowedByNonLF) {
+      if (CRFollowedByNonLF) {
         pos--;
       }
 
@@ -195,13 +192,12 @@ public class BZip2TextInputFormat extends FileInputFormat<LongWritable, Text> {
     }
 
     @Override
-    public  void close() throws IOException {
+    public void close() throws IOException {
       in.close();
     }
 
     @Override
-    public LongWritable getCurrentKey() throws IOException,
-    InterruptedException {
+    public LongWritable getCurrentKey() throws IOException, InterruptedException {
       return key;
     }
 
@@ -211,9 +207,8 @@ public class BZip2TextInputFormat extends FileInputFormat<LongWritable, Text> {
     }
 
     @Override
-    public void initialize(InputSplit split, TaskAttemptContext context)
-        throws IOException, InterruptedException {
-      // no op        
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+      // no op
     }
 
     @Override
@@ -224,16 +219,14 @@ public class BZip2TextInputFormat extends FileInputFormat<LongWritable, Text> {
   }
 
   @Override
-  protected boolean isSplitable(JobContext context, Path file)  {
-    return true;  
+  protected boolean isSplitable(JobContext context, Path file) {
+    return true;
   }
 
   @Override
-  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
-      TaskAttemptContext context) {
+  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
     try {
-      return new BZip2LineRecordReader(context.getConfiguration(), 
-          (FileSplit) split);
+      return new BZip2LineRecordReader(context.getConfiguration(), (FileSplit) split);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }


Mime
View raw message