incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [1/4] git commit: Cleanup a bunch of compiler warnings, mostly by adding serialVersionUIDs to DoFn and friends
Date Tue, 19 Jun 2012 03:09:19 GMT
Updated Branches:
  refs/heads/master eb3de8851 -> 2d25d5dfd


Cleanup a bunch of compiler warnings, mostly by adding serialVersionUIDs to DoFn and friends


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/2d25d5df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/2d25d5df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/2d25d5df

Branch: refs/heads/master
Commit: 2d25d5dfd8f4b78681162c2dfaf4a7342c7678e1
Parents: 4b73249
Author: Josh Wills <jwills@cloudera.com>
Authored: Mon Jun 18 19:44:23 2012 -0700
Committer: Josh Wills <jwills@cloudera.com>
Committed: Mon Jun 18 19:44:23 2012 -0700

----------------------------------------------------------------------
 src/main/java/com/cloudera/crunch/CombineFn.java   |   55 +-
 src/main/java/com/cloudera/crunch/DoFn.java        |    3 +-
 src/main/java/com/cloudera/crunch/FilterFn.java    |    7 +
 src/main/java/com/cloudera/crunch/MapFn.java       |    1 +
 src/main/java/com/cloudera/crunch/Tuple3.java      |    2 +-
 src/main/java/com/cloudera/crunch/Tuple4.java      |    2 +-
 .../com/cloudera/crunch/fn/CompositeMapFn.java     |    3 +-
 .../java/com/cloudera/crunch/fn/ExtractKeyFn.java  |    3 +-
 .../java/com/cloudera/crunch/fn/IdentityFn.java    |    3 +-
 .../java/com/cloudera/crunch/fn/MapKeysFn.java     |    2 +
 .../java/com/cloudera/crunch/fn/MapValuesFn.java   |    2 +
 .../java/com/cloudera/crunch/fn/PairMapFn.java     |    2 +
 .../crunch/io/text/BZip2TextInputFormat.java       |  358 ++--
 .../cloudera/crunch/io/text/CBZip2InputStream.java | 1720 +++++++--------
 .../cloudera/crunch/io/text/TextFileSource.java    |    2 +-
 .../java/com/cloudera/crunch/lib/Aggregate.java    |   17 +-
 src/main/java/com/cloudera/crunch/lib/Cogroup.java |    6 +
 src/main/java/com/cloudera/crunch/lib/Join.java    |   22 +-
 src/main/java/com/cloudera/crunch/lib/PTables.java |    2 +
 src/main/java/com/cloudera/crunch/lib/Sample.java  |    3 +-
 src/main/java/com/cloudera/crunch/lib/Set.java     |   74 +-
 src/main/java/com/cloudera/crunch/lib/Sort.java    |   36 +-
 .../cloudera/crunch/lib/join/FullOuterJoinFn.java  |    2 +
 .../com/cloudera/crunch/lib/join/InnerJoinFn.java  |    2 +
 .../java/com/cloudera/crunch/lib/join/JoinFn.java  |    2 +
 .../com/cloudera/crunch/lib/join/JoinUtils.java    |    4 +-
 .../cloudera/crunch/lib/join/LeftOuterJoinFn.java  |    2 +
 .../cloudera/crunch/lib/join/RightOuterJoinFn.java |    3 +-
 .../cloudera/crunch/types/PGroupedTableType.java   |    5 +-
 .../com/cloudera/crunch/types/PTypeFamily.java     |    4 +-
 .../com/cloudera/crunch/types/TupleFactory.java    |    1 +
 .../crunch/types/avro/AvroGroupedTableType.java    |    2 -
 .../crunch/types/avro/AvroKeyConverter.java        |    2 +
 .../crunch/types/avro/AvroPairConverter.java       |    4 +-
 .../cloudera/crunch/types/avro/AvroTypeFamily.java |   10 +-
 .../java/com/cloudera/crunch/types/avro/Avros.java |   29 +-
 .../types/writable/WritableGroupedTableType.java   |    1 -
 .../types/writable/WritablePairConverter.java      |    4 +-
 .../crunch/types/writable/WritableTypeFamily.java  |   10 +-
 .../types/writable/WritableValueConverter.java     |    3 +-
 .../cloudera/crunch/types/writable/Writables.java  |   43 +-
 src/main/java/com/cloudera/crunch/util/PTypes.java |   17 +-
 src/main/java/com/cloudera/crunch/util/Protos.java |    4 +
 src/main/java/com/cloudera/crunch/util/Tuples.java |    4 +-
 44 files changed, 1313 insertions(+), 1170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/CombineFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/CombineFn.java b/src/main/java/com/cloudera/crunch/CombineFn.java
index f1b17dd..e450140 100644
--- a/src/main/java/com/cloudera/crunch/CombineFn.java
+++ b/src/main/java/com/cloudera/crunch/CombineFn.java
@@ -35,7 +35,8 @@ import com.google.common.collect.Sets;
  *
  */
 public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, T>> {
-
+  private static final long serialVersionUID = 1L;
+  
   public static interface Aggregator<T> extends Serializable {
     /**
      * Clears the internal state of this Aggregator and prepares it for the values associated
@@ -66,6 +67,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
    * instance.
    */
   public static class AggregatorCombineFn<K, V> extends CombineFn<K, V> {
+    private static final long serialVersionUID = 1L;
+    
     private final Aggregator<V> aggregator;
     
     public AggregatorCombineFn(Aggregator<V> aggregator) {
@@ -85,6 +88,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   }
   
   private static abstract class TupleAggregator<T> implements Aggregator<T> {
+    private static final long serialVersionUID = 1L;
+    
     private final List<Aggregator<Object>> aggregators;
     
     public TupleAggregator(Aggregator<?>...aggregators) {
@@ -113,6 +118,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   }
   
   public static class PairAggregator<V1, V2> extends TupleAggregator<Pair<V1, V2>> {
+    private static final long serialVersionUID = 1L;
+    
     public PairAggregator(Aggregator<V1> a1, Aggregator<V2> a2) {
       super(a1, a2);
     }
@@ -129,6 +136,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   }
   
   public static class TripAggregator<A, B, C> extends TupleAggregator<Tuple3<A, B, C>> {
+    private static final long serialVersionUID = 1L;
+    
     public TripAggregator(Aggregator<A> a1, Aggregator<B> a2, Aggregator<C> a3) {
       super(a1, a2, a3);
     }
@@ -146,6 +155,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   }
 
   public static class QuadAggregator<A, B, C, D> extends TupleAggregator<Tuple4<A, B, C, D>> {
+    private static final long serialVersionUID = 1L;
+    
     public QuadAggregator(Aggregator<A> a1, Aggregator<B> a2, Aggregator<C> a3, Aggregator<D> a4) {
       super(a1, a2, a3, a4);
     }
@@ -163,6 +174,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   }
   
   public static class TupleNAggregator extends TupleAggregator<TupleN> {
+    private static final long serialVersionUID = 1L;
+    
     private final int size;
     
     public TupleNAggregator(Aggregator<?>... aggregators) {
@@ -212,7 +225,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   }
 
   public static final <K> CombineFn<K, TupleN> tupleAggregator(AggregatorFactory<?>... factories) {
-    Aggregator[] aggs = new Aggregator[factories.length];
+    Aggregator<?>[] aggs = new Aggregator[factories.length];
     for (int i = 0; i < aggs.length; i++) {
       aggs[i] = factories[i].create();
     }
@@ -328,6 +341,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   }
   
   public static class SumLongs implements Aggregator<Long> {
+    private static final long serialVersionUID = 1L;
+    
     private long sum = 0;
     
     @Override
@@ -350,6 +365,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   };
   
   public static class SumInts implements Aggregator<Integer> {
+    private static final long serialVersionUID = 1L;
+    
     private int sum = 0;
     
     @Override
@@ -372,6 +389,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   };
   
   public static class SumFloats implements Aggregator<Float> {
+    private static final long serialVersionUID = 1L;
+    
     private float sum = 0;
     
     @Override
@@ -394,6 +413,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   };
   
   public static class SumDoubles implements Aggregator<Double> {
+    private static final long serialVersionUID = 1L;
+    
     private double sum = 0;
     
     @Override
@@ -416,6 +437,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   };
   
   public static class SumBigInts implements Aggregator<BigInteger> {
+    private static final long serialVersionUID = 1L;
+    
     private BigInteger sum = BigInteger.ZERO;
     
     @Override
@@ -438,6 +461,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   };
   
   public static class MaxLongs implements Aggregator<Long> {
+    private static final long serialVersionUID = 1L;
+    
     private Long max = null;
     
     @Override
@@ -462,6 +487,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   };
   
   public static class MaxInts implements Aggregator<Integer> {
+    private static final long serialVersionUID = 1L;
+    
     private Integer max = null;
     
     @Override
@@ -486,6 +513,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   };
   
   public static class MaxFloats implements Aggregator<Float> {
+    private static final long serialVersionUID = 1L;
+    
     private Float max = null;
     
     @Override
@@ -510,6 +539,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   };
   
   public static class MaxDoubles implements Aggregator<Double> {
+    private static final long serialVersionUID = 1L;
+    
     private Double max = null;
     
     @Override
@@ -534,6 +565,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   };
   
   public static class MaxBigInts implements Aggregator<BigInteger> {
+    private static final long serialVersionUID = 1L;
+    
     private BigInteger max = null;
     
     @Override
@@ -558,6 +591,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   };
   
   public static class MinLongs implements Aggregator<Long> {
+    private static final long serialVersionUID = 1L;
+    
     private Long min = null;
     
     @Override
@@ -582,6 +617,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   };
   
   public static class MinInts implements Aggregator<Integer> {
+    private static final long serialVersionUID = 1L;
+    
     private Integer min = null;
     
     @Override
@@ -606,6 +643,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   };
   
   public static class MinFloats implements Aggregator<Float> {
+    private static final long serialVersionUID = 1L;
+    
     private Float min = null;
     
     @Override
@@ -630,6 +669,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   };
   
   public static class MinDoubles implements Aggregator<Double> {
+    private static final long serialVersionUID = 1L;
+    
     private Double min = null;
     
     @Override
@@ -654,6 +695,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   };
 
   public static class MinBigInts implements Aggregator<BigInteger> {
+    private static final long serialVersionUID = 1L;
+    
     private BigInteger min = null;
     
     @Override
@@ -678,6 +721,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   };
   
   public static class MaxNAggregator<V extends Comparable<V>> implements Aggregator<V> {
+    private static final long serialVersionUID = 1L;
+    
     private final int arity;
     private transient SortedSet<V> elements;
 
@@ -711,6 +756,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   }
   
   public static class MinNAggregator<V extends Comparable<V>> implements Aggregator<V> {
+    private static final long serialVersionUID = 1L;
+    
     private final int arity;
     private transient SortedSet<V> elements;
     
@@ -744,6 +791,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   }
   
   public static class FirstNAggregator<V> implements Aggregator<V> {
+    private static final long serialVersionUID = 1L;
+    
     private final int arity;
     private final List<V> elements;
     
@@ -771,6 +820,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   }
 
   public static class LastNAggregator<V> implements Aggregator<V> {
+    private static final long serialVersionUID = 1L;
+    
     private final int arity;
     private final LinkedList<V> elements;
     

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/DoFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/DoFn.java b/src/main/java/com/cloudera/crunch/DoFn.java
index 31c9f68..bbd4e8e 100644
--- a/src/main/java/com/cloudera/crunch/DoFn.java
+++ b/src/main/java/com/cloudera/crunch/DoFn.java
@@ -35,7 +35,8 @@ import com.cloudera.crunch.test.TestCounters;
  *
  */
 public abstract class DoFn<S, T> implements Serializable {
-
+  private static final long serialVersionUID = 1L;
+  
   private transient TaskInputOutputContext<?, ?, ?, ?> context;
   private transient Configuration testConf;
   private transient String internalStatus;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/FilterFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/FilterFn.java b/src/main/java/com/cloudera/crunch/FilterFn.java
index a5d88b4..fee4879 100644
--- a/src/main/java/com/cloudera/crunch/FilterFn.java
+++ b/src/main/java/com/cloudera/crunch/FilterFn.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
  *
  */
 public abstract class FilterFn<T> extends DoFn<T, T> {
+  private static final long serialVersionUID = 1L;
   
   /**
    * If true, emit the given record.
@@ -48,6 +49,8 @@ public abstract class FilterFn<T> extends DoFn<T, T> {
   }
   
   public static class AndFn<S> extends FilterFn<S> {
+    private static final long serialVersionUID = 1L;
+    
     private final List<FilterFn<S>> fns;
     
     public AndFn(FilterFn<S>... fns) {
@@ -79,6 +82,8 @@ public abstract class FilterFn<T> extends DoFn<T, T> {
   }
   
   public static class OrFn<S> extends FilterFn<S> {
+    private static final long serialVersionUID = 1L;
+    
     private final List<FilterFn<S>> fns;
     
     public OrFn(FilterFn<S>... fns) {
@@ -110,6 +115,8 @@ public abstract class FilterFn<T> extends DoFn<T, T> {
   }
   
   public static class NotFn<S> extends FilterFn<S> {
+    private static final long serialVersionUID = 1L;
+    
     private final FilterFn<S> base;
     
     public NotFn(FilterFn<S> base) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/MapFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/MapFn.java b/src/main/java/com/cloudera/crunch/MapFn.java
index 8c11fa6..32ab9e6 100644
--- a/src/main/java/com/cloudera/crunch/MapFn.java
+++ b/src/main/java/com/cloudera/crunch/MapFn.java
@@ -21,6 +21,7 @@ package com.cloudera.crunch;
  *
  */
 public abstract class MapFn<S, T> extends DoFn<S, T> {
+  private static final long serialVersionUID = 1L;
   
   /**
    * Maps the given input into an instance of the output type.

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/Tuple3.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/Tuple3.java b/src/main/java/com/cloudera/crunch/Tuple3.java
index 92f8504..57e192b 100644
--- a/src/main/java/com/cloudera/crunch/Tuple3.java
+++ b/src/main/java/com/cloudera/crunch/Tuple3.java
@@ -79,7 +79,7 @@ public class Tuple3<V1, V2, V3> implements Tuple {
       return false;
     if (getClass() != obj.getClass())
       return false;
-    Tuple3 other = (Tuple3) obj;
+    Tuple3<?, ?, ?> other = (Tuple3<?, ?, ?>) obj;
     return (first == other.first || (first != null && first.equals(other.first))) &&
     	(second == other.second || (second != null && second.equals(other.second))) &&
     	(third == other.third || (third != null && third.equals(other.third)));

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/Tuple4.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/Tuple4.java b/src/main/java/com/cloudera/crunch/Tuple4.java
index 390a20f..d1edf9d 100644
--- a/src/main/java/com/cloudera/crunch/Tuple4.java
+++ b/src/main/java/com/cloudera/crunch/Tuple4.java
@@ -88,7 +88,7 @@ public class Tuple4<V1, V2, V3, V4> implements Tuple {
       return false;
     if (getClass() != obj.getClass())
       return false;
-    Tuple4 other = (Tuple4) obj;
+    Tuple4<?, ?, ?, ?> other = (Tuple4<?, ?, ?, ?>) obj;
     return (first == other.first || (first != null && first.equals(other.first))) &&
     	(second == other.second || (second != null && second.equals(other.second))) &&
     	(third == other.third || (third != null && third.equals(other.third))) &&

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/fn/CompositeMapFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/fn/CompositeMapFn.java b/src/main/java/com/cloudera/crunch/fn/CompositeMapFn.java
index f809b25..d0ba32a 100644
--- a/src/main/java/com/cloudera/crunch/fn/CompositeMapFn.java
+++ b/src/main/java/com/cloudera/crunch/fn/CompositeMapFn.java
@@ -20,7 +20,8 @@ import com.cloudera.crunch.Emitter;
 import com.cloudera.crunch.MapFn;
 
 public class CompositeMapFn<R, S, T> extends MapFn<R, T> {
-
+  private static final long serialVersionUID = 1L;
+  
   private final MapFn<R, S> first;
   private final MapFn<S, T> second;
   

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/fn/ExtractKeyFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/fn/ExtractKeyFn.java b/src/main/java/com/cloudera/crunch/fn/ExtractKeyFn.java
index ac78721..d3942d2 100644
--- a/src/main/java/com/cloudera/crunch/fn/ExtractKeyFn.java
+++ b/src/main/java/com/cloudera/crunch/fn/ExtractKeyFn.java
@@ -22,7 +22,8 @@ import com.cloudera.crunch.Pair;
  * is used to convert from a {@code PCollection<V>} to a {@code PTable<K, V>}.
  */
 public class ExtractKeyFn<K, V> extends MapFn<V, Pair<K, V>> {
-
+  private static final long serialVersionUID = 1L;
+  
   private final MapFn<V, K> mapFn;
   
   public ExtractKeyFn(MapFn<V, K> mapFn) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/fn/IdentityFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/fn/IdentityFn.java b/src/main/java/com/cloudera/crunch/fn/IdentityFn.java
index 1008d9d..d10e608 100644
--- a/src/main/java/com/cloudera/crunch/fn/IdentityFn.java
+++ b/src/main/java/com/cloudera/crunch/fn/IdentityFn.java
@@ -17,7 +17,8 @@ package com.cloudera.crunch.fn;
 import com.cloudera.crunch.MapFn;
 
 public class IdentityFn<T> extends MapFn<T, T> {
-
+  private static final long serialVersionUID = 1L;
+  
   private static final IdentityFn<Object> INSTANCE = new IdentityFn<Object>();
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/fn/MapKeysFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/fn/MapKeysFn.java b/src/main/java/com/cloudera/crunch/fn/MapKeysFn.java
index 6ce374f..eaf0ed0 100644
--- a/src/main/java/com/cloudera/crunch/fn/MapKeysFn.java
+++ b/src/main/java/com/cloudera/crunch/fn/MapKeysFn.java
@@ -19,6 +19,8 @@ import com.cloudera.crunch.Emitter;
 import com.cloudera.crunch.Pair;
 
 public abstract class MapKeysFn<K1, K2, V> extends DoFn<Pair<K1, V>, Pair<K2, V>> {
+  private static final long serialVersionUID = 1L;
+  
   @Override
   public void process(Pair<K1, V> input, Emitter<Pair<K2, V>> emitter) {
     emitter.emit(Pair.of(map(input.first()), input.second()));

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/fn/MapValuesFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/fn/MapValuesFn.java b/src/main/java/com/cloudera/crunch/fn/MapValuesFn.java
index 7c1a8b6..df75579 100644
--- a/src/main/java/com/cloudera/crunch/fn/MapValuesFn.java
+++ b/src/main/java/com/cloudera/crunch/fn/MapValuesFn.java
@@ -19,6 +19,8 @@ import com.cloudera.crunch.Emitter;
 import com.cloudera.crunch.Pair;
 
 public abstract class MapValuesFn<K, V1, V2> extends DoFn<Pair<K, V1>, Pair<K, V2>> {
+  private static final long serialVersionUID = 1L;
+  
   @Override
   public void process(Pair<K, V1> input, Emitter<Pair<K, V2>> emitter) {
     emitter.emit(Pair.of(input.first(), map(input.second())));

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/fn/PairMapFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/fn/PairMapFn.java b/src/main/java/com/cloudera/crunch/fn/PairMapFn.java
index 0e64da1..93491dc 100644
--- a/src/main/java/com/cloudera/crunch/fn/PairMapFn.java
+++ b/src/main/java/com/cloudera/crunch/fn/PairMapFn.java
@@ -21,6 +21,8 @@ import com.cloudera.crunch.MapFn;
 import com.cloudera.crunch.Pair;
 
 public class PairMapFn<K, V, S, T> extends MapFn<Pair<K, V>, Pair<S, T>> {
+  private static final long serialVersionUID = 1L;
+  
   private MapFn<K, S> keys;
   private MapFn<V, T> values;
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/io/text/BZip2TextInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/text/BZip2TextInputFormat.java b/src/main/java/com/cloudera/crunch/io/text/BZip2TextInputFormat.java
index 55250dc..3c8ecb7 100644
--- a/src/main/java/com/cloudera/crunch/io/text/BZip2TextInputFormat.java
+++ b/src/main/java/com/cloudera/crunch/io/text/BZip2TextInputFormat.java
@@ -35,210 +35,208 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 
-@SuppressWarnings("unchecked")
 public class BZip2TextInputFormat extends FileInputFormat<LongWritable, Text> {
+  /**
+   * Treats keys as offset in file and value as line. Since the input file is
+   * compressed, the offset for a particular line is not well-defined. This
+   * implementation returns the starting position of a compressed block as the
+   * key for every line in that block.
+   */
+
+  private static class BZip2LineRecordReader extends RecordReader<LongWritable, Text> {
+
+    private long start;
+
+    private long end;
+
+    private long pos;
+
+    private CBZip2InputStream in;
+
+    private ByteArrayOutputStream buffer = new ByteArrayOutputStream(256);
+
+    // flag to indicate if previous character read was Carriage Return ('\r')
+    // and the next character was not Line Feed ('\n')
+    private boolean CRFollowedByNonLF = false;
+
+    // in the case where a Carriage Return ('\r') was not followed by a 
+    // Line Feed ('\n'), this variable will hold that non Line Feed character
+    // that was read from the underlying stream.
+    private byte nonLFChar;
+
 
     /**
-     * Treats keys as offset in file and value as line. Since the input file is
-     * compressed, the offset for a particular line is not well-defined. This
-     * implementation returns the starting position of a compressed block as the
-     * key for every line in that block.
+     * Provide a bridge to get the bytes from the ByteArrayOutputStream without
+     * creating a new byte array.
      */
+    private static class TextStuffer extends OutputStream {
+      public Text target;
+
+      @Override
+      public void write(int b) {
+        throw new UnsupportedOperationException("write(byte) not supported");
+      }
+
+      @Override
+      public void write(byte[] data, int offset, int len) throws IOException {
+        target.clear();
+        target.set(data, offset, len);
+      }
+    }
 
-    private static class BZip2LineRecordReader extends RecordReader {
-
-        private long start;
-
-        private long end;
-
-        private long pos;
-
-        private CBZip2InputStream in;
-
-        private ByteArrayOutputStream buffer = new ByteArrayOutputStream(256);
-        
-        // flag to indicate if previous character read was Carriage Return ('\r')
-        // and the next character was not Line Feed ('\n')
-        private boolean CRFollowedByNonLF = false;
-        
-        // in the case where a Carriage Return ('\r') was not followed by a 
-        // Line Feed ('\n'), this variable will hold that non Line Feed character
-        // that was read from the underlying stream.
-        private byte nonLFChar;
-        
-        
-        /**
-         * Provide a bridge to get the bytes from the ByteArrayOutputStream without
-         * creating a new byte array.
-         */
-        private static class TextStuffer extends OutputStream {
-            public Text target;
-
-            @Override
-            public void write(int b) {
-                throw new UnsupportedOperationException("write(byte) not supported");
-            }
-
-            @Override
-            public void write(byte[] data, int offset, int len) throws IOException {
-                target.clear();
-                target.set(data, offset, len);
-            }
-        }
+    private TextStuffer bridge = new TextStuffer();
+
+    private LongWritable key = new LongWritable();
+    private Text value = new Text();
+
+    public BZip2LineRecordReader(Configuration job, FileSplit split) throws IOException {
+      start = split.getStart();
+      end = start + split.getLength();
+      final Path file = split.getPath();
+
+      // open the file and seek to the start of the split
+      FileSystem fs = file.getFileSystem(job);
+      FSDataInputStream fileIn = fs.open(split.getPath());
+      fileIn.seek(start);
+
+      in = new CBZip2InputStream(fileIn, 9, end);
+      if (start != 0) {
+        // skip first line and re-establish "start".
+        // LineRecordReader.readLine(this.in, null);
+        readLine(this.in, null);
+        start = in.getPos();
+      }
+      pos = in.getPos();
+    }
 
-        private TextStuffer bridge = new TextStuffer();
-
-        private LongWritable key = new LongWritable();
-        private Text value = new Text();
-
-        public BZip2LineRecordReader(Configuration job, FileSplit split) throws IOException {
-            start = split.getStart();
-            end = start + split.getLength();
-            final Path file = split.getPath();
-
-            // open the file and seek to the start of the split
-            FileSystem fs = file.getFileSystem(job);
-            FSDataInputStream fileIn = fs.open(split.getPath());
-            fileIn.seek(start);
-
-            in = new CBZip2InputStream(fileIn, 9, end);
-            if (start != 0) {
-                // skip first line and re-establish "start".
-                // LineRecordReader.readLine(this.in, null);
-                readLine(this.in, null);
-                start = in.getPos();
-            }
-            pos = in.getPos();
+    /*
+     * LineRecordReader.readLine() is depricated in HAdoop 0.17. So it is added here
+     * locally.
+     */
+    private long readLine(InputStream in, 
+        OutputStream out) throws IOException {
+      long bytes = 0;
+      while (true) {
+        int b = -1;
+        if(CRFollowedByNonLF) {
+          // In the previous call, a Carriage Return ('\r') was followed
+          // by a non Line Feed ('\n') character - in that call we would
+          // have not returned the non Line Feed character but would have
+          // read it from the stream - lets use that already read character
+          // now
+          b = nonLFChar;
+          CRFollowedByNonLF = false;
+        } else {
+          b = in.read();
         }
-
-        /*
-         * LineRecordReader.readLine() is depricated in HAdoop 0.17. So it is added here
-         * locally.
-         */
-        private long readLine(InputStream in, 
-                OutputStream out) throws IOException {
-            long bytes = 0;
-            while (true) {
-                int b = -1;
-                if(CRFollowedByNonLF) {
-                    // In the previous call, a Carriage Return ('\r') was followed
-                    // by a non Line Feed ('\n') character - in that call we would
-                    // have not returned the non Line Feed character but would have
-                    // read it from the stream - lets use that already read character
-                    // now
-                    b = nonLFChar;
-                    CRFollowedByNonLF = false;
-                } else {
-                    b = in.read();
-                }
-                if (b == -1) {
-                    break;
-                }
-                bytes += 1;
-
-                byte c = (byte)b;
-                if (c == '\n') {
-                    break;
-                }
-
-                if (c == '\r') {
-                    byte nextC = (byte)in.read();
-                    if (nextC != '\n') {
-                        CRFollowedByNonLF = true;
-                        nonLFChar = nextC;
-                    } else {
-                        bytes += 1;
-                    }
-                    break;
-                }
-
-                if (out != null) {
-                    out.write(c);
-                }
-            }
-            return bytes;
+        if (b == -1) {
+          break;
         }
+        bytes += 1;
 
-        /** Read a line. */
-        public  boolean next(LongWritable key, Text value)
-        throws IOException {
-            if (pos > end)
-                return false;
-
-            key.set(pos); // key is position
-            buffer.reset();
-            // long bytesRead = LineRecordReader.readLine(in, buffer); 
-            long bytesRead = readLine(in, buffer);
-            if (bytesRead == 0) {
-                return false;
-            }
-            pos = in.getPos();
-            // if we have read ahead because we encountered a carriage return
-            // char followed by a non line feed char, decrement the pos
-            if(CRFollowedByNonLF) {
-                pos--;
-            }
-
-            bridge.target = value;
-            buffer.writeTo(bridge);
-            return true;
+        byte c = (byte)b;
+        if (c == '\n') {
+          break;
         }
 
-        /**
-         * Get the progress within the split
-         */
-        @Override
-        public float getProgress() {
-            if (start == end) {
-                return 0.0f;
-            } else {
-                return Math.min(1.0f, (pos - start) / (float) (end - start));
-            }
+        if (c == '\r') {
+          byte nextC = (byte)in.read();
+          if (nextC != '\n') {
+            CRFollowedByNonLF = true;
+            nonLFChar = nextC;
+          } else {
+            bytes += 1;
+          }
+          break;
         }
 
-        @Override
-        public  void close() throws IOException {
-            in.close();
+        if (out != null) {
+          out.write(c);
         }
+      }
+      return bytes;
+    }
 
-        @Override
-        public LongWritable getCurrentKey() throws IOException,
-        InterruptedException {
-            return key;
-        }
+    /** Read a line. */
+    public  boolean next(LongWritable key, Text value)
+        throws IOException {
+      if (pos > end)
+        return false;
+
+      key.set(pos); // key is position
+      buffer.reset();
+      // long bytesRead = LineRecordReader.readLine(in, buffer); 
+      long bytesRead = readLine(in, buffer);
+      if (bytesRead == 0) {
+        return false;
+      }
+      pos = in.getPos();
+      // if we have read ahead because we encountered a carriage return
+      // char followed by a non line feed char, decrement the pos
+      if(CRFollowedByNonLF) {
+        pos--;
+      }
+
+      bridge.target = value;
+      buffer.writeTo(bridge);
+      return true;
+    }
 
-        @Override
-        public Text getCurrentValue() throws IOException, InterruptedException {
-            return value;
-        }
+    /**
+     * Get the progress within the split
+     */
+    @Override
+    public float getProgress() {
+      if (start == end) {
+        return 0.0f;
+      } else {
+        return Math.min(1.0f, (pos - start) / (float) (end - start));
+      }
+    }
 
-        @Override
-        public void initialize(InputSplit split, TaskAttemptContext context)
-        throws IOException, InterruptedException {
-            // no op        
-        }
+    @Override
+    public  void close() throws IOException {
+      in.close();
+    }
 
-        @Override
-        public boolean nextKeyValue() throws IOException, InterruptedException {
-            return next(key, value);
-        }
+    @Override
+    public LongWritable getCurrentKey() throws IOException,
+    InterruptedException {
+      return key;
+    }
 
+    @Override
+    public Text getCurrentValue() throws IOException, InterruptedException {
+      return value;
     }
 
     @Override
-    protected boolean isSplitable(JobContext context, Path file)  {
-      return true;  
+    public void initialize(InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      // no op        
     }
-    
+
     @Override
-    public RecordReader createRecordReader(InputSplit split,
-            TaskAttemptContext context) {
-        try {
-          return new BZip2LineRecordReader(context.getConfiguration(), 
-                  (FileSplit) split);
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      return next(key, value);
+    }
+
+  }
+
+  @Override
+  protected boolean isSplitable(JobContext context, Path file)  {
+    return true;  
+  }
+
+  @Override
+  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
+      TaskAttemptContext context) {
+    try {
+      return new BZip2LineRecordReader(context.getConfiguration(), 
+          (FileSplit) split);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     }
+  }
 
 }
\ No newline at end of file


Mime
View raw message