incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject git commit: Reduce compiler warnings
Date Wed, 20 Jun 2012 19:33:59 GMT
Updated Branches:
  refs/heads/master 6ca9be42e -> f1210d883


Reduce compiler warnings

Compiler warning reductions with a combination using wildcards
for generics where applicable, as well as automated import
cleanup.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/f1210d88
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/f1210d88
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/f1210d88

Branch: refs/heads/master
Commit: f1210d8833317b572e4e5aa5c13bd745585fe89c
Parents: 6ca9be4
Author: Gabriel Reid <gabriel.reid@gmail.com>
Authored: Wed Jun 20 20:51:37 2012 +0200
Committer: Gabriel Reid <gabriel.reid@gmail.com>
Committed: Wed Jun 20 20:51:37 2012 +0200

----------------------------------------------------------------------
 src/main/java/com/cloudera/crunch/CombineFn.java   |    2 +-
 .../com/cloudera/crunch/impl/mr/MRPipeline.java    |   14 +-
 .../cloudera/crunch/impl/mr/exec/MRExecutor.java   |    9 +-
 .../com/cloudera/crunch/impl/mr/plan/DoNode.java   |   22 ++--
 .../cloudera/crunch/impl/mr/plan/JobPrototype.java |   22 ++--
 .../cloudera/crunch/impl/mr/plan/MSCRPlanner.java  |   94 +++++++-------
 .../com/cloudera/crunch/impl/mr/plan/NodePath.java |   28 ++--
 .../crunch/impl/mr/run/CrunchInputFormat.java      |    4 +-
 .../crunch/impl/mr/run/CrunchInputSplit.java       |    4 +-
 .../cloudera/crunch/impl/mr/run/CrunchInputs.java  |    2 -
 .../crunch/impl/mr/run/CrunchTaskContext.java      |    1 +
 .../com/cloudera/crunch/impl/mr/run/RTNode.java    |    2 +-
 .../cloudera/crunch/io/CompositePathIterable.java  |    1 -
 .../cloudera/crunch/io/text/TextFileSource.java    |    2 +-
 .../java/com/cloudera/crunch/lib/Cartesian.java    |    4 +-
 .../com/cloudera/crunch/test/TestCounters.java     |    2 +-
 .../cloudera/crunch/types/avro/AvroTableType.java  |    1 -
 .../com/cloudera/crunch/types/avro/AvroType.java   |    3 -
 .../java/com/cloudera/crunch/types/avro/Avros.java |   32 +++---
 .../crunch/types/writable/WritableTableType.java   |    2 +-
 .../crunch/types/writable/WritableType.java        |    3 -
 .../crunch/types/writable/WritableTypeFamily.java  |    3 +-
 .../types/writable/WritableValueConverter.java     |    1 -
 src/main/java/com/cloudera/crunch/util/Protos.java |    2 +-
 .../lib/output/CrunchMultipleOutputs.java          |    1 -
 .../java/com/cloudera/crunch/CollectionsTest.java  |   15 +--
 .../java/com/cloudera/crunch/CombineFnTest.java    |   27 ++++-
 .../java/com/cloudera/crunch/FilterFnTest.java     |    2 +-
 .../com/cloudera/crunch/MaterializeToMapTest.java  |    3 +-
 .../com/cloudera/crunch/MultipleOutputTest.java    |   26 ++---
 .../cloudera/crunch/PCollectionGetSizeTest.java    |    3 -
 .../com/cloudera/crunch/PTableKeyValueTest.java    |    1 -
 .../java/com/cloudera/crunch/PageRankTest.java     |   10 +-
 .../cloudera/crunch/TupleNClassCastBugTest.java    |   22 ++--
 .../java/com/cloudera/crunch/WordCountTest.java    |   98 +++++++--------
 .../java/com/cloudera/crunch/fn/MapValuesTest.java |    4 +-
 .../impl/mr/collect/DoCollectionImplTest.java      |    1 -
 .../crunch/io/CompositePathIterableTest.java       |    4 +-
 .../com/cloudera/crunch/lib/AggregateTest.java     |    3 +-
 .../com/cloudera/crunch/lib/CartesianTest.java     |    7 +-
 .../java/com/cloudera/crunch/lib/CogroupTest.java  |    1 -
 src/test/java/com/cloudera/crunch/lib/SetTest.java |   23 ++--
 .../java/com/cloudera/crunch/lib/SortTest.java     |   18 ++--
 .../com/cloudera/crunch/lib/join/JoinTester.java   |    3 +-
 .../crunch/lib/join/MultiAvroSchemaJoinTest.java   |    3 +-
 .../com/cloudera/crunch/types/PTypeUtilsTest.java  |    4 -
 .../com/cloudera/crunch/types/avro/AvrosTest.java  |    4 +-
 47 files changed, 264 insertions(+), 279 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/main/java/com/cloudera/crunch/CombineFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/CombineFn.java b/src/main/java/com/cloudera/crunch/CombineFn.java
index e450140..a9cd874 100644
--- a/src/main/java/com/cloudera/crunch/CombineFn.java
+++ b/src/main/java/com/cloudera/crunch/CombineFn.java
@@ -190,7 +190,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
 
     @Override
     public Iterable<TupleN> results() {
-      Iterable[] iterables = new Iterable[size];
+      Iterable<?>[] iterables = new Iterable[size];
       for (int i = 0; i < size; i++) {
         iterables[i] = results(i);
       }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java b/src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
index 5ac7311..e491b01 100644
--- a/src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
+++ b/src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
@@ -60,8 +60,8 @@ public class MRPipeline implements Pipeline {
   
   private final Class<?> jarClass;
   private final String name;
-  private final Map<PCollectionImpl, Set<Target>> outputTargets;
-  private final Map<PCollectionImpl, MaterializableIterable> outputTargetsToMaterialize;
+  private final Map<PCollectionImpl<?>, Set<Target>> outputTargets;
+  private final Map<PCollectionImpl<?>, MaterializableIterable<?>> outputTargetsToMaterialize;
   private final Path tempDirectory;
   private int tempFileIndex;
   private int nextAnonymousStageId;
@@ -111,7 +111,7 @@ public class MRPipeline implements Pipeline {
       LOG.error(e);
       return PipelineResult.EMPTY;
     }
-    for (PCollectionImpl c : outputTargets.keySet()) {
+    for (PCollectionImpl<?> c : outputTargets.keySet()) {
       if (outputTargetsToMaterialize.containsKey(c)) {
         MaterializableIterable iter = outputTargetsToMaterialize.get(c);
         iter.materialize();
@@ -156,15 +156,15 @@ public class MRPipeline implements Pipeline {
   @SuppressWarnings("unchecked")
   public void write(PCollection<?> pcollection, Target target) {
     if (pcollection instanceof PGroupedTableImpl) {
-      pcollection = ((PGroupedTableImpl) pcollection).ungroup();
+      pcollection = ((PGroupedTableImpl<?,?>) pcollection).ungroup();
     } else if (pcollection instanceof UnionCollection || pcollection instanceof UnionTable) {
       pcollection = pcollection.parallelDo("UnionCollectionWrapper",  
     		  (MapFn)IdentityFn.<Object>getInstance(), pcollection.getPType());	 
     }
-    addOutput((PCollectionImpl) pcollection, target);
+    addOutput((PCollectionImpl<?>) pcollection, target);
   }
 
-  private void addOutput(PCollectionImpl impl, Target target) {
+  private void addOutput(PCollectionImpl<?> impl, Target target) {
     if (!outputTargets.containsKey(impl)) {
       outputTargets.put(impl, Sets.<Target>newHashSet());
     }
@@ -178,7 +178,7 @@ public class MRPipeline implements Pipeline {
     	pcollection = pcollection.parallelDo("UnionCollectionWrapper",  
 	        (MapFn)IdentityFn.<Object>getInstance(), pcollection.getPType());	 
 	}  
-    PCollectionImpl impl = (PCollectionImpl) pcollection;
+    PCollectionImpl<T> impl = (PCollectionImpl<T>) pcollection;
     SourceTarget<T> matTarget = impl.getMaterializedAt();
     if (matTarget != null && matTarget instanceof ReadableSourceTarget) {
       return new MaterializableIterable<T>(this, (ReadableSourceTarget<T>) matTarget);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/main/java/com/cloudera/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/exec/MRExecutor.java b/src/main/java/com/cloudera/crunch/impl/mr/exec/MRExecutor.java
index 0b9ccf9..f56093b 100644
--- a/src/main/java/com/cloudera/crunch/impl/mr/exec/MRExecutor.java
+++ b/src/main/java/com/cloudera/crunch/impl/mr/exec/MRExecutor.java
@@ -14,7 +14,6 @@
  */
 package com.cloudera.crunch.impl.mr.exec;
 
-import java.io.IOException;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -32,17 +31,17 @@ import com.google.common.collect.Lists;
 public class MRExecutor {
 
   private static final Log LOG = LogFactory.getLog(MRExecutor.class);
-  
+
   private final CrunchJobControl control;
-  
+
   public MRExecutor(Class<?> jarClass) {
     this.control = new CrunchJobControl(jarClass.toString());
   }
-  
+
   public void addJob(CrunchJob job) {
     this.control.addJob(job);
   }
-  
+
   public PipelineResult execute() {
     try {
       Thread controlThread = new Thread(control);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/main/java/com/cloudera/crunch/impl/mr/plan/DoNode.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/plan/DoNode.java b/src/main/java/com/cloudera/crunch/impl/mr/plan/DoNode.java
index 63dde98..e146a13 100644
--- a/src/main/java/com/cloudera/crunch/impl/mr/plan/DoNode.java
+++ b/src/main/java/com/cloudera/crunch/impl/mr/plan/DoNode.java
@@ -35,14 +35,14 @@ public class DoNode {
 
   private final DoFn fn;
   private final String name;
-  private final PType ptype;
+  private final PType<?> ptype;
   private final List<DoNode> children;
   private final Converter outputConverter;
-  private final Source source;
+  private final Source<?> source;
   private String outputName;
 
-  private DoNode(DoFn fn, String name, PType ptype, List<DoNode> children,
-      Converter outputConverter, Source source) {
+  private DoNode(DoFn fn, String name, PType<?> ptype, List<DoNode> children,
+      Converter outputConverter, Source<?> source) {
     this.fn = fn;
     this.name = name;
     this.ptype = ptype;
@@ -57,14 +57,14 @@ public class DoNode {
 
   public static <K, V> DoNode createGroupingNode(String name,
       PGroupedTableType<K, V> ptype) {
-    DoFn fn = ptype.getOutputMapFn();
+    DoFn<?,?> fn = ptype.getOutputMapFn();
     return new DoNode(fn, name, ptype, NO_CHILDREN,
         ptype.getGroupingConverter(), null);
   }
   
   public static <S> DoNode createOutputNode(String name, PType<S> ptype) {
     Converter outputConverter = ptype.getConverter();
-    DoFn fn = ptype.getOutputMapFn();
+    DoFn<?,?> fn = ptype.getOutputMapFn();
     return new DoNode(fn, name, ptype, NO_CHILDREN,
         outputConverter, null);
   }
@@ -75,8 +75,8 @@ public class DoNode {
   }
 
   public static <S> DoNode createInputNode(Source<S> source) {
-    PType ptype = source.getType();
-    DoFn fn = ptype.getInputMapFn();
+    PType<?> ptype = source.getType();
+    DoFn<?,?> fn = ptype.getInputMapFn();
     return new DoNode(fn, source.toString(), ptype, allowsChildren(), null,
         source);
   }
@@ -97,11 +97,11 @@ public class DoNode {
     return children;
   }
   
-  public Source getSource() {
+  public Source<?> getSource() {
     return source;
   }
 
-  public PType getPType() {
+  public PType<?> getPType() {
     return ptype;
   }
 
@@ -132,7 +132,7 @@ public class DoNode {
       if (nodeContext == NodeContext.MAP) {
         inputConverter = ptype.getConverter();
       } else {
-        inputConverter = ((PGroupedTableType) ptype).getGroupingConverter();
+        inputConverter = ((PGroupedTableType<?,?>) ptype).getGroupingConverter();
       }
     }          
     return new RTNode(fn, name, childRTNodes, inputConverter, outputConverter,

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/main/java/com/cloudera/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/plan/JobPrototype.java b/src/main/java/com/cloudera/crunch/impl/mr/plan/JobPrototype.java
index a79e1dd..98fa6ef 100644
--- a/src/main/java/com/cloudera/crunch/impl/mr/plan/JobPrototype.java
+++ b/src/main/java/com/cloudera/crunch/impl/mr/plan/JobPrototype.java
@@ -46,7 +46,7 @@ import com.google.common.collect.Sets;
 
 public class JobPrototype {
 
-  public static JobPrototype createMapReduceJob(PGroupedTableImpl group,
+  public static JobPrototype createMapReduceJob(PGroupedTableImpl<?,?> group,
       Set<NodePath> inputs, Path workingPath) {
     return new JobPrototype(inputs, group, workingPath);
   }
@@ -57,17 +57,17 @@ public class JobPrototype {
   }
 
   private final Set<NodePath> mapNodePaths;
-  private final PGroupedTableImpl group;
+  private final PGroupedTableImpl<?,?> group;
   private final Set<JobPrototype> dependencies = Sets.newHashSet();
-  private final Map<PCollectionImpl, DoNode> nodes = Maps.newHashMap();
+  private final Map<PCollectionImpl<?>, DoNode> nodes = Maps.newHashMap();
   private final Path workingPath;
   
   private HashMultimap<Target, NodePath> targetsToNodePaths;
-  private DoTableImpl combineFnTable;
+  private DoTableImpl<?,?> combineFnTable;
 
   private CrunchJob job;
 
-  private JobPrototype(Set<NodePath> inputs, PGroupedTableImpl group,
+  private JobPrototype(Set<NodePath> inputs, PGroupedTableImpl<?,?> group,
       Path workingPath) {
     this.mapNodePaths = ImmutableSet.copyOf(inputs);
     this.group = group;
@@ -119,7 +119,7 @@ public class JobPrototype {
       DoNode node = null;
       for (NodePath nodePath : targetsToNodePaths.get(target)) {
         if (node == null) {
-          PCollectionImpl collect = nodePath.tail();
+          PCollectionImpl<?> collect = nodePath.tail();
           node = DoNode.createOutputNode(target.toString(), collect.getPType());
           outputHandler.configureNode(node, target);
         }
@@ -154,7 +154,7 @@ public class JobPrototype {
         // Advance these one step, since we've already configured
         // the grouping node, and the PGroupedTableImpl is the tail
         // of the NodePath.
-        Iterator<PCollectionImpl> iter = nodePath.descendingIterator();
+        Iterator<PCollectionImpl<?>> iter = nodePath.descendingIterator();
         iter.next();
         mapNodes.add(walkPath(iter, mapOutputNode));
       }
@@ -199,15 +199,15 @@ public class JobPrototype {
     return builder.build();
   }
   
-  private DoNode walkPath(Iterator<PCollectionImpl> iter, DoNode working) {
+  private DoNode walkPath(Iterator<PCollectionImpl<?>> iter, DoNode working) {
     while (iter.hasNext()) {
-      PCollectionImpl collect = iter.next();
+      PCollectionImpl<?> collect = iter.next();
       if (combineFnTable != null &&
           !(collect instanceof PGroupedTableImpl)) {
         combineFnTable = null;
       } else if (collect instanceof DoTableImpl &&
-          ((DoTableImpl) collect).hasCombineFn()) {
-        combineFnTable = (DoTableImpl) collect;
+          ((DoTableImpl<?,?>) collect).hasCombineFn()) {
+        combineFnTable = (DoTableImpl<?,?>) collect;
       }
       if (!nodes.containsKey(collect)) {
         nodes.put(collect, collect.createDoNode());

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/main/java/com/cloudera/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/plan/MSCRPlanner.java b/src/main/java/com/cloudera/crunch/impl/mr/plan/MSCRPlanner.java
index ac6e7a3..35d8fec 100644
--- a/src/main/java/com/cloudera/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/src/main/java/com/cloudera/crunch/impl/mr/plan/MSCRPlanner.java
@@ -45,9 +45,9 @@ public class MSCRPlanner {
 
   // Used to ensure that we always build pipelines starting from the deepest outputs, which
   // helps ensure that we handle intermediate outputs correctly.
-  private static final Comparator<PCollectionImpl> DEPTH_COMPARATOR = new Comparator<PCollectionImpl>() {
+  private static final Comparator<PCollectionImpl<?>> DEPTH_COMPARATOR = new Comparator<PCollectionImpl<?>>() {
     @Override
-    public int compare(PCollectionImpl left, PCollectionImpl right) {
+    public int compare(PCollectionImpl<?> left, PCollectionImpl<?> right) {
       int cmp = right.getDepth() - left.getDepth();   
       if (cmp == 0){
           // Ensure we don't throw away two output collections at the same depth.
@@ -59,12 +59,12 @@ public class MSCRPlanner {
   };
   
   private final MRPipeline pipeline;
-  private final Map<PCollectionImpl, Set<Target>> outputs;
+  private final Map<PCollectionImpl<?>, Set<Target>> outputs;
 
   public MSCRPlanner(MRPipeline pipeline,
-      Map<PCollectionImpl, Set<Target>> outputs) {
+      Map<PCollectionImpl<?>, Set<Target>> outputs) {
     this.pipeline = pipeline;
-    this.outputs = new TreeMap<PCollectionImpl, Set<Target>>(DEPTH_COMPARATOR);
+    this.outputs = new TreeMap<PCollectionImpl<?>, Set<Target>>(DEPTH_COMPARATOR);
     this.outputs.putAll(outputs);
   }
 
@@ -73,24 +73,24 @@ public class MSCRPlanner {
     // Constructs all of the node paths, which either start w/an input
     // or a GBK and terminate in an output collection of any type.
     NodeVisitor visitor = new NodeVisitor();
-    for (PCollectionImpl output : outputs.keySet()) {
+    for (PCollectionImpl<?> output : outputs.keySet()) {
       visitor.visitOutput(output);
     }
 
     // Pull out the node paths.
-    Map<PCollectionImpl, Set<NodePath>> nodePaths = visitor.getNodePaths();
+    Map<PCollectionImpl<?>, Set<NodePath>> nodePaths = visitor.getNodePaths();
 
     // Keeps track of the dependencies from collections -> jobs and then
     // between different jobs.
-    Map<PCollectionImpl, JobPrototype> assignments = Maps.newHashMap();
-    Map<PCollectionImpl, Set<JobPrototype>> jobDependencies =
-        new HashMap<PCollectionImpl, Set<JobPrototype>>();
+    Map<PCollectionImpl<?>, JobPrototype> assignments = Maps.newHashMap();
+    Map<PCollectionImpl<?>, Set<JobPrototype>> jobDependencies =
+        new HashMap<PCollectionImpl<?>, Set<JobPrototype>>();
 
     // Find the set of GBKs that DO NOT depend on any other GBK.
-    Set<PGroupedTableImpl> workingGroupings = null;
+    Set<PGroupedTableImpl<?,?>> workingGroupings = null;
     while (!(workingGroupings = getWorkingGroupings(nodePaths)).isEmpty()) {
 
-      for (PGroupedTableImpl grouping : workingGroupings) {
+      for (PGroupedTableImpl<?,?> grouping : workingGroupings) {
         Set<NodePath> mapInputPaths = nodePaths.get(grouping);
         JobPrototype proto = JobPrototype.createMapReduceJob(grouping,
             mapInputPaths, pipeline.createTempPath());
@@ -102,16 +102,16 @@ public class MSCRPlanner {
         }
       }
 
-      Map<PGroupedTableImpl, Set<NodePath>> dependencyPaths = getDependencyPaths(
+      Map<PGroupedTableImpl<?,?>, Set<NodePath>> dependencyPaths = getDependencyPaths(
           workingGroupings, nodePaths);
-      for (Map.Entry<PGroupedTableImpl, Set<NodePath>> entry : dependencyPaths.entrySet()) {
-        PGroupedTableImpl grouping = entry.getKey();
+      for (Map.Entry<PGroupedTableImpl<?,?>, Set<NodePath>> entry : dependencyPaths.entrySet()) {
+        PGroupedTableImpl<?,?> grouping = entry.getKey();
         Set<NodePath> currentNodePaths = entry.getValue();
 
         JobPrototype proto = assignments.get(grouping);
         Set<NodePath> gbkPaths = Sets.newHashSet();
         for (NodePath nodePath : currentNodePaths) {
-          PCollectionImpl tail = nodePath.tail();
+          PCollectionImpl<?> tail = nodePath.tail();
           if (tail instanceof PGroupedTableImpl) {
             gbkPaths.add(nodePath);
             if (!jobDependencies.containsKey(tail)) {
@@ -145,9 +145,9 @@ public class MSCRPlanner {
 
     // Process any map-only jobs that are remaining.
     if (!nodePaths.isEmpty()) {
-      for (Map.Entry<PCollectionImpl, Set<NodePath>> entry : nodePaths
+      for (Map.Entry<PCollectionImpl<?>, Set<NodePath>> entry : nodePaths
           .entrySet()) {
-        PCollectionImpl collect = entry.getKey();
+        PCollectionImpl<?> collect = entry.getKey();
         if (!assignments.containsKey(collect)) {
           HashMultimap<Target, NodePath> mapOutputs = HashMultimap.create();
           for (NodePath nodePath : entry.getValue()) {
@@ -175,17 +175,17 @@ public class MSCRPlanner {
     return exec;
   }
 
-  private Map<PGroupedTableImpl, Set<NodePath>> getDependencyPaths(
-      Set<PGroupedTableImpl> workingGroupings,
-      Map<PCollectionImpl, Set<NodePath>> nodePaths) {
-    Map<PGroupedTableImpl, Set<NodePath>> dependencyPaths = Maps.newHashMap();
-    for (PGroupedTableImpl grouping : workingGroupings) {
+  private Map<PGroupedTableImpl<?,?>, Set<NodePath>> getDependencyPaths(
+      Set<PGroupedTableImpl<?,?>> workingGroupings,
+      Map<PCollectionImpl<?>, Set<NodePath>> nodePaths) {
+    Map<PGroupedTableImpl<?,?>, Set<NodePath>> dependencyPaths = Maps.newHashMap();
+    for (PGroupedTableImpl<?,?> grouping : workingGroupings) {
       dependencyPaths.put(grouping, Sets.<NodePath> newHashSet());
     }
 
     // Find the targets that depend on one of the elements of the current
     // working group.
-    for (PCollectionImpl target : nodePaths.keySet()) {
+    for (PCollectionImpl<?> target : nodePaths.keySet()) {
       if (!workingGroupings.contains(target)) {
         for (NodePath nodePath : nodePaths.get(target)) {
           if (workingGroupings.contains(nodePath.head())) {
@@ -198,9 +198,9 @@ public class MSCRPlanner {
   }
 
   private int getSplitIndex(Set<NodePath> currentNodePaths) {
-    List<Iterator<PCollectionImpl>> iters = Lists.newArrayList();
+    List<Iterator<PCollectionImpl<?>>> iters = Lists.newArrayList();
     for (NodePath nodePath : currentNodePaths) {
-      Iterator<PCollectionImpl> iter = nodePath.iterator();
+      Iterator<PCollectionImpl<?>> iter = nodePath.iterator();
       iter.next(); // prime this past the initial NGroupedTableImpl
       iters.add(iter);
     }
@@ -211,10 +211,10 @@ public class MSCRPlanner {
     int splitIndex = -1;
     while (!end) {
       splitIndex++;
-      PCollectionImpl current = null;
-      for (Iterator<PCollectionImpl> iter : iters) {
+      PCollectionImpl<?> current = null;
+      for (Iterator<PCollectionImpl<?>> iter : iters) {
         if (iter.hasNext()) {
-          PCollectionImpl next = iter.next();
+          PCollectionImpl<?> next = iter.next();
           if (next instanceof PGroupedTableImpl) {
             end = true;
             break;
@@ -237,7 +237,7 @@ public class MSCRPlanner {
   private void handleGroupingDependencies(Set<NodePath> gbkPaths,
       Set<NodePath> currentNodePaths) throws IOException {
     int splitIndex = getSplitIndex(currentNodePaths);
-    PCollectionImpl splitTarget = currentNodePaths.iterator().next()
+    PCollectionImpl<?> splitTarget = currentNodePaths.iterator().next()
         .get(splitIndex);
     if (!outputs.containsKey(splitTarget)) {
       outputs.put(splitTarget, Sets.<Target>newHashSet());
@@ -247,7 +247,7 @@ public class MSCRPlanner {
     Target targetToReplace = null;
     for (Target t : outputs.get(splitTarget)) {
       if (t instanceof SourceTarget) {
-        srcTarget = (SourceTarget) t;
+        srcTarget = (SourceTarget<?>) t;
         break;
       } else {
         srcTarget = t.asSourceTarget(splitTarget.getPType());
@@ -265,7 +265,7 @@ public class MSCRPlanner {
     outputs.get(splitTarget).add(srcTarget);
     splitTarget.materializeAt(srcTarget);
 
-    PCollectionImpl inputNode = (PCollectionImpl) pipeline.read(srcTarget);
+    PCollectionImpl<?> inputNode = (PCollectionImpl<?>) pipeline.read(srcTarget);
     Set<NodePath> nextNodePaths = Sets.newHashSet();
     for (NodePath nodePath : currentNodePaths) {
       if (gbkPaths.contains(nodePath)) {
@@ -278,10 +278,10 @@ public class MSCRPlanner {
     currentNodePaths.addAll(nextNodePaths);
   }
 
-  private Set<PGroupedTableImpl> getWorkingGroupings(
-      Map<PCollectionImpl, Set<NodePath>> nodePaths) {
-    Set<PGroupedTableImpl> gbks = Sets.newHashSet();
-    for (PCollectionImpl target : nodePaths.keySet()) {
+  private Set<PGroupedTableImpl<?,?>> getWorkingGroupings(
+      Map<PCollectionImpl<?>, Set<NodePath>> nodePaths) {
+    Set<PGroupedTableImpl<?,?>> gbks = Sets.newHashSet();
+    for (PCollectionImpl<?> target : nodePaths.keySet()) {
       if (target instanceof PGroupedTableImpl) {
         boolean hasGBKDependency = false;
         for (NodePath nodePath : nodePaths.get(target)) {
@@ -291,7 +291,7 @@ public class MSCRPlanner {
           }
         }
         if (!hasGBKDependency) {
-          gbks.add((PGroupedTableImpl) target);
+          gbks.add((PGroupedTableImpl<?,?>) target);
         }
       }
     }
@@ -300,21 +300,21 @@ public class MSCRPlanner {
 
   private static class NodeVisitor implements PCollectionImpl.Visitor {
 
-    private final Map<PCollectionImpl, Set<NodePath>> nodePaths;
-    private final Map<PCollectionImpl, Source> inputs;
-    private PCollectionImpl workingNode;
+    private final Map<PCollectionImpl<?>, Set<NodePath>> nodePaths;
+    private final Map<PCollectionImpl<?>, Source<?>> inputs;
+    private PCollectionImpl<?> workingNode;
     private NodePath workingPath;
 
     public NodeVisitor() {
-      this.nodePaths = new HashMap<PCollectionImpl, Set<NodePath>>();
-      this.inputs = new HashMap<PCollectionImpl, Source>();
+      this.nodePaths = new HashMap<PCollectionImpl<?>, Set<NodePath>>();
+      this.inputs = new HashMap<PCollectionImpl<?>, Source<?>>();
     }
 
-    public Map<PCollectionImpl, Set<NodePath>> getNodePaths() {
+    public Map<PCollectionImpl<?>, Set<NodePath>> getNodePaths() {
       return nodePaths;
     }
 
-    public void visitOutput(PCollectionImpl output) {
+    public void visitOutput(PCollectionImpl<?> output) {
       nodePaths.put(output, Sets.<NodePath> newHashSet());
       workingNode = output;
       workingPath = new NodePath();
@@ -330,9 +330,9 @@ public class MSCRPlanner {
 
     @Override
     public void visitUnionCollection(UnionCollection<?> collection) {
-      PCollectionImpl baseNode = workingNode;
+      PCollectionImpl<?> baseNode = workingNode;
       NodePath basePath = workingPath;
-      for (PCollectionImpl parent : collection.getParents()) {
+      for (PCollectionImpl<?> parent : collection.getParents()) {
         workingPath = new NodePath(basePath);
         workingNode = baseNode;
         processParent(parent);
@@ -361,7 +361,7 @@ public class MSCRPlanner {
       processParent(collection.getOnlyParent());
     }
 
-    private void processParent(PCollectionImpl parent) {
+    private void processParent(PCollectionImpl<?> parent) {
       if (!nodePaths.containsKey(parent)) {
         parent.accept(this);
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/main/java/com/cloudera/crunch/impl/mr/plan/NodePath.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/plan/NodePath.java b/src/main/java/com/cloudera/crunch/impl/mr/plan/NodePath.java
index 24d5fc2..9a3cecb 100644
--- a/src/main/java/com/cloudera/crunch/impl/mr/plan/NodePath.java
+++ b/src/main/java/com/cloudera/crunch/impl/mr/plan/NodePath.java
@@ -20,14 +20,14 @@ import java.util.LinkedList;
 import com.cloudera.crunch.impl.mr.collect.PCollectionImpl;
 import com.google.common.collect.Lists;
 
-class NodePath implements Iterable<PCollectionImpl> {
-  private LinkedList<PCollectionImpl> path;
+class NodePath implements Iterable<PCollectionImpl<?>> {
+  private LinkedList<PCollectionImpl<?>> path;
 
   public NodePath() {
     this.path = Lists.newLinkedList();
   }
 
-  public NodePath(PCollectionImpl tail) {
+  public NodePath(PCollectionImpl<?> tail) {
     this.path = Lists.newLinkedList();
     this.path.add(tail);
   }
@@ -36,31 +36,31 @@ class NodePath implements Iterable<PCollectionImpl> {
     this.path = Lists.newLinkedList(other.path);
   }
 
-  public void push(PCollectionImpl stage) {
-    this.path.push((PCollectionImpl) stage);
+  public void push(PCollectionImpl<?> stage) {
+    this.path.push((PCollectionImpl<?>) stage);
   }
 
-  public void close(PCollectionImpl head) {
+  public void close(PCollectionImpl<?> head) {
     this.path.push(head);
   }
 
-  public Iterator<PCollectionImpl> iterator() {
+  public Iterator<PCollectionImpl<?>> iterator() {
     return path.iterator();
   }
 
-  public Iterator<PCollectionImpl> descendingIterator() {
+  public Iterator<PCollectionImpl<?>> descendingIterator() {
     return path.descendingIterator();
   }
 
-  public PCollectionImpl get(int index) {
+  public PCollectionImpl<?> get(int index) {
     return path.get(index);
   }
 
-  public PCollectionImpl head() {
+  public PCollectionImpl<?> head() {
     return path.peekFirst();
   }
 
-  public PCollectionImpl tail() {
+  public PCollectionImpl<?> tail() {
     return path.peekLast();
   }
 
@@ -81,19 +81,19 @@ class NodePath implements Iterable<PCollectionImpl> {
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
-    for (PCollectionImpl collect : path) {
+    for (PCollectionImpl<?> collect : path) {
       sb.append(collect.getName() + "|");
     }
     sb.deleteCharAt(sb.length() - 1);
     return sb.toString();
   }
   
-  public NodePath splitAt(int splitIndex, PCollectionImpl newHead) {
+  public NodePath splitAt(int splitIndex, PCollectionImpl<?> newHead) {
     NodePath top = new NodePath();
     for (int i = 0; i <= splitIndex; i++) {
       top.path.add(path.get(i));
     }
-    LinkedList<PCollectionImpl> nextPath = Lists.newLinkedList();
+    LinkedList<PCollectionImpl<?>> nextPath = Lists.newLinkedList();
     nextPath.add(newHead);
     nextPath.addAll(path.subList(splitIndex + 1, path.size()));
     path = nextPath;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputFormat.java b/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputFormat.java
index 48f3626..2289462 100644
--- a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputFormat.java
+++ b/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputFormat.java
@@ -49,7 +49,7 @@ public class CrunchInputFormat<K, V> extends InputFormat<K, V> {
     for (Map.Entry<InputBundle, Map<Integer, List<Path>>> entry : formatNodeMap.entrySet()) {
       InputBundle inputBundle = entry.getKey();
       Job jobCopy = new Job(conf);
-      InputFormat format = (InputFormat) ReflectionUtils.newInstance(
+      InputFormat<?,?> format = (InputFormat<?,?>) ReflectionUtils.newInstance(
           inputBundle.getInputFormatClass(), jobCopy.getConfiguration());
       for (Map.Entry<Integer, List<Path>> nodeEntry : entry.getValue()
           .entrySet()) {
@@ -72,7 +72,7 @@ public class CrunchInputFormat<K, V> extends InputFormat<K, V> {
   @Override
   public RecordReader<K, V> createRecordReader(InputSplit inputSplit,
       TaskAttemptContext context) throws IOException, InterruptedException {
-    return new CrunchRecordReader(inputSplit, context);
+    return new CrunchRecordReader<K,V>(inputSplit, context);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputSplit.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputSplit.java b/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputSplit.java
index 36791b9..5e1da12 100644
--- a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputSplit.java
+++ b/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputSplit.java
@@ -36,8 +36,6 @@ import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.util.ReflectionUtils;
 
-import com.google.common.collect.Maps;
-
 public class CrunchInputSplit extends InputSplit implements Configurable, Writable {
 
   private InputSplit inputSplit;
@@ -90,7 +88,7 @@ public class CrunchInputSplit extends InputSplit implements Configurable, Writab
         conf.set(in.readUTF(), in.readUTF());
       }
     }
-    inputFormatClass = (Class<? extends InputFormat>) readClass(in);
+    inputFormatClass = (Class<? extends InputFormat<?,?>>) readClass(in);
     Class<? extends InputSplit> inputSplitClass = (Class<? extends InputSplit>) readClass(in);
     inputSplit = (InputSplit) ReflectionUtils
         .newInstance(inputSplitClass, conf);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputs.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputs.java b/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputs.java
index 16d792d..2af8e8d 100644
--- a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputs.java
+++ b/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputs.java
@@ -23,14 +23,12 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 
 import com.cloudera.crunch.io.impl.InputBundle;
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchTaskContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchTaskContext.java b/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchTaskContext.java
index a7ab38c..0924268 100644
--- a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchTaskContext.java
+++ b/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchTaskContext.java
@@ -49,6 +49,7 @@ public class CrunchTaskContext {
   public List<RTNode> getNodes() throws IOException {
     Configuration conf = taskContext.getConfiguration();
     Path path = new Path(new Path(conf.get(PlanningParameters.CRUNCH_WORKING_DIRECTORY)), nodeContext.toString());
+    @SuppressWarnings("unchecked")
     List<RTNode> nodes = (List<RTNode>) DistCache.read(conf, path);
     if (nodes != null) {
       for (RTNode node : nodes) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/main/java/com/cloudera/crunch/impl/mr/run/RTNode.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/run/RTNode.java b/src/main/java/com/cloudera/crunch/impl/mr/run/RTNode.java
index 9419265..29bc9f9 100644
--- a/src/main/java/com/cloudera/crunch/impl/mr/run/RTNode.java
+++ b/src/main/java/com/cloudera/crunch/impl/mr/run/RTNode.java
@@ -38,7 +38,7 @@ public class RTNode implements Serializable {
   private final Converter outputConverter;
   private final String outputName;
 
-  private transient Emitter emitter;
+  private transient Emitter<Object> emitter;
 
   public RTNode(DoFn<Object, Object> fn, String name, List<RTNode> children,
       Converter inputConverter, Converter outputConverter, String outputName) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/main/java/com/cloudera/crunch/io/CompositePathIterable.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/CompositePathIterable.java b/src/main/java/com/cloudera/crunch/io/CompositePathIterable.java
index 507cec2..774bbac 100644
--- a/src/main/java/com/cloudera/crunch/io/CompositePathIterable.java
+++ b/src/main/java/com/cloudera/crunch/io/CompositePathIterable.java
@@ -18,7 +18,6 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.NoSuchElementException;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/text/TextFileSource.java b/src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
index e11283f..a876843 100644
--- a/src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
+++ b/src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
@@ -37,7 +37,7 @@ public class TextFileSource<T> extends FileSourceImpl<T> implements
 	return strPath.endsWith(".bz") || strPath.endsWith(".bz2");
   }
   
-  private static <S> Class<? extends FileInputFormat> getInputFormat(Path path, PType<S> ptype) {
+  private static <S> Class<? extends FileInputFormat<?,?>> getInputFormat(Path path, PType<S> ptype) {
 	if (ptype.getFamily().equals(AvroTypeFamily.getInstance())) {
 	  return AvroUtf8InputFormat.class;
 	} else if (isBZip2(path)){

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/main/java/com/cloudera/crunch/lib/Cartesian.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/lib/Cartesian.java b/src/main/java/com/cloudera/crunch/lib/Cartesian.java
index 7d6eefa..79d1717 100644
--- a/src/main/java/com/cloudera/crunch/lib/Cartesian.java
+++ b/src/main/java/com/cloudera/crunch/lib/Cartesian.java
@@ -19,11 +19,11 @@ import java.util.Random;
 
 import com.cloudera.crunch.DoFn;
 import com.cloudera.crunch.Emitter;
-import com.cloudera.crunch.Pair;
 import com.cloudera.crunch.PCollection;
 import com.cloudera.crunch.PTable;
-import com.cloudera.crunch.types.PTypeFamily;
+import com.cloudera.crunch.Pair;
 import com.cloudera.crunch.types.PTableType;
+import com.cloudera.crunch.types.PTypeFamily;
 
 /**
  * Utilities for Cartesian products of two {@code PTable} or {@code PCollection} instances.

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/main/java/com/cloudera/crunch/test/TestCounters.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/test/TestCounters.java b/src/main/java/com/cloudera/crunch/test/TestCounters.java
index d3a3d49..0b2e33d 100644
--- a/src/main/java/com/cloudera/crunch/test/TestCounters.java
+++ b/src/main/java/com/cloudera/crunch/test/TestCounters.java
@@ -14,8 +14,8 @@
  */
 package com.cloudera.crunch.test;
 
-import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
 
 /**
  * A utility class used during unit testing to update and read counters.

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/main/java/com/cloudera/crunch/types/avro/AvroTableType.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/types/avro/AvroTableType.java b/src/main/java/com/cloudera/crunch/types/avro/AvroTableType.java
index bede0ce..9a93ce3 100644
--- a/src/main/java/com/cloudera/crunch/types/avro/AvroTableType.java
+++ b/src/main/java/com/cloudera/crunch/types/avro/AvroTableType.java
@@ -14,7 +14,6 @@
  */
 package com.cloudera.crunch.types.avro;
 
-import com.cloudera.crunch.types.PTableType;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/main/java/com/cloudera/crunch/types/avro/AvroType.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/types/avro/AvroType.java b/src/main/java/com/cloudera/crunch/types/avro/AvroType.java
index dc3c295..e9696d8 100644
--- a/src/main/java/com/cloudera/crunch/types/avro/AvroType.java
+++ b/src/main/java/com/cloudera/crunch/types/avro/AvroType.java
@@ -16,9 +16,6 @@ package com.cloudera.crunch.types.avro;
 
 import java.util.List;
 
-import com.cloudera.crunch.types.Converter;
-import com.cloudera.crunch.types.PType;
-import com.cloudera.crunch.types.PTypeFamily;
 import org.apache.avro.Schema;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.commons.lang.builder.HashCodeBuilder;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/main/java/com/cloudera/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/types/avro/Avros.java b/src/main/java/com/cloudera/crunch/types/avro/Avros.java
index e5573e8..bb78e31 100644
--- a/src/main/java/com/cloudera/crunch/types/avro/Avros.java
+++ b/src/main/java/com/cloudera/crunch/types/avro/Avros.java
@@ -237,12 +237,12 @@ public class Avros {
         new WritableToBytesMapFn<T>());
   }
   
-  private static class GenericDataArrayToCollection extends MapFn<Object, Collection> {
+  private static class GenericDataArrayToCollection<T> extends MapFn<Object, Collection<T>> {
     private static final long serialVersionUID = 1L;
 
-    private final MapFn mapFn;
+    private final MapFn<Object,T> mapFn;
     
-    public GenericDataArrayToCollection(MapFn mapFn) {
+    public GenericDataArrayToCollection(MapFn<Object,T> mapFn) {
       this.mapFn = mapFn;
     }
     
@@ -262,24 +262,24 @@ public class Avros {
     }
     
     @Override
-    public Collection map(Object input) {
-      Collection ret = Lists.newArrayList();
+    public Collection<T> map(Object input) {
+      Collection<T> ret = Lists.newArrayList();
       if (input instanceof Collection) {
-    	for (Object in : (Collection) input) {
-    	  ret.add(mapFn.map(in));
-    	}
+        for (Object in : (Collection<Object>) input) {
+          ret.add(mapFn.map(in));
+        }
       } else {
-    	// Assume it is an array
-    	Object[] arr = (Object[]) input;
-    	for (Object in : arr) {
-    	  ret.add(mapFn.map(in));
-    	}
+        // Assume it is an array
+        Object[] arr = (Object[]) input;
+        for (Object in : arr) {
+          ret.add(mapFn.map(in));
+        }
       }
       return ret;
     }
   }
   
-  private static class CollectionToGenericDataArray extends MapFn<Collection, GenericData.Array> {
+  private static class CollectionToGenericDataArray extends MapFn<Collection<?>, GenericData.Array<?>> {
     private static final long serialVersionUID = 1L;
     
     private final MapFn mapFn;
@@ -307,7 +307,7 @@ public class Avros {
     }
     
     @Override
-    public GenericData.Array map(Collection input) {
+    public GenericData.Array<?> map(Collection<?> input) {
       if(schema == null) {
         schema = new Schema.Parser().parse(jsonSchema);
       }
@@ -322,7 +322,7 @@ public class Avros {
   public static final <T> AvroType<Collection<T>> collections(PType<T> ptype) {
     AvroType<T> avroType = (AvroType<T>) ptype;
     Schema collectionSchema = Schema.createArray(allowNulls(avroType.getSchema()));
-    GenericDataArrayToCollection input = new GenericDataArrayToCollection(avroType.getInputMapFn());
+    GenericDataArrayToCollection<T> input = new GenericDataArrayToCollection<T>(avroType.getInputMapFn());
     CollectionToGenericDataArray output = new CollectionToGenericDataArray(collectionSchema, avroType.getOutputMapFn());
     return new AvroType(Collection.class, collectionSchema, input, output, ptype);
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/main/java/com/cloudera/crunch/types/writable/WritableTableType.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/types/writable/WritableTableType.java b/src/main/java/com/cloudera/crunch/types/writable/WritableTableType.java
index 3b0a2e5..376af48 100644
--- a/src/main/java/com/cloudera/crunch/types/writable/WritableTableType.java
+++ b/src/main/java/com/cloudera/crunch/types/writable/WritableTableType.java
@@ -16,7 +16,6 @@ package com.cloudera.crunch.types.writable;
 
 import java.util.List;
 
-import com.cloudera.crunch.types.PTypeFamily;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
@@ -30,6 +29,7 @@ import com.cloudera.crunch.types.Converter;
 import com.cloudera.crunch.types.PGroupedTableType;
 import com.cloudera.crunch.types.PTableType;
 import com.cloudera.crunch.types.PType;
+import com.cloudera.crunch.types.PTypeFamily;
 import com.google.common.collect.ImmutableList;
 
 class WritableTableType<K, V> implements PTableType<K, V> {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/main/java/com/cloudera/crunch/types/writable/WritableType.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/types/writable/WritableType.java b/src/main/java/com/cloudera/crunch/types/writable/WritableType.java
index f340798..8031e90 100644
--- a/src/main/java/com/cloudera/crunch/types/writable/WritableType.java
+++ b/src/main/java/com/cloudera/crunch/types/writable/WritableType.java
@@ -16,9 +16,6 @@ package com.cloudera.crunch.types.writable;
 
 import java.util.List;
 
-import com.cloudera.crunch.types.Converter;
-import com.cloudera.crunch.types.PType;
-import com.cloudera.crunch.types.PTypeFamily;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.fs.Path;
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/main/java/com/cloudera/crunch/types/writable/WritableTypeFamily.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/types/writable/WritableTypeFamily.java b/src/main/java/com/cloudera/crunch/types/writable/WritableTypeFamily.java
index 1b682b0..ad2503e 100644
--- a/src/main/java/com/cloudera/crunch/types/writable/WritableTypeFamily.java
+++ b/src/main/java/com/cloudera/crunch/types/writable/WritableTypeFamily.java
@@ -18,6 +18,8 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Map;
 
+import org.apache.hadoop.io.Writable;
+
 import com.cloudera.crunch.MapFn;
 import com.cloudera.crunch.Pair;
 import com.cloudera.crunch.Tuple;
@@ -29,7 +31,6 @@ import com.cloudera.crunch.types.PTableType;
 import com.cloudera.crunch.types.PType;
 import com.cloudera.crunch.types.PTypeFamily;
 import com.cloudera.crunch.types.PTypeUtils;
-import org.apache.hadoop.io.Writable;
 
 /**
  * The {@link Writable}-based implementation of the {@link com.cloudera.crunch.types.PTypeFamily}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/main/java/com/cloudera/crunch/types/writable/WritableValueConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/types/writable/WritableValueConverter.java b/src/main/java/com/cloudera/crunch/types/writable/WritableValueConverter.java
index 6f49b88..d516d36 100644
--- a/src/main/java/com/cloudera/crunch/types/writable/WritableValueConverter.java
+++ b/src/main/java/com/cloudera/crunch/types/writable/WritableValueConverter.java
@@ -14,7 +14,6 @@
  */
 package com.cloudera.crunch.types.writable;
 
-import com.cloudera.crunch.types.Converter;
 import org.apache.hadoop.io.NullWritable;
 
 import com.cloudera.crunch.types.Converter;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/main/java/com/cloudera/crunch/util/Protos.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/util/Protos.java b/src/main/java/com/cloudera/crunch/util/Protos.java
index 5c4cbb3..571a2d3 100644
--- a/src/main/java/com/cloudera/crunch/util/Protos.java
+++ b/src/main/java/com/cloudera/crunch/util/Protos.java
@@ -23,8 +23,8 @@ import com.cloudera.crunch.DoFn;
 import com.cloudera.crunch.Emitter;
 import com.cloudera.crunch.MapFn;
 import com.google.common.base.Splitter;
-import com.google.protobuf.Message;
 import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Message;
 import com.google.protobuf.Message.Builder;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/main/java/org/apache/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java b/src/main/java/org/apache/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java
index bf6b7cf..ae2b253 100644
--- a/src/main/java/org/apache/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java
+++ b/src/main/java/org/apache/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java
@@ -249,7 +249,6 @@ public class CrunchMultipleOutputs<KEYOUT, VALUEOUT> {
    * @param keyClass          key class
    * @param valueClass        value class
    */
-  @SuppressWarnings("unchecked")
   public static void addNamedOutput(Job job, String namedOutput,
       Class<? extends OutputFormat> outputFormatClass,
       Class<?> keyClass, Class<?> valueClass) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/test/java/com/cloudera/crunch/CollectionsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/CollectionsTest.java b/src/test/java/com/cloudera/crunch/CollectionsTest.java
index bdad177..1ba99b9 100644
--- a/src/test/java/com/cloudera/crunch/CollectionsTest.java
+++ b/src/test/java/com/cloudera/crunch/CollectionsTest.java
@@ -16,11 +16,11 @@ package com.cloudera.crunch;
 
 import static org.junit.Assert.assertTrue;
 
-import com.cloudera.crunch.DoFn;
-import com.cloudera.crunch.Emitter;
-import com.cloudera.crunch.PCollection;
-import com.cloudera.crunch.PTable;
-import com.cloudera.crunch.Pipeline;
+import java.io.IOException;
+import java.util.Collection;
+
+import org.junit.Test;
+
 import com.cloudera.crunch.impl.mem.MemPipeline;
 import com.cloudera.crunch.impl.mr.MRPipeline;
 import com.cloudera.crunch.test.FileHelper;
@@ -30,11 +30,6 @@ import com.cloudera.crunch.types.writable.WritableTypeFamily;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
-import java.io.IOException;
-import java.util.Collection;
-
-import org.junit.Test;
-
 @SuppressWarnings("serial")
 public class CollectionsTest {
   

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/test/java/com/cloudera/crunch/CombineFnTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/CombineFnTest.java b/src/test/java/com/cloudera/crunch/CombineFnTest.java
index 9b7c639..66d0635 100644
--- a/src/test/java/com/cloudera/crunch/CombineFnTest.java
+++ b/src/test/java/com/cloudera/crunch/CombineFnTest.java
@@ -14,6 +14,21 @@
  */
 package com.cloudera.crunch;
 
+import static com.cloudera.crunch.CombineFn.MAX_BIGINTS;
+import static com.cloudera.crunch.CombineFn.MAX_DOUBLES;
+import static com.cloudera.crunch.CombineFn.MAX_FLOATS;
+import static com.cloudera.crunch.CombineFn.MAX_INTS;
+import static com.cloudera.crunch.CombineFn.MAX_LONGS;
+import static com.cloudera.crunch.CombineFn.MIN_BIGINTS;
+import static com.cloudera.crunch.CombineFn.MIN_DOUBLES;
+import static com.cloudera.crunch.CombineFn.MIN_FLOATS;
+import static com.cloudera.crunch.CombineFn.MIN_INTS;
+import static com.cloudera.crunch.CombineFn.MIN_LONGS;
+import static com.cloudera.crunch.CombineFn.SUM_BIGINTS;
+import static com.cloudera.crunch.CombineFn.SUM_DOUBLES;
+import static com.cloudera.crunch.CombineFn.SUM_FLOATS;
+import static com.cloudera.crunch.CombineFn.SUM_INTS;
+import static com.cloudera.crunch.CombineFn.SUM_LONGS;
 import static org.junit.Assert.assertEquals;
 
 import java.math.BigInteger;
@@ -21,8 +36,16 @@ import java.util.List;
 
 import org.junit.Test;
 
-import static com.cloudera.crunch.CombineFn.*;
-
+import com.cloudera.crunch.CombineFn.Aggregator;
+import com.cloudera.crunch.CombineFn.AggregatorFactory;
+import com.cloudera.crunch.CombineFn.FirstNAggregator;
+import com.cloudera.crunch.CombineFn.LastNAggregator;
+import com.cloudera.crunch.CombineFn.MaxNAggregator;
+import com.cloudera.crunch.CombineFn.MinNAggregator;
+import com.cloudera.crunch.CombineFn.PairAggregator;
+import com.cloudera.crunch.CombineFn.QuadAggregator;
+import com.cloudera.crunch.CombineFn.TripAggregator;
+import com.cloudera.crunch.CombineFn.TupleNAggregator;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/test/java/com/cloudera/crunch/FilterFnTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/FilterFnTest.java b/src/test/java/com/cloudera/crunch/FilterFnTest.java
index d77eed7..f9a9479 100644
--- a/src/test/java/com/cloudera/crunch/FilterFnTest.java
+++ b/src/test/java/com/cloudera/crunch/FilterFnTest.java
@@ -14,8 +14,8 @@
  */
 package com.cloudera.crunch;
 
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/test/java/com/cloudera/crunch/MaterializeToMapTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/MaterializeToMapTest.java b/src/test/java/com/cloudera/crunch/MaterializeToMapTest.java
index bb060af..9550e87 100644
--- a/src/test/java/com/cloudera/crunch/MaterializeToMapTest.java
+++ b/src/test/java/com/cloudera/crunch/MaterializeToMapTest.java
@@ -14,11 +14,12 @@
  */
 package com.cloudera.crunch;
 
+import static junit.framework.Assert.assertTrue;
+
 import java.io.IOException;
 import java.util.Map;
 
 import org.junit.Test;
-import static junit.framework.Assert.assertTrue;
 
 import com.cloudera.crunch.impl.mem.MemPipeline;
 import com.cloudera.crunch.impl.mr.MRPipeline;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/test/java/com/cloudera/crunch/MultipleOutputTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/MultipleOutputTest.java b/src/test/java/com/cloudera/crunch/MultipleOutputTest.java
index ab34099..a12f724 100644
--- a/src/test/java/com/cloudera/crunch/MultipleOutputTest.java
+++ b/src/test/java/com/cloudera/crunch/MultipleOutputTest.java
@@ -14,26 +14,8 @@
  */
 package com.cloudera.crunch;
 
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
 
-import com.cloudera.crunch.DoFn;
-import com.cloudera.crunch.Emitter;
-import com.cloudera.crunch.PCollection;
-import com.cloudera.crunch.PTable;
-import com.cloudera.crunch.Pipeline;
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.io.At;
-import com.cloudera.crunch.io.To;
-import com.cloudera.crunch.lib.Aggregate;
-import com.cloudera.crunch.test.FileHelper;
-import com.cloudera.crunch.types.PTypeFamily;
-import com.cloudera.crunch.types.avro.AvroTypeFamily;
-import com.cloudera.crunch.types.writable.WritableTypeFamily;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
@@ -42,6 +24,14 @@ import java.util.List;
 
 import org.junit.Test;
 
+import com.cloudera.crunch.impl.mr.MRPipeline;
+import com.cloudera.crunch.io.At;
+import com.cloudera.crunch.test.FileHelper;
+import com.cloudera.crunch.types.PTypeFamily;
+import com.cloudera.crunch.types.avro.AvroTypeFamily;
+import com.cloudera.crunch.types.writable.WritableTypeFamily;
+import com.google.common.io.Files;
+
 public class MultipleOutputTest {
   
   public static PCollection<String> evenCountLetters(PCollection<String> words, PTypeFamily typeFamily) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/test/java/com/cloudera/crunch/PCollectionGetSizeTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/PCollectionGetSizeTest.java b/src/test/java/com/cloudera/crunch/PCollectionGetSizeTest.java
index ae7ed1b..7069267 100644
--- a/src/test/java/com/cloudera/crunch/PCollectionGetSizeTest.java
+++ b/src/test/java/com/cloudera/crunch/PCollectionGetSizeTest.java
@@ -13,9 +13,6 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import com.cloudera.crunch.FilterFn;
-import com.cloudera.crunch.PCollection;
-import com.cloudera.crunch.Pipeline;
 import com.cloudera.crunch.impl.mem.MemPipeline;
 import com.cloudera.crunch.impl.mr.MRPipeline;
 import com.cloudera.crunch.test.FileHelper;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/test/java/com/cloudera/crunch/PTableKeyValueTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/PTableKeyValueTest.java b/src/test/java/com/cloudera/crunch/PTableKeyValueTest.java
index 2e2fe2d..b1961b1 100644
--- a/src/test/java/com/cloudera/crunch/PTableKeyValueTest.java
+++ b/src/test/java/com/cloudera/crunch/PTableKeyValueTest.java
@@ -17,7 +17,6 @@ import org.junit.runners.Parameterized.Parameters;
 
 import com.cloudera.crunch.impl.mr.MRPipeline;
 import com.cloudera.crunch.io.At;
-import com.cloudera.crunch.lib.SetTest;
 import com.cloudera.crunch.test.FileHelper;
 import com.cloudera.crunch.types.PTypeFamily;
 import com.cloudera.crunch.types.avro.AvroTypeFamily;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/test/java/com/cloudera/crunch/PageRankTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/PageRankTest.java b/src/test/java/com/cloudera/crunch/PageRankTest.java
index 64569aa..66ae485 100644
--- a/src/test/java/com/cloudera/crunch/PageRankTest.java
+++ b/src/test/java/com/cloudera/crunch/PageRankTest.java
@@ -16,6 +16,11 @@ package com.cloudera.crunch;
 
 import static org.junit.Assert.assertEquals;
 
+import java.util.Collection;
+import java.util.List;
+
+import org.junit.Test;
+
 import com.cloudera.crunch.impl.mem.MemPipeline;
 import com.cloudera.crunch.impl.mr.MRPipeline;
 import com.cloudera.crunch.lib.Aggregate;
@@ -29,11 +34,6 @@ import com.cloudera.crunch.util.PTypes;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
-import java.util.Collection;
-import java.util.List;
-
-import org.junit.Test;
-
 public class PageRankTest {
 
   public static class PageRankData {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/test/java/com/cloudera/crunch/TupleNClassCastBugTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/TupleNClassCastBugTest.java b/src/test/java/com/cloudera/crunch/TupleNClassCastBugTest.java
index 37dbcb7..22d044e 100644
--- a/src/test/java/com/cloudera/crunch/TupleNClassCastBugTest.java
+++ b/src/test/java/com/cloudera/crunch/TupleNClassCastBugTest.java
@@ -14,28 +14,28 @@
  */
 package com.cloudera.crunch;
 
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.lib.Aggregate;
-import com.cloudera.crunch.types.PTypeFamily;
-import com.cloudera.crunch.types.avro.AvroTypeFamily;
-import com.cloudera.crunch.types.writable.WritableTypeFamily;
-import com.google.common.io.Files;
-import org.junit.Test;
+import static com.google.common.io.Resources.getResource;
+import static com.google.common.io.Resources.newInputStreamSupplier;
+import static org.junit.Assert.assertEquals;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.List;
 
-import static com.google.common.io.Resources.getResource;
-import static com.google.common.io.Resources.newInputStreamSupplier;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+import com.cloudera.crunch.impl.mr.MRPipeline;
+import com.cloudera.crunch.types.PTypeFamily;
+import com.cloudera.crunch.types.avro.AvroTypeFamily;
+import com.cloudera.crunch.types.writable.WritableTypeFamily;
+import com.google.common.io.Files;
 
 public class TupleNClassCastBugTest {
 
   public static PCollection<TupleN> mapGroupDo(PCollection<String> lines, PTypeFamily ptf) {
     PTable<String, TupleN> mapped = lines.parallelDo(new MapFn<String, Pair<String, TupleN>>() {
+
       @Override
       public Pair<String, TupleN> map(String line) {
         String[] columns = line.split("\\t");

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/test/java/com/cloudera/crunch/WordCountTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/WordCountTest.java b/src/test/java/com/cloudera/crunch/WordCountTest.java
index a18c74d..a0c4abb 100644
--- a/src/test/java/com/cloudera/crunch/WordCountTest.java
+++ b/src/test/java/com/cloudera/crunch/WordCountTest.java
@@ -14,14 +14,16 @@
  */
 package com.cloudera.crunch;
 
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import org.junit.Test;
 
-import com.cloudera.crunch.DoFn;
-import com.cloudera.crunch.Emitter;
-import com.cloudera.crunch.PCollection;
-import com.cloudera.crunch.PTable;
-import com.cloudera.crunch.Pipeline;
 import com.cloudera.crunch.impl.mr.MRPipeline;
 import com.cloudera.crunch.io.At;
 import com.cloudera.crunch.io.To;
@@ -34,19 +36,15 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.List;
+public class WordCountTest {
 
-import org.junit.Test;
+  enum WordCountStats {
+    ANDS
+  };
 
-public class WordCountTest {
-  
-  enum WordCountStats { ANDS };
-  
   public static PTable<String, Long> wordCount(PCollection<String> words, PTypeFamily typeFamily) {
     return Aggregate.count(words.parallelDo(new DoFn<String, String>() {
+
       @Override
       public void process(String line, Emitter<String> emitter) {
         for (String word : line.split("\\s+")) {
@@ -58,21 +56,21 @@ public class WordCountTest {
       }
     }, typeFamily.strings()));
   }
-  
+
   public static PTable<String, Long> substr(PTable<String, Long> ptable) {
-	return ptable.parallelDo(new DoFn<Pair<String, Long>, Pair<String, Long>>() {
-	  public void process(Pair<String, Long> input,
-		  Emitter<Pair<String, Long>> emitter) {
-		if (input.first().length() > 0) {
-		  emitter.emit(Pair.of(input.first().substring(0, 1), input.second()));
-		}
-	  }      
+    return ptable.parallelDo(new DoFn<Pair<String, Long>, Pair<String, Long>>() {
+
+      public void process(Pair<String, Long> input, Emitter<Pair<String, Long>> emitter) {
+        if (input.first().length() > 0) {
+          emitter.emit(Pair.of(input.first().substring(0, 1), input.second()));
+        }
+      }
     }, ptable.getPTableType());
   }
-  
+
   private boolean runSecond = false;
   private boolean useToOutput = false;
-  
+
   @Test
   public void testWritables() throws IOException {
     run(new MRPipeline(WordCountTest.class), WritableTypeFamily.getInstance());
@@ -81,7 +79,7 @@ public class WordCountTest {
   @Test
   public void testWritablesWithSecond() throws IOException {
     runSecond = true;
-	run(new MRPipeline(WordCountTest.class), WritableTypeFamily.getInstance());
+    run(new MRPipeline(WordCountTest.class), WritableTypeFamily.getInstance());
   }
 
   @Test
@@ -95,57 +93,55 @@ public class WordCountTest {
   public void testAvro() throws IOException {
     run(new MRPipeline(WordCountTest.class), AvroTypeFamily.getInstance());
   }
-  
+
   @Test
   public void testAvroWithSecond() throws IOException {
     runSecond = true;
     run(new MRPipeline(WordCountTest.class), AvroTypeFamily.getInstance());
   }
-  
+
   @Test
   public void testWithTopWritable() throws IOException {
     runWithTop(WritableTypeFamily.getInstance());
   }
-  
+
   @Test
   public void testWithTopAvro() throws IOException {
-    runWithTop(AvroTypeFamily.getInstance()); 
+    runWithTop(AvroTypeFamily.getInstance());
   }
-  
+
   public static void runWithTop(PTypeFamily tf) throws IOException {
     Pipeline pipeline = new MRPipeline(WordCountTest.class);
     String inputPath = FileHelper.createTempCopyOf("shakes.txt");
-    
-    PCollection<String> shakespeare = pipeline.read(
-         At.textFile(inputPath, tf.strings()));
+
+    PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, tf.strings()));
     PTable<String, Long> wordCount = wordCount(shakespeare, tf);
-    List<Pair<String, Long>> top5 = Lists.newArrayList(
-        Aggregate.top(wordCount, 5, true).materialize());
-    assertEquals(ImmutableList.of(Pair.of("", 1470L),
-        Pair.of("the", 620L), Pair.of("and", 427L), Pair.of("of", 396L), 
-        Pair.of("to", 367L)), top5);
+    List<Pair<String, Long>> top5 = Lists.newArrayList(Aggregate.top(wordCount, 5, true)
+        .materialize());
+    assertEquals(
+        ImmutableList.of(Pair.of("", 1470L), Pair.of("the", 620L), Pair.of("and", 427L),
+            Pair.of("of", 396L), Pair.of("to", 367L)), top5);
   }
-  
+
   public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
-	String inputPath = FileHelper.createTempCopyOf("shakes.txt");
-	File output = FileHelper.createOutputPath();
-	String outputPath = output.getAbsolutePath();
-	
-    PCollection<String> shakespeare = pipeline.read(
-         At.textFile(inputPath, typeFamily.strings()));
+    String inputPath = FileHelper.createTempCopyOf("shakes.txt");
+    File output = FileHelper.createOutputPath();
+    String outputPath = output.getAbsolutePath();
+
+    PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, typeFamily.strings()));
     PTable<String, Long> wordCount = wordCount(shakespeare, typeFamily);
     if (useToOutput) {
       wordCount.write(To.textFile(outputPath));
     } else {
       pipeline.writeTextFile(wordCount, outputPath);
     }
-    
+
     if (runSecond) {
       File substrCount = File.createTempFile("substr", "");
       String substrPath = substrCount.getAbsolutePath();
       substrCount.delete();
       PTable<String, Long> we = substr(wordCount).groupByKey().combineValues(
-          CombineFn.<String>SUM_LONGS());
+          CombineFn.<String> SUM_LONGS());
       pipeline.writeTextFile(we, substrPath);
     }
     PipelineResult res = pipeline.done();
@@ -157,7 +153,7 @@ public class WordCountTest {
       assertEquals(1, stageResults.size());
       assertEquals(427, stageResults.get(0).getCounterValue(WordCountStats.ANDS));
     }
-    
+
     File outputFile = new File(outputPath, "part-r-00000");
     List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
     boolean passed = false;
@@ -168,6 +164,6 @@ public class WordCountTest {
       }
     }
     assertTrue(passed);
-	output.deleteOnExit();
-  }  
+    output.deleteOnExit();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/test/java/com/cloudera/crunch/fn/MapValuesTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/fn/MapValuesTest.java b/src/test/java/com/cloudera/crunch/fn/MapValuesTest.java
index a3aa3e2..696e965 100644
--- a/src/test/java/com/cloudera/crunch/fn/MapValuesTest.java
+++ b/src/test/java/com/cloudera/crunch/fn/MapValuesTest.java
@@ -15,9 +15,11 @@
 package com.cloudera.crunch.fn;
 
 import static org.junit.Assert.assertEquals;
-import com.cloudera.crunch.Pair;
+
 import org.junit.Test;
 
+import com.cloudera.crunch.Pair;
+
 @SuppressWarnings("serial")
 public class MapValuesTest {
   

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/test/java/com/cloudera/crunch/impl/mr/collect/DoCollectionImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/impl/mr/collect/DoCollectionImplTest.java b/src/test/java/com/cloudera/crunch/impl/mr/collect/DoCollectionImplTest.java
index 511bc91..209810e 100644
--- a/src/test/java/com/cloudera/crunch/impl/mr/collect/DoCollectionImplTest.java
+++ b/src/test/java/com/cloudera/crunch/impl/mr/collect/DoCollectionImplTest.java
@@ -8,7 +8,6 @@ import org.junit.Test;
 
 import com.cloudera.crunch.DoFn;
 import com.cloudera.crunch.Emitter;
-import com.cloudera.crunch.fn.IdentityFn;
 import com.cloudera.crunch.impl.mr.plan.DoNode;
 import com.cloudera.crunch.types.PType;
 import com.cloudera.crunch.types.writable.Writables;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/test/java/com/cloudera/crunch/io/CompositePathIterableTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/io/CompositePathIterableTest.java b/src/test/java/com/cloudera/crunch/io/CompositePathIterableTest.java
index c20cca8..ad63983 100644
--- a/src/test/java/com/cloudera/crunch/io/CompositePathIterableTest.java
+++ b/src/test/java/com/cloudera/crunch/io/CompositePathIterableTest.java
@@ -1,6 +1,8 @@
 package com.cloudera.crunch.io;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/test/java/com/cloudera/crunch/lib/AggregateTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/lib/AggregateTest.java b/src/test/java/com/cloudera/crunch/lib/AggregateTest.java
index f244921..18b9a68 100644
--- a/src/test/java/com/cloudera/crunch/lib/AggregateTest.java
+++ b/src/test/java/com/cloudera/crunch/lib/AggregateTest.java
@@ -14,7 +14,8 @@
  */
 package com.cloudera.crunch.lib;
 
-import static com.cloudera.crunch.types.writable.Writables.*;
+import static com.cloudera.crunch.types.writable.Writables.strings;
+import static com.cloudera.crunch.types.writable.Writables.tableOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/test/java/com/cloudera/crunch/lib/CartesianTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/lib/CartesianTest.java b/src/test/java/com/cloudera/crunch/lib/CartesianTest.java
index 2ed8e07..b2d9b94 100644
--- a/src/test/java/com/cloudera/crunch/lib/CartesianTest.java
+++ b/src/test/java/com/cloudera/crunch/lib/CartesianTest.java
@@ -3,16 +3,15 @@ package com.cloudera.crunch.lib;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import org.junit.Test;
-
 import java.util.HashSet;
 import java.util.Iterator;
 
+import org.junit.Test;
+
 import com.cloudera.crunch.PCollection;
 import com.cloudera.crunch.Pair;
-import com.cloudera.crunch.lib.Cartesian;
-import com.cloudera.crunch.types.writable.Writables;
 import com.cloudera.crunch.impl.mem.MemPipeline;
+import com.cloudera.crunch.types.writable.Writables;
 import com.google.common.collect.ImmutableList;
 
 public class CartesianTest {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/test/java/com/cloudera/crunch/lib/CogroupTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/lib/CogroupTest.java b/src/test/java/com/cloudera/crunch/lib/CogroupTest.java
index 5fec26c..648d11f 100644
--- a/src/test/java/com/cloudera/crunch/lib/CogroupTest.java
+++ b/src/test/java/com/cloudera/crunch/lib/CogroupTest.java
@@ -35,7 +35,6 @@ import com.cloudera.crunch.fn.MapKeysFn;
 import com.cloudera.crunch.fn.MapValuesFn;
 import com.cloudera.crunch.impl.mr.MRPipeline;
 import com.cloudera.crunch.io.From;
-import com.cloudera.crunch.lib.Cogroup;
 import com.cloudera.crunch.test.FileHelper;
 import com.cloudera.crunch.types.PTableType;
 import com.cloudera.crunch.types.PTypeFamily;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/test/java/com/cloudera/crunch/lib/SetTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/lib/SetTest.java b/src/test/java/com/cloudera/crunch/lib/SetTest.java
index 2122be7..3cadc0e 100644
--- a/src/test/java/com/cloudera/crunch/lib/SetTest.java
+++ b/src/test/java/com/cloudera/crunch/lib/SetTest.java
@@ -17,18 +17,6 @@ package com.cloudera.crunch.lib;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
-import com.cloudera.crunch.PCollection;
-import com.cloudera.crunch.Pipeline;
-import com.cloudera.crunch.Tuple3;
-import com.cloudera.crunch.impl.mem.MemPipeline;
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.io.At;
-import com.cloudera.crunch.test.FileHelper;
-import com.cloudera.crunch.types.PTypeFamily;
-import com.cloudera.crunch.types.avro.AvroTypeFamily;
-import com.cloudera.crunch.types.writable.WritableTypeFamily;
-import com.google.common.collect.Lists;
-
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
@@ -41,6 +29,17 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
+import com.cloudera.crunch.PCollection;
+import com.cloudera.crunch.Pipeline;
+import com.cloudera.crunch.Tuple3;
+import com.cloudera.crunch.impl.mr.MRPipeline;
+import com.cloudera.crunch.io.At;
+import com.cloudera.crunch.test.FileHelper;
+import com.cloudera.crunch.types.PTypeFamily;
+import com.cloudera.crunch.types.avro.AvroTypeFamily;
+import com.cloudera.crunch.types.writable.WritableTypeFamily;
+import com.google.common.collect.Lists;
+
 @RunWith(value = Parameterized.class)
 public class SetTest {
   

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/test/java/com/cloudera/crunch/lib/SortTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/lib/SortTest.java b/src/test/java/com/cloudera/crunch/lib/SortTest.java
index 28505b9..e65a01a 100644
--- a/src/test/java/com/cloudera/crunch/lib/SortTest.java
+++ b/src/test/java/com/cloudera/crunch/lib/SortTest.java
@@ -15,9 +15,17 @@
 package com.cloudera.crunch.lib;
 
 import static com.cloudera.crunch.lib.Sort.ColumnOrder.by;
-import static com.cloudera.crunch.lib.Sort.Order.*;
+import static com.cloudera.crunch.lib.Sort.Order.ASCENDING;
+import static com.cloudera.crunch.lib.Sort.Order.DESCENDING;
 import static org.junit.Assert.assertEquals;
 
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
 import com.cloudera.crunch.DoFn;
 import com.cloudera.crunch.Emitter;
 import com.cloudera.crunch.PCollection;
@@ -27,7 +35,6 @@ import com.cloudera.crunch.Pipeline;
 import com.cloudera.crunch.Tuple3;
 import com.cloudera.crunch.Tuple4;
 import com.cloudera.crunch.TupleN;
-import com.cloudera.crunch.impl.mem.MemPipeline;
 import com.cloudera.crunch.impl.mr.MRPipeline;
 import com.cloudera.crunch.lib.Sort.ColumnOrder;
 import com.cloudera.crunch.lib.Sort.Order;
@@ -37,13 +44,6 @@ import com.cloudera.crunch.types.PTypeFamily;
 import com.cloudera.crunch.types.avro.AvroTypeFamily;
 import com.cloudera.crunch.types.writable.WritableTypeFamily;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Arrays;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
 public class SortTest implements Serializable {
   
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/test/java/com/cloudera/crunch/lib/join/JoinTester.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/lib/join/JoinTester.java b/src/test/java/com/cloudera/crunch/lib/join/JoinTester.java
index 95dfc55..651747d 100644
--- a/src/test/java/com/cloudera/crunch/lib/join/JoinTester.java
+++ b/src/test/java/com/cloudera/crunch/lib/join/JoinTester.java
@@ -17,6 +17,8 @@ package com.cloudera.crunch.lib.join;
 import java.io.IOException;
 import java.io.Serializable;
 
+import org.junit.Test;
+
 import com.cloudera.crunch.DoFn;
 import com.cloudera.crunch.Emitter;
 import com.cloudera.crunch.PCollection;
@@ -31,7 +33,6 @@ import com.cloudera.crunch.types.PTableType;
 import com.cloudera.crunch.types.PTypeFamily;
 import com.cloudera.crunch.types.avro.AvroTypeFamily;
 import com.cloudera.crunch.types.writable.WritableTypeFamily;
-import org.junit.Test;
 
 public abstract class JoinTester implements Serializable {
   private static class WordSplit extends DoFn<String, String> {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/test/java/com/cloudera/crunch/lib/join/MultiAvroSchemaJoinTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/lib/join/MultiAvroSchemaJoinTest.java b/src/test/java/com/cloudera/crunch/lib/join/MultiAvroSchemaJoinTest.java
index 04c9464..be17bf6 100644
--- a/src/test/java/com/cloudera/crunch/lib/join/MultiAvroSchemaJoinTest.java
+++ b/src/test/java/com/cloudera/crunch/lib/join/MultiAvroSchemaJoinTest.java
@@ -14,7 +14,8 @@
  */
 package com.cloudera.crunch.lib.join;
 
-import static com.cloudera.crunch.types.avro.Avros.*;
+import static com.cloudera.crunch.types.avro.Avros.records;
+import static com.cloudera.crunch.types.avro.Avros.strings;
 import static org.junit.Assert.assertEquals;
 
 import java.io.File;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/test/java/com/cloudera/crunch/types/PTypeUtilsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/types/PTypeUtilsTest.java b/src/test/java/com/cloudera/crunch/types/PTypeUtilsTest.java
index 92cb8f4..dcc9ef6 100644
--- a/src/test/java/com/cloudera/crunch/types/PTypeUtilsTest.java
+++ b/src/test/java/com/cloudera/crunch/types/PTypeUtilsTest.java
@@ -19,10 +19,6 @@ import static org.junit.Assert.assertNotNull;
 
 import java.util.Collection;
 
-import com.cloudera.crunch.types.avro.AvroTypeFamily;
-import com.cloudera.crunch.types.avro.Avros;
-import com.cloudera.crunch.types.writable.WritableTypeFamily;
-import com.cloudera.crunch.types.writable.Writables;
 import org.apache.avro.Schema;
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.io.Text;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f1210d88/src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java b/src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
index fae0e13..74e2ad3 100644
--- a/src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
+++ b/src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
@@ -20,8 +20,6 @@ import static org.junit.Assert.assertNotNull;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 
-import com.cloudera.crunch.types.PTableType;
-import com.cloudera.crunch.types.PType;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericData;
@@ -33,8 +31,8 @@ import com.cloudera.crunch.Pair;
 import com.cloudera.crunch.Tuple3;
 import com.cloudera.crunch.Tuple4;
 import com.cloudera.crunch.TupleN;
+import com.cloudera.crunch.types.PTableType;
 import com.cloudera.crunch.types.PType;
-import com.cloudera.crunch.types.writable.Writables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 


Mime
View raw message