incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject git commit: CRUNCH-90: Initialize deep copier with config
Date Mon, 08 Oct 2012 15:15:01 GMT
Updated Branches:
  refs/heads/master eda3c776b -> 7fef772cf


CRUNCH-90: Initialize deep copier with config

Update the deepCopy logic to use the current ReflectDataFactory instance for
the run. This is done by passing the job's Configuration object to PType#initialize.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/7fef772c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/7fef772c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/7fef772c

Branch: refs/heads/master
Commit: 7fef772cf379a640697b175ad2a485439cc19f3a
Parents: eda3c77
Author: Josh Wills <jwills@apache.org>
Authored: Sun Oct 7 17:56:45 2012 -0700
Committer: Gabriel Reid <greid@apache.org>
Committed: Mon Oct 8 13:15:07 2012 +0200

----------------------------------------------------------------------
 .../crunch/impl/mr/emit/IntermediateEmitter.java   |   11 +-
 .../java/org/apache/crunch/impl/mr/run/RTNode.java |    2 +-
 .../main/java/org/apache/crunch/lib/Aggregate.java |  147 ++++++++-------
 .../main/java/org/apache/crunch/lib/PTables.java   |   23 +--
 .../apache/crunch/lib/join/FullOuterJoinFn.java    |    9 +-
 .../org/apache/crunch/lib/join/InnerJoinFn.java    |    9 +-
 .../java/org/apache/crunch/lib/join/JoinFn.java    |   51 ++---
 .../apache/crunch/lib/join/LeftOuterJoinFn.java    |    9 +-
 .../apache/crunch/lib/join/RightOuterJoinFn.java   |    9 +-
 .../apache/crunch/types/CollectionDeepCopier.java  |   10 +-
 .../java/org/apache/crunch/types/DeepCopier.java   |   26 ++-
 .../org/apache/crunch/types/MapDeepCopier.java     |   22 ++-
 .../main/java/org/apache/crunch/types/PType.java   |   37 ++--
 .../org/apache/crunch/types/TupleDeepCopier.java   |   16 +-
 .../apache/crunch/types/avro/AvroDeepCopier.java   |   50 +++---
 .../crunch/types/avro/AvroGroupedTableType.java    |    4 +-
 .../apache/crunch/types/avro/AvroOutputFormat.java |    2 +-
 .../apache/crunch/types/avro/AvroTableType.java    |   11 +-
 .../org/apache/crunch/types/avro/AvroType.java     |   19 ++-
 .../crunch/types/avro/ReflectDataFactory.java      |    4 +-
 .../crunch/types/avro/SafeAvroSerialization.java   |    3 +-
 .../crunch/types/writable/WritableDeepCopier.java  |   13 +-
 .../types/writable/WritableGroupedTableType.java   |    8 +-
 .../crunch/types/writable/WritableTableType.java   |   10 +-
 .../apache/crunch/types/writable/WritableType.java |   12 +-
 .../impl/mr/emit/IntermediateEmitterTest.java      |    7 +-
 .../org/apache/crunch/lib/join/JoinFnTestBase.java |    2 +
 .../crunch/types/CollectionDeepCopierTest.java     |    8 +-
 .../org/apache/crunch/types/MapDeepCopierTest.java |    5 +-
 .../apache/crunch/types/TupleDeepCopierTest.java   |    6 +-
 .../crunch/types/avro/AvroDeepCopierTest.java      |   14 +-
 .../types/avro/AvroGroupedTableTypeTest.java       |    6 +-
 .../crunch/types/avro/AvroTableTypeTest.java       |    5 +-
 .../org/apache/crunch/types/avro/AvroTypeTest.java |   59 ++++---
 .../types/writable/WritableDeepCopierTest.java     |   17 +-
 .../writable/WritableGroupedTableTypeTest.java     |    3 +-
 .../types/writable/WritableTableTypeTest.java      |    6 +-
 .../crunch/types/writable/WritableTypeTest.java    |   20 +-
 38 files changed, 374 insertions(+), 301 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java b/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java
index d609489..b6df98b 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java
@@ -23,25 +23,28 @@ import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.impl.mr.run.RTNode;
 import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
 
 import com.google.common.collect.ImmutableList;
 
 /**
- * An {@link Emitter} implementation that links the output of one {@link DoFn}
- * to the input of another {@code DoFn}.
+ * An {@link Emitter} implementation that links the output of one {@link DoFn} to the input of
+ * another {@code DoFn}.
  * 
  */
 public class IntermediateEmitter implements Emitter<Object> {
 
   private final List<RTNode> children;
+  private final Configuration conf;
   private final PType<Object> outputPType;
   private final boolean needDetachedValues;
 
-  public IntermediateEmitter(PType<Object> outputPType, List<RTNode> children) {
+  public IntermediateEmitter(PType<Object> outputPType, List<RTNode> children, Configuration conf) {
     this.outputPType = outputPType;
     this.children = ImmutableList.copyOf(children);
+    this.conf = conf;
 
-    outputPType.initialize();
+    outputPType.initialize(conf);
     needDetachedValues = this.children.size() > 1;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
index 4df989b..1f5124c 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
@@ -74,7 +74,7 @@ public class RTNode implements Serializable {
         this.emitter = new OutputEmitter(outputConverter, ctxt.getContext());
       }
     } else if (!children.isEmpty()) {
-      this.emitter = new IntermediateEmitter(outputPType, children);
+      this.emitter = new IntermediateEmitter(outputPType, children, ctxt.getContext().getConfiguration());
     } else {
       throw new CrunchRuntimeException("Invalid RTNode config: no emitter for: " + nodeName);
     }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java b/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java
index dc3de7c..f28cca4 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java
@@ -40,15 +40,14 @@ import org.apache.crunch.types.PTypeFamily;
 import com.google.common.collect.Lists;
 
 /**
- * Methods for performing various types of aggregations over {@link PCollection}
- * instances.
- *
+ * Methods for performing various types of aggregations over {@link PCollection} instances.
+ * 
  */
 public class Aggregate {
 
   /**
-   * Returns a {@code PTable} that contains the unique elements of this
-   * collection mapped to a count of their occurrences.
+   * Returns a {@code PTable} that contains the unique elements of this collection mapped to a count
+   * of their occurrences.
    */
   public static <S> PTable<S, Long> count(PCollection<S> collect) {
     PTypeFamily tf = collect.getTypeFamily();
@@ -56,24 +55,26 @@ public class Aggregate {
       public Pair<S, Long> map(S input) {
         return Pair.of(input, 1L);
       }
-    }, tf.tableOf(collect.getPType(), tf.longs())).groupByKey().combineValues(CombineFn.<S> SUM_LONGS());
+    }, tf.tableOf(collect.getPType(), tf.longs())).groupByKey()
+        .combineValues(CombineFn.<S> SUM_LONGS());
   }
 
   /**
    * Returns the number of elements in the provided PCollection.
-   *
+   * 
    * @param collect The PCollection whose elements should be counted.
    * @param <S> The type of the PCollection.
    * @return A {@code PObject} containing the number of elements in the {@code PCollection}.
    */
   public static <S> PObject<Long> length(PCollection<S> collect) {
     PTypeFamily tf = collect.getTypeFamily();
-    PTable<Integer, Long> countTable = collect.parallelDo("Aggregate.count",
-        new MapFn<S, Pair<Integer, Long>>() {
+    PTable<Integer, Long> countTable = collect
+        .parallelDo("Aggregate.count", new MapFn<S, Pair<Integer, Long>>() {
           public Pair<Integer, Long> map(S input) {
             return Pair.of(1, 1L);
           }
-        }, tf.tableOf(tf.ints(), tf.longs())).groupByKey().combineValues(CombineFn.<Integer> SUM_LONGS());
+        }, tf.tableOf(tf.ints(), tf.longs())).groupByKey()
+        .combineValues(CombineFn.<Integer> SUM_LONGS());
     PCollection<Long> count = countTable.values();
     return new FirstElementPObject<Long>(count);
   }
@@ -132,7 +133,8 @@ public class Aggregate {
     }
 
     @Override
-    public void process(Pair<Integer, Iterable<Pair<K, V>>> input, Emitter<Pair<Integer, Pair<K, V>>> emitter) {
+    public void process(Pair<Integer, Iterable<Pair<K, V>>> input,
+        Emitter<Pair<Integer, Pair<K, V>>> emitter) {
       Comparator<Pair<K, V>> cmp = new PairValueComparator<K, V>(maximize);
       PriorityQueue<Pair<K, V>> queue = new PriorityQueue<Pair<K, V>>(limit, cmp);
       for (Pair<K, V> pair : input.second()) {
@@ -155,8 +157,8 @@ public class Aggregate {
     PTableType<K, V> base = ptable.getPTableType();
     PType<Pair<K, V>> pairType = ptf.pairs(base.getKeyType(), base.getValueType());
     PTableType<Integer, Pair<K, V>> inter = ptf.tableOf(ptf.ints(), pairType);
-    return ptable.parallelDo("top" + limit + "map", new TopKFn<K, V>(limit, maximize), inter).groupByKey(1)
-        .combineValues(new TopKCombineFn<K, V>(limit, maximize))
+    return ptable.parallelDo("top" + limit + "map", new TopKFn<K, V>(limit, maximize), inter)
+        .groupByKey(1).combineValues(new TopKCombineFn<K, V>(limit, maximize))
         .parallelDo("top" + limit + "reduce", new DoFn<Pair<Integer, Pair<K, V>>, Pair<K, V>>() {
           public void process(Pair<Integer, Pair<K, V>> input, Emitter<Pair<K, V>> emitter) {
             emitter.emit(input.second());
@@ -174,31 +176,33 @@ public class Aggregate {
           + collect.getPType().getTypeClass());
     }
     PTypeFamily tf = collect.getTypeFamily();
-    PCollection<S> maxCollect = PTables.values(collect.parallelDo("max", new DoFn<S, Pair<Boolean, S>>() {
-      private transient S max = null;
+    PCollection<S> maxCollect = PTables.values(collect
+        .parallelDo("max", new DoFn<S, Pair<Boolean, S>>() {
+          private transient S max = null;
 
-      public void process(S input, Emitter<Pair<Boolean, S>> emitter) {
-        if (max == null || ((Comparable<S>) max).compareTo(input) < 0) {
-          max = input;
-        }
-      }
+          public void process(S input, Emitter<Pair<Boolean, S>> emitter) {
+            if (max == null || ((Comparable<S>) max).compareTo(input) < 0) {
+              max = input;
+            }
+          }
 
-      public void cleanup(Emitter<Pair<Boolean, S>> emitter) {
-        if (max != null) {
-          emitter.emit(Pair.of(true, max));
-        }
-      }
-    }, tf.tableOf(tf.booleans(), collect.getPType())).groupByKey(1).combineValues(new CombineFn<Boolean, S>() {
-      public void process(Pair<Boolean, Iterable<S>> input, Emitter<Pair<Boolean, S>> emitter) {
-        S max = null;
-        for (S v : input.second()) {
-          if (max == null || ((Comparable<S>) max).compareTo(v) < 0) {
-            max = v;
+          public void cleanup(Emitter<Pair<Boolean, S>> emitter) {
+            if (max != null) {
+              emitter.emit(Pair.of(true, max));
+            }
           }
-        }
-        emitter.emit(Pair.of(input.first(), max));
-      }
-    }));
+        }, tf.tableOf(tf.booleans(), collect.getPType())).groupByKey(1)
+        .combineValues(new CombineFn<Boolean, S>() {
+          public void process(Pair<Boolean, Iterable<S>> input, Emitter<Pair<Boolean, S>> emitter) {
+            S max = null;
+            for (S v : input.second()) {
+              if (max == null || ((Comparable<S>) max).compareTo(v) < 0) {
+                max = v;
+              }
+            }
+            emitter.emit(Pair.of(input.first(), max));
+          }
+        }));
     return new FirstElementPObject<S>(maxCollect);
   }
 
@@ -212,51 +216,54 @@ public class Aggregate {
           + collect.getPType().getTypeClass());
     }
     PTypeFamily tf = collect.getTypeFamily();
-    PCollection<S> minCollect = PTables.values(collect.parallelDo("min", new DoFn<S, Pair<Boolean, S>>() {
-      private transient S min = null;
+    PCollection<S> minCollect = PTables.values(collect
+        .parallelDo("min", new DoFn<S, Pair<Boolean, S>>() {
+          private transient S min = null;
 
-      public void process(S input, Emitter<Pair<Boolean, S>> emitter) {
-        if (min == null || ((Comparable<S>) min).compareTo(input) > 0) {
-          min = input;
-        }
-      }
+          public void process(S input, Emitter<Pair<Boolean, S>> emitter) {
+            if (min == null || ((Comparable<S>) min).compareTo(input) > 0) {
+              min = input;
+            }
+          }
 
-      public void cleanup(Emitter<Pair<Boolean, S>> emitter) {
-        if (min != null) {
-          emitter.emit(Pair.of(false, min));
-        }
-      }
-    }, tf.tableOf(tf.booleans(), collect.getPType())).groupByKey().combineValues(new CombineFn<Boolean, S>() {
-      public void process(Pair<Boolean, Iterable<S>> input, Emitter<Pair<Boolean, S>> emitter) {
-        S min = null;
-        for (S v : input.second()) {
-          if (min == null || ((Comparable<S>) min).compareTo(v) > 0) {
-            min = v;
+          public void cleanup(Emitter<Pair<Boolean, S>> emitter) {
+            if (min != null) {
+              emitter.emit(Pair.of(false, min));
+            }
           }
-        }
-        emitter.emit(Pair.of(input.first(), min));
-      }
-    }));
+        }, tf.tableOf(tf.booleans(), collect.getPType())).groupByKey()
+        .combineValues(new CombineFn<Boolean, S>() {
+          public void process(Pair<Boolean, Iterable<S>> input, Emitter<Pair<Boolean, S>> emitter) {
+            S min = null;
+            for (S v : input.second()) {
+              if (min == null || ((Comparable<S>) min).compareTo(v) > 0) {
+                min = v;
+              }
+            }
+            emitter.emit(Pair.of(input.first(), min));
+          }
+        }));
     return new FirstElementPObject<S>(minCollect);
   }
 
   public static <K, V> PTable<K, Collection<V>> collectValues(PTable<K, V> collect) {
     PTypeFamily tf = collect.getTypeFamily();
     final PType<V> valueType = collect.getValueType();
-    return collect.groupByKey().parallelDo("collect", new MapValuesFn<K, Iterable<V>, Collection<V>>() {
+    return collect.groupByKey().parallelDo("collect",
+        new MapValuesFn<K, Iterable<V>, Collection<V>>() {
 
-      @Override
-      public void initialize() {
-        valueType.initialize();
-      }
+          @Override
+          public void initialize() {
+            valueType.initialize(getConfiguration());
+          }
 
-      public Collection<V> map(Iterable<V> values) {
-        List<V> collected = Lists.newArrayList();
-        for (V value : values) {
-          collected.add(valueType.getDetachedValue(value));
-        }
-        return collected;
-      }
-    }, tf.tableOf(collect.getKeyType(), tf.collections(collect.getValueType())));
+          public Collection<V> map(Iterable<V> values) {
+            List<V> collected = Lists.newArrayList();
+            for (V value : values) {
+              collected.add(valueType.getDetachedValue(value));
+            }
+            return collected;
+          }
+        }, tf.tableOf(collect.getKeyType(), tf.collections(collect.getValueType())));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/lib/PTables.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/PTables.java b/crunch/src/main/java/org/apache/crunch/lib/PTables.java
index 71c04ff..e788656 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/PTables.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/PTables.java
@@ -58,31 +58,27 @@ public class PTables {
   /**
    * Create a detached value for a table {@link Pair}.
    * 
-   * @param tableType
-   *          The table type
-   * @param value
-   *          The value from which a detached value is to be created
+   * @param tableType The table type
+   * @param value The value from which a detached value is to be created
    * @return The detached value
    * @see PType#getDetachedValue(Object)
    */
   public static <K, V> Pair<K, V> getDetachedValue(PTableType<K, V> tableType, Pair<K, V> value) {
-    return Pair.of(tableType.getKeyType().getDetachedValue(value.first()),
-        tableType.getValueType().getDetachedValue(value.second()));
+    return Pair.of(tableType.getKeyType().getDetachedValue(value.first()), tableType.getValueType()
+        .getDetachedValue(value.second()));
   }
 
   /**
    * Created a detached value for a {@link PGroupedTable} value.
    * 
    * 
-   * @param groupedTableType
-   *          The grouped table type
-   * @param value
-   *          The value from which a detached value is to be created
+   * @param groupedTableType The grouped table type
+   * @param value The value from which a detached value is to be created
    * @return The detached value
    * @see PType#getDetachedValue(Object)
    */
-  public static <K, V> Pair<K, Iterable<V>> getGroupedDetachedValue(PGroupedTableType<K, V> groupedTableType,
-      Pair<K, Iterable<V>> value) {
+  public static <K, V> Pair<K, Iterable<V>> getGroupedDetachedValue(
+      PGroupedTableType<K, V> groupedTableType, Pair<K, Iterable<V>> value) {
 
     PTableType<K, V> tableType = groupedTableType.getTableType();
     List<V> detachedIterable = Lists.newArrayList();
@@ -90,6 +86,7 @@ public class PTables {
     for (V v : value.second()) {
       detachedIterable.add(valueType.getDetachedValue(v));
     }
-    return Pair.of(tableType.getKeyType().getDetachedValue(value.first()), (Iterable<V>) detachedIterable);
+    return Pair.of(tableType.getKeyType().getDetachedValue(value.first()),
+        (Iterable<V>) detachedIterable);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
index 0ceb382..834396a 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
@@ -28,12 +28,9 @@ import com.google.common.collect.Lists;
 /**
  * Used to perform the last step of an full outer join.
  * 
- * @param <K>
- *          Type of the keys.
- * @param <U>
- *          Type of the first {@link org.apache.crunch.PTable}'s values
- * @param <V>
- *          Type of the second {@link org.apache.crunch.PTable}'s values
+ * @param <K> Type of the keys.
+ * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
+ * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
  */
 public class FullOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
index 5275c95..a3d30d2 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
@@ -28,12 +28,9 @@ import com.google.common.collect.Lists;
 /**
  * Used to perform the last step of an inner join.
  * 
- * @param <K>
- *          Type of the keys.
- * @param <U>
- *          Type of the first {@link org.apache.crunch.PTable}'s values
- * @param <V>
- *          Type of the second {@link org.apache.crunch.PTable}'s values
+ * @param <K> Type of the keys.
+ * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
+ * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
  */
 public class InnerJoinFn<K, U, V> extends JoinFn<K, U, V> {
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java
index dab6c34..99aea5a 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java
@@ -25,26 +25,22 @@ import org.apache.crunch.types.PType;
 /**
  * Represents a {@link org.apache.crunch.DoFn} for performing joins.
  * 
- * @param <K>
- *          Type of the keys.
- * @param <U>
- *          Type of the first {@link org.apache.crunch.PTable}'s values
- * @param <V>
- *          Type of the second {@link org.apache.crunch.PTable}'s values
+ * @param <K> Type of the keys.
+ * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
+ * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
  */
-public abstract class JoinFn<K, U, V> extends DoFn<Pair<Pair<K, Integer>, Iterable<Pair<U, V>>>, Pair<K, Pair<U, V>>> {
+public abstract class JoinFn<K, U, V> extends
+    DoFn<Pair<Pair<K, Integer>, Iterable<Pair<U, V>>>, Pair<K, Pair<U, V>>> {
 
   protected PType<K> keyType;
   protected PType<U> leftValueType;
 
   /**
-   * Instantiate with the PType of the value of the left side of the join (used
-   * for creating deep copies of values).
+   * Instantiate with the PType of the value of the left side of the join (used for creating deep
+   * copies of values).
    * 
-   * @param keyType
-   *          The PType of the value used as the key of the join
-   * @param leftValueType
-   *          The PType of the value type of the left side of the join
+   * @param keyType The PType of the value used as the key of the join
+   * @param leftValueType The PType of the value type of the left side of the join
    */
   public JoinFn(PType<K> keyType, PType<U> leftValueType) {
     this.keyType = keyType;
@@ -53,8 +49,8 @@ public abstract class JoinFn<K, U, V> extends DoFn<Pair<Pair<K, Integer>, Iterab
 
   @Override
   public void initialize() {
-    this.keyType.initialize();
-    this.leftValueType.initialize();
+    this.keyType.initialize(getConfiguration());
+    this.leftValueType.initialize(getConfiguration());
   }
 
   /** @return The name of this join type (e.g. innerJoin, leftOuterJoin). */
@@ -63,28 +59,23 @@ public abstract class JoinFn<K, U, V> extends DoFn<Pair<Pair<K, Integer>, Iterab
   /**
    * Performs the actual joining.
    * 
-   * @param key
-   *          The key for this grouping of values.
-   * @param id
-   *          The side that this group of values is from (0 -> left, 1 ->
-   *          right).
-   * @param pairs
-   *          The group of values associated with this key and id pair.
-   * @param emitter
-   *          The emitter to send the output to.
+   * @param key The key for this grouping of values.
+   * @param id The side that this group of values is from (0 -> left, 1 -> right).
+   * @param pairs The group of values associated with this key and id pair.
+   * @param emitter The emitter to send the output to.
    */
-  public abstract void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K, Pair<U, V>>> emitter);
+  public abstract void join(K key, int id, Iterable<Pair<U, V>> pairs,
+      Emitter<Pair<K, Pair<U, V>>> emitter);
 
   /**
    * Split up the input record to make coding a bit more manageable.
    * 
-   * @param input
-   *          The input record.
-   * @param emitter
-   *          The emitter to send the output to.
+   * @param input The input record.
+   * @param emitter The emitter to send the output to.
    */
   @Override
-  public void process(Pair<Pair<K, Integer>, Iterable<Pair<U, V>>> input, Emitter<Pair<K, Pair<U, V>>> emitter) {
+  public void process(Pair<Pair<K, Integer>, Iterable<Pair<U, V>>> input,
+      Emitter<Pair<K, Pair<U, V>>> emitter) {
     join(input.first().first(), input.first().second(), input.second(), emitter);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
index 116353a..18288a4 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
@@ -28,12 +28,9 @@ import com.google.common.collect.Lists;
 /**
  * Used to perform the last step of an left outer join.
  * 
- * @param <K>
- *          Type of the keys.
- * @param <U>
- *          Type of the first {@link org.apache.crunch.PTable}'s values
- * @param <V>
- *          Type of the second {@link org.apache.crunch.PTable}'s values
+ * @param <K> Type of the keys.
+ * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
+ * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
  */
 public class LeftOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
index 51b74cc..2789d40 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
@@ -28,12 +28,9 @@ import com.google.common.collect.Lists;
 /**
  * Used to perform the last step of an right outer join.
  * 
- * @param <K>
- *          Type of the keys.
- * @param <U>
- *          Type of the first {@link org.apache.crunch.PTable}'s values
- * @param <V>
- *          Type of the second {@link org.apache.crunch.PTable}'s values
+ * @param <K> Type of the keys.
+ * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
+ * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
  */
 public class RightOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java
index 2216ef7..db62e85 100644
--- a/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java
+++ b/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java
@@ -20,13 +20,14 @@ package org.apache.crunch.types;
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
+
 import com.google.common.collect.Lists;
 
 /**
  * Performs deep copies (based on underlying PType deep copying) of Collections.
  * 
- * @param <T>
- *          The type of Tuple implementation being copied
+ * @param <T> The type of Tuple implementation being copied
  */
 public class CollectionDeepCopier<T> implements DeepCopier<Collection<T>> {
 
@@ -37,6 +38,11 @@ public class CollectionDeepCopier<T> implements DeepCopier<Collection<T>> {
   }
 
   @Override
+  public void initialize(Configuration conf) {
+    this.elementType.initialize(conf);
+  }
+
+  @Override
   public Collection<T> deepCopy(Collection<T> source) {
     List<T> copiedCollection = Lists.newArrayListWithCapacity(source.size());
     for (T value : source) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java
index a96e7bf..f146e86 100644
--- a/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java
+++ b/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java
@@ -19,30 +19,42 @@ package org.apache.crunch.types;
 
 import java.io.Serializable;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Performs deep copies of values.
  * 
- * @param <T>
- *          The type of value that will be copied
+ * @param <T> The type of value that will be copied
  */
 public interface DeepCopier<T> extends Serializable {
 
   /**
+   * Initialize the deep copier with a job-specific configuration
+   * 
+   * @param conf Job-specific configuration
+   */
+  void initialize(Configuration conf);
+
+  /**
    * Create a deep copy of a value.
    * 
-   * @param source
-   *          The value to be copied
+   * @param source The value to be copied
    * @return The deep copy of the value
    */
   T deepCopy(T source);
-  
+
   static class NoOpDeepCopier<V> implements DeepCopier<V> {
 
     @Override
     public V deepCopy(V source) {
       return source;
     }
-    
+
+    @Override
+    public void initialize(Configuration conf) {
+      // No initialization needed
+    }
+
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/MapDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/MapDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/MapDeepCopier.java
index b1e4b37..55daaee 100644
--- a/crunch/src/main/java/org/apache/crunch/types/MapDeepCopier.java
+++ b/crunch/src/main/java/org/apache/crunch/types/MapDeepCopier.java
@@ -20,25 +20,31 @@ package org.apache.crunch.types;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
 
+import com.google.common.collect.Maps;
 
-public class MapDeepCopier<T> implements DeepCopier<Map<String,T>> {
+public class MapDeepCopier<T> implements DeepCopier<Map<String, T>> {
 
   private final PType<T> ptype;
-  
-  public MapDeepCopier(PType<T> ptype){
+
+  public MapDeepCopier(PType<T> ptype) {
     this.ptype = ptype;
   }
-  
+
+  @Override
+  public void initialize(Configuration conf) {
+    this.ptype.initialize(conf);
+  }
+
   @Override
   public Map<String, T> deepCopy(Map<String, T> source) {
-    Map<String,T> deepCopyMap = Maps.newHashMap();
-    for (Entry<String, T> entry : source.entrySet()){
+    Map<String, T> deepCopyMap = Maps.newHashMap();
+    for (Entry<String, T> entry : source.entrySet()) {
       deepCopyMap.put(entry.getKey(), ptype.getDetachedValue(entry.getValue()));
     }
     return deepCopyMap;
-    
+
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/PType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/PType.java b/crunch/src/main/java/org/apache/crunch/types/PType.java
index bbe8a4b..565615a 100644
--- a/crunch/src/main/java/org/apache/crunch/types/PType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/PType.java
@@ -24,14 +24,14 @@ import org.apache.crunch.DoFn;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.SourceTarget;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
 /**
- * A {@code PType} defines a mapping between a data type that is used in a
- * Crunch pipeline and a serialization and storage format that is used to
- * read/write data from/to HDFS. Every {@link PCollection} has an associated
- * {@code PType} that tells Crunch how to read/write data from that
- * {@code PCollection}.
+ * A {@code PType} defines a mapping between a data type that is used in a Crunch pipeline and a
+ * serialization and storage format that is used to read/write data from/to HDFS. Every
+ * {@link PCollection} has an associated {@code PType} that tells Crunch how to read/write data from
+ * that {@code PCollection}.
  * 
  */
 public interface PType<T> extends Serializable {
@@ -52,38 +52,35 @@ public interface PType<T> extends Serializable {
   Converter getConverter();
 
   /**
-   * Initialize this PType for use within a DoFn. This generally only needs to
-   * be called when using a PType for {@link #getDetachedValue(Object)}.
+   * Initialize this PType for use within a DoFn. This generally only needs to be called when using
+   * a PType for {@link #getDetachedValue(Object)}.
    * 
+   * @param conf Configuration object
    * @see PType#getDetachedValue(Object)
    */
-  void initialize();
+  void initialize(Configuration conf);
 
   /**
-   * Returns a copy of a value (or the value itself) that can safely be
-   * retained.
+   * Returns a copy of a value (or the value itself) that can safely be retained.
    * <p>
-   * This is useful when iterable values being processed in a DoFn (via a
-   * reducer) need to be held on to for more than the scope of a single
-   * iteration, as a reducer (and therefore also a DoFn that has an Iterable as
-   * input) re-use deserialized values. More information on object reuse is
+   * This is useful when iterable values being processed in a DoFn (via a reducer) need to be held
+   * on to for more than the scope of a single iteration, as a reducer (and therefore also a DoFn
+   * that has an Iterable as input) re-use deserialized values. More information on object reuse is
    * available in the {@link DoFn} class documentation.
    * 
-   * @param value
-   *          The value to be deep-copied
+   * @param value The value to be deep-copied
    * @return A deep copy of the input value
    */
   T getDetachedValue(T value);
 
   /**
-   * Returns a {@code SourceTarget} that is able to read/write data using the
-   * serialization format specified by this {@code PType}.
+   * Returns a {@code SourceTarget} that is able to read/write data using the serialization format
+   * specified by this {@code PType}.
    */
   SourceTarget<T> getDefaultFileSource(Path path);
 
   /**
-   * Returns the sub-types that make up this PType if it is a composite
-   * instance, such as a tuple.
+   * Returns the sub-types that make up this PType if it is a composite instance, such as a tuple.
    */
   List<PType> getSubTypes();
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java
index 4f473a0..45e38c7 100644
--- a/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java
+++ b/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java
@@ -20,27 +20,33 @@ package org.apache.crunch.types;
 import java.util.List;
 
 import org.apache.crunch.Tuple;
+import org.apache.hadoop.conf.Configuration;
 
 import com.google.common.collect.Lists;
 
 /**
- * Performs deep copies (based on underlying PType deep copying) of Tuple-based
- * objects.
+ * Performs deep copies (based on underlying PType deep copying) of Tuple-based objects.
  * 
- * @param <T>
- *          The type of Tuple implementation being copied
+ * @param <T> The type of Tuple implementation being copied
  */
 public class TupleDeepCopier<T extends Tuple> implements DeepCopier<T> {
 
   private final TupleFactory<T> tupleFactory;
   private final List<PType> elementTypes;
 
-  public TupleDeepCopier(Class<T> tupleClass, PType...elementTypes) {
+  public TupleDeepCopier(Class<T> tupleClass, PType... elementTypes) {
     tupleFactory = TupleFactory.getTupleFactory(tupleClass);
     this.elementTypes = Lists.newArrayList(elementTypes);
   }
 
   @Override
+  public void initialize(Configuration conf) {
+    for (PType elementType : elementTypes) {
+      elementType.initialize(conf);
+    }
+  }
+
+  @Override
   public T deepCopy(T source) {
     Object[] deepCopyValues = new Object[source.size()];
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
index fe4fe1a..b431123 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
@@ -31,24 +31,23 @@ import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
 import org.apache.crunch.types.DeepCopier;
+import org.apache.hadoop.conf.Configuration;
 
 /**
  * Performs deep copies of Avro-serializable objects.
  * <p>
- * <b>Warning:</b> Methods in this class are not thread-safe. This shouldn't be
- * a problem when running in a map-reduce context where each mapper/reducer is
- * running in its own JVM, but it may well be a problem in any other kind of
- * multi-threaded context.
+ * <b>Warning:</b> Methods in this class are not thread-safe. This shouldn't be a problem when
+ * running in a map-reduce context where each mapper/reducer is running in its own JVM, but it may
+ * well be a problem in any other kind of multi-threaded context.
  */
 public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
 
   private String jsonSchema;
+  private transient Configuration conf;
   private transient Schema schema;
   private BinaryEncoder binaryEncoder;
   private BinaryDecoder binaryDecoder;
@@ -67,11 +66,16 @@ public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
     return schema;
   }
 
+  @Override
+  public void initialize(Configuration conf) {
+    this.conf = conf;
+  }
+
   protected abstract T createCopyTarget();
 
-  protected abstract DatumWriter<T> createDatumWriter();
+  protected abstract DatumWriter<T> createDatumWriter(Configuration conf);
 
-  protected abstract DatumReader<T> createDatumReader();
+  protected abstract DatumReader<T> createDatumReader(Configuration conf);
 
   /**
    * Deep copier for Avro specific data objects.
@@ -91,12 +95,12 @@ public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
     }
 
     @Override
-    protected DatumWriter<T> createDatumWriter() {
+    protected DatumWriter<T> createDatumWriter(Configuration conf) {
       return new SpecificDatumWriter<T>(getSchema());
     }
 
     @Override
-    protected DatumReader<T> createDatumReader() {
+    protected DatumReader<T> createDatumReader(Configuration conf) {
       return new SpecificDatumReader<T>(getSchema());
     }
 
@@ -119,12 +123,12 @@ public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
     }
 
     @Override
-    protected DatumReader<Record> createDatumReader() {
+    protected DatumReader<Record> createDatumReader(Configuration conf) {
       return new GenericDatumReader<Record>(getSchema());
     }
 
     @Override
-    protected DatumWriter<Record> createDatumWriter() {
+    protected DatumWriter<Record> createDatumWriter(Configuration conf) {
       return new GenericDatumWriter<Record>(getSchema());
     }
   }
@@ -147,34 +151,29 @@ public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
     }
 
     @Override
-    protected DatumReader<T> createDatumReader() {
-      return new ReflectDatumReader<T>(getSchema());
+    protected DatumReader<T> createDatumReader(Configuration conf) {
+      return Avros.getReflectDataFactory(conf).getReader(getSchema());
     }
 
     @Override
-    protected DatumWriter<T> createDatumWriter() {
-      return new ReflectDatumWriter<T>(getSchema());
+    protected DatumWriter<T> createDatumWriter(Configuration conf) {
+      return Avros.getReflectDataFactory(conf).getWriter(getSchema());
     }
   }
 
-  public static class AvroTupleDeepCopier {
-
-  }
-
   /**
    * Create a deep copy of an Avro value.
    * 
-   * @param source
-   *          The value to be copied
+   * @param source The value to be copied
    * @return The deep copy of the value
    */
   @Override
   public T deepCopy(T source) {
     if (datumReader == null) {
-      datumReader = createDatumReader();
+      datumReader = createDatumReader(conf);
     }
     if (datumWriter == null) {
-      datumWriter = createDatumWriter();
+      datumWriter = createDatumWriter(conf);
     }
     ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
     binaryEncoder = EncoderFactory.get().binaryEncoder(byteOutStream, binaryEncoder);
@@ -182,7 +181,8 @@ public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
     try {
       datumWriter.write(source, binaryEncoder);
       binaryEncoder.flush();
-      binaryDecoder = DecoderFactory.get().binaryDecoder(byteOutStream.toByteArray(), binaryDecoder);
+      binaryDecoder = DecoderFactory.get()
+          .binaryDecoder(byteOutStream.toByteArray(), binaryDecoder);
       datumReader.read(target, binaryDecoder);
     } catch (Exception e) {
       throw new CrunchRuntimeException("Error while deep copying avro value " + source, e);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
index aa5b5dc..2b4def5 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
@@ -72,8 +72,8 @@ public class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V> {
   }
 
   @Override
-  public void initialize() {
-    // No initialization needed for Avro PTypes
+  public void initialize(Configuration conf) {
+    getTableType().initialize(conf);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
index 2582cc2..98d3f50 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
@@ -49,7 +49,7 @@ public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWr
     }
 
     ReflectDataFactory factory = Avros.getReflectDataFactory(conf);
-    final DataFileWriter<T> WRITER = new DataFileWriter<T>(factory.<T> getWriter());
+    final DataFileWriter<T> WRITER = new DataFileWriter<T>(factory.<T> getWriter(schema));
 
     JobConf jc = new JobConf(conf);
     /* copied from org.apache.avro.mapred.AvroOutputFormat */

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
index bd4b14c..285b423 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
@@ -66,8 +66,8 @@ public class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements PTableT
     public void initialize() {
       keyMapFn.setContext(getContext());
       valueMapFn.setContext(getContext());
-      pairSchemaJson = org.apache.avro.mapred.Pair.getPairSchema(new Schema.Parser().parse(firstJson),
-          new Schema.Parser().parse(secondJson)).toString();
+      pairSchemaJson = org.apache.avro.mapred.Pair.getPairSchema(
+          new Schema.Parser().parse(firstJson), new Schema.Parser().parse(secondJson)).toString();
     }
 
     @Override
@@ -120,9 +120,10 @@ public class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements PTableT
   private final AvroType<V> valueType;
 
   public AvroTableType(AvroType<K> keyType, AvroType<V> valueType, Class<Pair<K, V>> pairClass) {
-    super(pairClass, org.apache.avro.mapred.Pair.getPairSchema(keyType.getSchema(), valueType.getSchema()),
-        new IndexedRecordToPair(keyType.getInputMapFn(), valueType.getInputMapFn()), new PairToAvroPair(keyType,
-            valueType), new TupleDeepCopier(Pair.class, keyType, valueType), keyType, valueType);
+    super(pairClass, org.apache.avro.mapred.Pair.getPairSchema(keyType.getSchema(),
+        valueType.getSchema()), new IndexedRecordToPair(keyType.getInputMapFn(),
+        valueType.getInputMapFn()), new PairToAvroPair(keyType, valueType), new TupleDeepCopier(
+        Pair.class, keyType, valueType), keyType, valueType);
     this.keyType = keyType;
     this.valueType = valueType;
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
index a127baa..a0e2722 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
@@ -31,6 +31,7 @@ import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.DeepCopier;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
 import com.google.common.base.Preconditions;
@@ -52,13 +53,14 @@ public class AvroType<T> implements PType<T> {
   private final MapFn baseOutputMapFn;
   private final List<PType> subTypes;
   private DeepCopier<T> deepCopier;
+  private boolean initialized = false;
 
   public AvroType(Class<T> typeClass, Schema schema, DeepCopier<T> deepCopier, PType... ptypes) {
     this(typeClass, schema, IdentityFn.getInstance(), IdentityFn.getInstance(), deepCopier, ptypes);
   }
 
-  public AvroType(Class<T> typeClass, Schema schema, MapFn inputMapFn, MapFn outputMapFn, DeepCopier<T> deepCopier,
-      PType... ptypes) {
+  public AvroType(Class<T> typeClass, Schema schema, MapFn inputMapFn, MapFn outputMapFn,
+      DeepCopier<T> deepCopier, PType... ptypes) {
     this.typeClass = typeClass;
     this.schema = Preconditions.checkNotNull(schema);
     this.schemaString = schema.toString();
@@ -99,7 +101,7 @@ public class AvroType<T> implements PType<T> {
     if (Avros.isPrimitive(this)) {
       return false;
     }
-    
+
     if (!this.subTypes.isEmpty()) {
       for (PType<?> subType : this.subTypes) {
         AvroType<?> atype = (AvroType<?>) subType;
@@ -109,7 +111,7 @@ public class AvroType<T> implements PType<T> {
       }
       return false;
     }
-    
+
     return SpecificRecord.class.isAssignableFrom(typeClass);
   }
 
@@ -164,11 +166,16 @@ public class AvroType<T> implements PType<T> {
   }
 
   @Override
-  public void initialize() {
-    // No initialization needed for Avro PTypes
+  public void initialize(Configuration conf) {
+    deepCopier.initialize(conf);
+    initialized = true;
   }
 
+  @Override
   public T getDetachedValue(T value) {
+    if (!initialized) {
+      throw new IllegalStateException("Cannot call getDetachedValue on an uninitialized PType");
+    }
     return deepCopier.deepCopy(value);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java b/crunch/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
index c19a168..e973cca 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
@@ -35,7 +35,7 @@ public class ReflectDataFactory {
     return new ReflectDatumReader<T>(schema);
   }
 
-  public <T> ReflectDatumWriter<T> getWriter() {
-    return new ReflectDatumWriter<T>();
+  public <T> ReflectDatumWriter<T> getWriter(Schema schema) {
+    return new ReflectDatumWriter<T>(schema);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java b/crunch/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
index 266179e..438976c 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
@@ -111,8 +111,7 @@ public class SafeAvroSerialization<T> extends Configured implements Serializatio
         .getKeySchema(AvroJob.getMapOutputSchema(conf)) : Pair.getValueSchema(AvroJob.getMapOutputSchema(conf)));
 
     ReflectDataFactory factory = Avros.getReflectDataFactory(conf);
-    ReflectDatumWriter<T> writer = factory.getWriter();
-    writer.setSchema(schema);
+    ReflectDatumWriter<T> writer = factory.getWriter(schema);
     return new AvroWrapperSerializer(writer);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java
index 6469208..ae4614d 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java
@@ -25,22 +25,27 @@ import java.io.DataOutputStream;
 
 import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
 import org.apache.crunch.types.DeepCopier;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 
-
 /**
  * Performs deep copies of Writable values.
+ * 
  * @param <T> The type of Writable that can be copied
  */
-public class WritableDeepCopier<T extends Writable> implements DeepCopier<T>{
-  
+public class WritableDeepCopier<T extends Writable> implements DeepCopier<T> {
+
   private Class<T> writableClass;
 
-  public WritableDeepCopier(Class<T> writableClass){
+  public WritableDeepCopier(Class<T> writableClass) {
     this.writableClass = writableClass;
   }
 
   @Override
+  public void initialize(Configuration conf) {
+  }
+
+  @Override
   public T deepCopy(T source) {
     ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
     DataOutputStream dataOut = new DataOutputStream(byteOutStream);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
index 98afb4d..32c9111 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
@@ -23,6 +23,7 @@ import org.apache.crunch.Pair;
 import org.apache.crunch.lib.PTables;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PGroupedTableType;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
 
 public class WritableGroupedTableType<K, V> extends PGroupedTableType<K, V> {
@@ -37,7 +38,8 @@ public class WritableGroupedTableType<K, V> extends PGroupedTableType<K, V> {
     WritableType valueType = (WritableType) tableType.getValueType();
     this.inputFn = new PairIterableMapFn(keyType.getInputMapFn(), valueType.getInputMapFn());
     this.outputFn = tableType.getOutputMapFn();
-    this.converter = new WritablePairConverter(keyType.getSerializationClass(), valueType.getSerializationClass());
+    this.converter = new WritablePairConverter(keyType.getSerializationClass(),
+        valueType.getSerializationClass());
   }
 
   @Override
@@ -61,8 +63,8 @@ public class WritableGroupedTableType<K, V> extends PGroupedTableType<K, V> {
   }
 
   @Override
-  public void initialize() {
-    this.tableType.initialize();
+  public void initialize(Configuration conf) {
+    this.tableType.initialize(conf);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
index 6bb6c5d..2f75b94 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
@@ -31,6 +31,7 @@ import org.apache.crunch.types.PGroupedTableType;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
 
@@ -49,7 +50,8 @@ class WritableTableType<K, V> implements PTableType<K, V> {
     this.valueType = valueType;
     this.inputFn = new PairMapFn(keyType.getInputMapFn(), valueType.getInputMapFn());
     this.outputFn = new PairMapFn(keyType.getOutputMapFn(), valueType.getOutputMapFn());
-    this.converter = new WritablePairConverter(keyType.getSerializationClass(), valueType.getSerializationClass());
+    this.converter = new WritablePairConverter(keyType.getSerializationClass(),
+        valueType.getSerializationClass());
   }
 
   @Override
@@ -101,9 +103,9 @@ class WritableTableType<K, V> implements PTableType<K, V> {
   }
 
   @Override
-  public void initialize() {
-    keyType.initialize();
-    valueType.initialize();
+  public void initialize(Configuration conf) {
+    keyType.initialize(conf);
+    valueType.initialize(conf);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
index 71f81f4..93b0518 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
@@ -27,6 +27,7 @@ import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.DeepCopier;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
 
@@ -43,8 +44,8 @@ public class WritableType<T, W extends Writable> implements PType<T> {
   private final List<PType> subTypes;
   private boolean initialized = false;
 
-  WritableType(Class<T> typeClass, Class<W> writableClass, MapFn<W, T> inputDoFn, MapFn<T, W> outputDoFn,
-      PType... subTypes) {
+  WritableType(Class<T> typeClass, Class<W> writableClass, MapFn<W, T> inputDoFn,
+      MapFn<T, W> outputDoFn, PType... subTypes) {
     this.typeClass = typeClass;
     this.writableClass = writableClass;
     this.inputFn = inputDoFn;
@@ -99,15 +100,16 @@ public class WritableType<T, W extends Writable> implements PType<T> {
       return false;
     }
     WritableType wt = (WritableType) obj;
-    return (typeClass.equals(wt.typeClass) && writableClass.equals(wt.writableClass) && subTypes.equals(wt.subTypes));
+    return (typeClass.equals(wt.typeClass) && writableClass.equals(wt.writableClass) && subTypes
+        .equals(wt.subTypes));
   }
 
   @Override
-  public void initialize() {
+  public void initialize(Configuration conf) {
     this.inputFn.initialize();
     this.outputFn.initialize();
     for (PType subType : subTypes) {
-      subType.initialize();
+      subType.initialize(conf);
     }
     this.initialized = true;
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java b/crunch/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java
index 998e654..dd72364 100644
--- a/crunch/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java
+++ b/crunch/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java
@@ -28,6 +28,7 @@ import org.apache.crunch.impl.mr.run.RTNode;
 import org.apache.crunch.test.StringWrapper;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -48,7 +49,8 @@ public class IntermediateEmitterTest {
   @Test
   public void testEmit_SingleChild() {
     RTNode singleChild = mock(RTNode.class);
-    IntermediateEmitter emitter = new IntermediateEmitter(ptype, Lists.newArrayList(singleChild));
+    IntermediateEmitter emitter = new IntermediateEmitter(ptype, Lists.newArrayList(singleChild),
+        new Configuration());
     emitter.emit(stringWrapper);
 
     ArgumentCaptor<StringWrapper> argumentCaptor = ArgumentCaptor.forClass(StringWrapper.class);
@@ -60,7 +62,8 @@ public class IntermediateEmitterTest {
   public void testEmit_MultipleChildren() {
     RTNode childA = mock(RTNode.class);
     RTNode childB = mock(RTNode.class);
-    IntermediateEmitter emitter = new IntermediateEmitter(ptype, Lists.newArrayList(childA, childB));
+    IntermediateEmitter emitter = new IntermediateEmitter(ptype, Lists.newArrayList(childA, childB),
+        new Configuration());
     emitter.emit(stringWrapper);
 
     ArgumentCaptor<StringWrapper> argumentCaptorA = ArgumentCaptor.forClass(StringWrapper.class);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java b/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java
index 270d2c7..741899e 100644
--- a/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java
+++ b/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.Pair;
 import org.apache.crunch.test.StringWrapper;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -40,6 +41,7 @@ public abstract class JoinFnTestBase {
   @Before
   public void setUp() {
     joinFn = getJoinFn();
+    joinFn.setConfigurationForTest(new Configuration());
     joinFn.initialize();
     emitter = mock(Emitter.class);
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/test/java/org/apache/crunch/types/CollectionDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/CollectionDeepCopierTest.java b/crunch/src/test/java/org/apache/crunch/types/CollectionDeepCopierTest.java
index d3e7dfa..e28d094 100644
--- a/crunch/src/test/java/org/apache/crunch/types/CollectionDeepCopierTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/CollectionDeepCopierTest.java
@@ -17,12 +17,14 @@
  */
 package org.apache.crunch.types;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
 
 import java.util.Collection;
 
 import org.apache.crunch.test.Person;
 import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
@@ -37,7 +39,9 @@ public class CollectionDeepCopierTest {
     person.siblingnames = Lists.<CharSequence> newArrayList();
 
     Collection<Person> personCollection = Lists.newArrayList(person);
-    CollectionDeepCopier<Person> collectionDeepCopier = new CollectionDeepCopier<Person>(Avros.records(Person.class));
+    CollectionDeepCopier<Person> collectionDeepCopier = new CollectionDeepCopier<Person>(
+        Avros.records(Person.class));
+    collectionDeepCopier.initialize(new Configuration());
 
     Collection<Person> deepCopyCollection = collectionDeepCopier.deepCopy(personCollection);
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/test/java/org/apache/crunch/types/MapDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/MapDeepCopierTest.java b/crunch/src/test/java/org/apache/crunch/types/MapDeepCopierTest.java
index f351691..b61a33f 100644
--- a/crunch/src/test/java/org/apache/crunch/types/MapDeepCopierTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/MapDeepCopierTest.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.crunch.test.StringWrapper;
 import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
 
 import com.google.common.collect.Maps;
@@ -37,7 +38,9 @@ public class MapDeepCopierTest {
     Map<String, StringWrapper> map = Maps.newHashMap();
     map.put(key, stringWrapper);
 
-    MapDeepCopier<StringWrapper> deepCopier = new MapDeepCopier<StringWrapper>(Avros.reflects(StringWrapper.class));
+    MapDeepCopier<StringWrapper> deepCopier = new MapDeepCopier<StringWrapper>(
+        Avros.reflects(StringWrapper.class));
+    deepCopier.initialize(new Configuration());
     Map<String, StringWrapper> deepCopy = deepCopier.deepCopy(map);
 
     assertEquals(map, deepCopy);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java b/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java
index c332285..0760c7e 100644
--- a/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotSame;
 import org.apache.crunch.Pair;
 import org.apache.crunch.test.Person;
 import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
@@ -37,9 +38,10 @@ public class TupleDeepCopierTest {
     person.siblingnames = Lists.<CharSequence> newArrayList();
 
     Pair<Integer, Person> inputPair = Pair.of(1, person);
-    DeepCopier<Pair> deepCopier = new TupleDeepCopier<Pair>(
-        Pair.class, Avros.ints(), Avros.records(Person.class));
+    DeepCopier<Pair> deepCopier = new TupleDeepCopier<Pair>(Pair.class, Avros.ints(),
+        Avros.records(Person.class));
 
+    deepCopier.initialize(new Configuration());
     Pair<Integer, Person> deepCopyPair = deepCopier.deepCopy(inputPair);
 
     assertEquals(inputPair, deepCopyPair);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
index af6acb8..bb59136 100644
--- a/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.crunch.test.Person;
 import org.apache.crunch.types.avro.AvroDeepCopier.AvroSpecificDeepCopier;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
@@ -63,17 +64,17 @@ public class AvroDeepCopierTest {
     String name;
     int age;
     List<String> siblingnames;
-    
+
     @Override
     public boolean equals(Object other) {
       if (other == null || !(other instanceof ReflectedPerson)) {
         return false;
       }
       ReflectedPerson that = (ReflectedPerson) other;
-      return name.equals(that.name)&& age == that.age && siblingnames.equals(that.siblingnames); 
+      return name.equals(that.name) && age == that.age && siblingnames.equals(that.siblingnames);
     }
   }
-  
+
   @Test
   public void testDeepCopyReflect() {
     ReflectedPerson person = new ReflectedPerson();
@@ -81,8 +82,11 @@ public class AvroDeepCopierTest {
     person.age = 42;
     person.siblingnames = Lists.newArrayList();
 
-    ReflectedPerson deepCopyPerson = new AvroDeepCopier.AvroReflectDeepCopier<ReflectedPerson>(
-        ReflectedPerson.class, Avros.reflects(ReflectedPerson.class).getSchema()).deepCopy(person);
+    AvroDeepCopier<ReflectedPerson> avroDeepCopier = new AvroDeepCopier.AvroReflectDeepCopier<ReflectedPerson>(
+        ReflectedPerson.class, Avros.reflects(ReflectedPerson.class).getSchema());
+    avroDeepCopier.initialize(new Configuration());
+
+    ReflectedPerson deepCopyPerson = avroDeepCopier.deepCopy(person);
 
     assertEquals(person, deepCopyPerson);
     assertNotSame(person, deepCopyPerson);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/test/java/org/apache/crunch/types/avro/AvroGroupedTableTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroGroupedTableTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroGroupedTableTypeTest.java
index e5518f7..db9ebdc 100644
--- a/crunch/src/test/java/org/apache/crunch/types/avro/AvroGroupedTableTypeTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroGroupedTableTypeTest.java
@@ -26,6 +26,7 @@ import java.util.List;
 import org.apache.crunch.Pair;
 import org.apache.crunch.test.Person;
 import org.apache.crunch.types.PGroupedTableType;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
@@ -43,8 +44,9 @@ public class AvroGroupedTableTypeTest {
     Iterable<Person> inputPersonIterable = Lists.newArrayList(person);
     Pair<Integer, Iterable<Person>> pair = Pair.of(integerValue, inputPersonIterable);
 
-    PGroupedTableType<Integer, Person> groupedTableType = Avros.tableOf(Avros.ints(), Avros.specifics(Person.class))
-        .getGroupedTableType();
+    PGroupedTableType<Integer, Person> groupedTableType = Avros.tableOf(Avros.ints(),
+        Avros.specifics(Person.class)).getGroupedTableType();
+    groupedTableType.initialize(new Configuration());
 
     Pair<Integer, Iterable<Person>> detachedPair = groupedTableType.getDetachedValue(pair);
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java
index 5e03ff8..35d4e5b 100644
--- a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
 import org.apache.crunch.Pair;
 import org.apache.crunch.test.Person;
 import org.apache.crunch.test.StringWrapper;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
@@ -42,7 +43,9 @@ public class AvroTableTypeTest {
 
     Pair<Integer, Person> pair = Pair.of(integerValue, person);
 
-    AvroTableType<Integer, Person> tableType = Avros.tableOf(Avros.ints(), Avros.specifics(Person.class));
+    AvroTableType<Integer, Person> tableType = Avros.tableOf(Avros.ints(),
+        Avros.specifics(Person.class));
+    tableType.initialize(new Configuration());
 
     Pair<Integer, Person> detachedPair = tableType.getDetachedValue(pair);
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
index 170bebf..383ca27 100644
--- a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
@@ -33,6 +33,7 @@ import org.apache.crunch.Pair;
 import org.apache.crunch.TupleN;
 import org.apache.crunch.test.Person;
 import org.apache.crunch.test.StringWrapper;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
@@ -150,6 +151,7 @@ public class AvroTypeTest {
   @Test
   public void testGetDetachedValue_GenericAvroType() {
     AvroType<Record> genericType = Avros.generics(Person.SCHEMA$);
+    genericType.initialize(new Configuration());
     GenericData.Record record = new GenericData.Record(Person.SCHEMA$);
     record.put("name", "name value");
     record.put("age", 42);
@@ -159,8 +161,8 @@ public class AvroTypeTest {
     assertEquals(record, detachedRecord);
     assertNotSame(record, detachedRecord);
   }
-  
-  private Person createPerson(){
+
+  private Person createPerson() {
     Person person = new Person();
     person.name = "name value";
     person.age = 42;
@@ -171,30 +173,39 @@ public class AvroTypeTest {
   @Test
   public void testGetDetachedValue_SpecificAvroType() {
     AvroType<Person> specificType = Avros.specifics(Person.class);
+    specificType.initialize(new Configuration());
     Person person = createPerson();
     Person detachedPerson = specificType.getDetachedValue(person);
     assertEquals(person, detachedPerson);
     assertNotSame(person, detachedPerson);
   }
 
+  @Test(expected = IllegalStateException.class)
+  public void testGetDetachedValue_NotInitialized() {
+    AvroType<Person> specificType = Avros.specifics(Person.class);
+    Person person = createPerson();
+    specificType.getDetachedValue(person);
+  }
+
   static class ReflectedPerson {
     String name;
     int age;
     List<String> siblingnames;
-    
+
     @Override
     public boolean equals(Object other) {
       if (other == null || !(other instanceof ReflectedPerson)) {
         return false;
       }
       ReflectedPerson that = (ReflectedPerson) other;
-      return name.equals(that.name)&& age == that.age && siblingnames.equals(that.siblingnames); 
+      return name.equals(that.name) && age == that.age && siblingnames.equals(that.siblingnames);
     }
   }
-  
+
   @Test
   public void testGetDetachedValue_ReflectAvroType() {
     AvroType<ReflectedPerson> reflectType = Avros.reflects(ReflectedPerson.class);
+    reflectType.initialize(new Configuration());
     ReflectedPerson rp = new ReflectedPerson();
     rp.name = "josh";
     rp.age = 32;
@@ -209,6 +220,7 @@ public class AvroTypeTest {
     Person person = createPerson();
     AvroType<Pair<Integer, Person>> pairType = Avros.pairs(Avros.ints(),
         Avros.records(Person.class));
+    pairType.initialize(new Configuration());
 
     Pair<Integer, Person> inputPair = Pair.of(1, person);
     Pair<Integer, Person> detachedPair = pairType.getDetachedValue(inputPair);
@@ -216,47 +228,50 @@ public class AvroTypeTest {
     assertEquals(inputPair, detachedPair);
     assertNotSame(inputPair.second(), detachedPair.second());
   }
-  
+
   @Test
-  public void testGetDetachedValue_Collection(){
+  public void testGetDetachedValue_Collection() {
     Person person = createPerson();
     List<Person> personList = Lists.newArrayList(person);
-    
+
     AvroType<Collection<Person>> collectionType = Avros.collections(Avros.records(Person.class));
-    
+    collectionType.initialize(new Configuration());
+
     Collection<Person> detachedCollection = collectionType.getDetachedValue(personList);
-    
+
     assertEquals(personList, detachedCollection);
     Person detachedPerson = detachedCollection.iterator().next();
-    
+
     assertNotSame(person, detachedPerson);
   }
-  
+
   @Test
-  public void testGetDetachedValue_Map(){
+  public void testGetDetachedValue_Map() {
     String key = "key";
     Person value = createPerson();
-    
-    Map<String,Person> stringPersonMap = Maps.newHashMap();
+
+    Map<String, Person> stringPersonMap = Maps.newHashMap();
     stringPersonMap.put(key, value);
-    
+
     AvroType<Map<String, Person>> mapType = Avros.maps(Avros.records(Person.class));
-    
+    mapType.initialize(new Configuration());
+
     Map<String, Person> detachedMap = mapType.getDetachedValue(stringPersonMap);
-    
+
     assertEquals(stringPersonMap, detachedMap);
     assertNotSame(value, detachedMap.get(key));
   }
-  
+
   @Test
-  public void testGetDetachedValue_TupleN(){
+  public void testGetDetachedValue_TupleN() {
     Person person = createPerson();
     AvroType<TupleN> ptype = Avros.tuples(Avros.records(Person.class));
+    ptype.initialize(new Configuration());
     TupleN tuple = new TupleN(person);
     TupleN detachedTuple = ptype.getDetachedValue(tuple);
-    
+
     assertEquals(tuple, detachedTuple);
     assertNotSame(person, detachedTuple.get(0));
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/test/java/org/apache/crunch/types/writable/WritableDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/writable/WritableDeepCopierTest.java b/crunch/src/test/java/org/apache/crunch/types/writable/WritableDeepCopierTest.java
index a691df2..eae8218 100644
--- a/crunch/src/test/java/org/apache/crunch/types/writable/WritableDeepCopierTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/writable/WritableDeepCopierTest.java
@@ -17,30 +17,29 @@
  */
 package org.apache.crunch.types.writable;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
 
 import org.apache.hadoop.io.Text;
 import org.junit.Before;
 import org.junit.Test;
 
-
 public class WritableDeepCopierTest {
 
   private WritableDeepCopier<Text> deepCopier;
-  
+
   @Before
-  public void setUp(){
+  public void setUp() {
     deepCopier = new WritableDeepCopier<Text>(Text.class);
   }
-  
+
   @Test
-  public void testDeepCopy(){
+  public void testDeepCopy() {
     Text text = new Text("value");
     Text deepCopy = deepCopier.deepCopy(text);
-    
+
     assertEquals(text, deepCopy);
     assertNotSame(text, deepCopy);
   }
-  
-  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/test/java/org/apache/crunch/types/writable/WritableGroupedTableTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/writable/WritableGroupedTableTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/writable/WritableGroupedTableTypeTest.java
index 1699f3c..f6c201b 100644
--- a/crunch/src/test/java/org/apache/crunch/types/writable/WritableGroupedTableTypeTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/writable/WritableGroupedTableTypeTest.java
@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.apache.crunch.Pair;
 import org.apache.crunch.types.PGroupedTableType;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.junit.Test;
 
@@ -41,7 +42,7 @@ public class WritableGroupedTableTypeTest {
 
     PGroupedTableType<Integer, Text> groupedTableType = Writables.tableOf(Writables.ints(),
         Writables.writables(Text.class)).getGroupedTableType();
-    groupedTableType.initialize();
+    groupedTableType.initialize(new Configuration());
 
     Pair<Integer, Iterable<Text>> detachedPair = groupedTableType.getDetachedValue(pair);
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/test/java/org/apache/crunch/types/writable/WritableTableTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/writable/WritableTableTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/writable/WritableTableTypeTest.java
index ae68e7a..697a28c 100644
--- a/crunch/src/test/java/org/apache/crunch/types/writable/WritableTableTypeTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/writable/WritableTableTypeTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertSame;
 
 import org.apache.crunch.Pair;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.junit.Test;
 
@@ -33,8 +34,9 @@ public class WritableTableTypeTest {
     Text textValue = new Text("forty-two");
     Pair<Integer, Text> pair = Pair.of(integerValue, textValue);
 
-    WritableTableType<Integer, Text> tableType = Writables.tableOf(Writables.ints(), Writables.writables(Text.class));
-    tableType.initialize();
+    WritableTableType<Integer, Text> tableType = Writables.tableOf(Writables.ints(),
+        Writables.writables(Text.class));
+    tableType.initialize(new Configuration());
     Pair<Integer, Text> detachedPair = tableType.getDetachedValue(pair);
 
     assertSame(integerValue, detachedPair.first());

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java
index bea953d..65e946b 100644
--- a/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.Map;
 
 import org.apache.crunch.Pair;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.junit.Test;
@@ -46,7 +47,7 @@ public class WritableTypeTest {
   @Test
   public void testGetDetachedValue_CustomWritable() {
     WritableType<Text, Text> textWritableType = Writables.writables(Text.class);
-    textWritableType.initialize();
+    textWritableType.initialize(new Configuration());
     Text value = new Text("test");
 
     Text detachedValue = textWritableType.getDetachedValue(value);
@@ -57,9 +58,9 @@ public class WritableTypeTest {
   @Test
   public void testGetDetachedValue_Collection() {
     Collection<Text> textCollection = Lists.newArrayList(new Text("value"));
-    WritableType<Collection<Text>, GenericArrayWritable<Text>> ptype = Writables.collections(Writables
-        .writables(Text.class));
-    ptype.initialize();
+    WritableType<Collection<Text>, GenericArrayWritable<Text>> ptype = Writables
+        .collections(Writables.writables(Text.class));
+    ptype.initialize(new Configuration());
 
     Collection<Text> detachedCollection = ptype.getDetachedValue(textCollection);
     assertEquals(textCollection, detachedCollection);
@@ -69,9 +70,9 @@ public class WritableTypeTest {
   @Test
   public void testGetDetachedValue_Tuple() {
     Pair<Text, Text> textPair = Pair.of(new Text("one"), new Text("two"));
-    WritableType<Pair<Text, Text>, TupleWritable> ptype = Writables.pairs(Writables.writables(Text.class),
-        Writables.writables(Text.class));
-    ptype.initialize();
+    WritableType<Pair<Text, Text>, TupleWritable> ptype = Writables.pairs(
+        Writables.writables(Text.class), Writables.writables(Text.class));
+    ptype.initialize(new Configuration());
 
     Pair<Text, Text> detachedPair = ptype.getDetachedValue(textPair);
     assertEquals(textPair, detachedPair);
@@ -84,8 +85,9 @@ public class WritableTypeTest {
     Map<String, Text> stringTextMap = Maps.newHashMap();
     stringTextMap.put("key", new Text("value"));
 
-    WritableType<Map<String, Text>, MapWritable> ptype = Writables.maps(Writables.writables(Text.class));
-    ptype.initialize();
+    WritableType<Map<String, Text>, MapWritable> ptype = Writables.maps(Writables
+        .writables(Text.class));
+    ptype.initialize(new Configuration());
     Map<String, Text> detachedMap = ptype.getDetachedValue(stringTextMap);
 
     assertEquals(stringTextMap, detachedMap);


Mime
View raw message