incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject git commit: Add Ptype#getDetachedValue
Date Tue, 03 Jul 2012 20:45:00 GMT
Updated Branches:
  refs/heads/master a69bda8f5 -> 7397d98a5


Add PType#getDetachedValue

Add getDetachedValue to PType so that reducer-based DoFns can create
deep copies of values they need to retain beyond a single iteration
(reducers reuse deserialized value objects). As a side effect, PType
now extends Serializable.

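Also fix the bug in Aggregate#collectValues that caused the same
value to be collected multiple times when using custom Writables or
Avro types: the reused value object was added to the list directly,
so every element ended up referring to the same instance.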


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/7397d98a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/7397d98a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/7397d98a

Branch: refs/heads/master
Commit: 7397d98a59f64df3bf7fbda57e1ddcfdf37b9487
Parents: a69bda8
Author: Gabriel Reid <gabriel.reid@gmail.com>
Authored: Tue Jul 3 19:52:00 2012 +0200
Committer: Gabriel Reid <gabriel.reid@gmail.com>
Committed: Tue Jul 3 22:10:07 2012 +0200

----------------------------------------------------------------------
 .../crunch/impl/mr/run/CrunchRuntimeException.java |    4 +
 .../java/com/cloudera/crunch/lib/Aggregate.java    |    9 +-
 src/main/java/com/cloudera/crunch/lib/PTables.java |   55 ++++++-
 src/main/java/com/cloudera/crunch/types/PType.java |   36 +++-
 .../cloudera/crunch/types/avro/AvroDeepCopier.java |  134 +++++++++++++++
 .../crunch/types/avro/AvroGroupedTableType.java    |    6 +
 .../cloudera/crunch/types/avro/AvroTableType.java  |    6 +
 .../com/cloudera/crunch/types/avro/AvroType.java   |   28 +++-
 .../java/com/cloudera/crunch/types/avro/Avros.java |    4 +
 .../types/writable/WritableGroupedTableType.java   |    6 +
 .../crunch/types/writable/WritableTableType.java   |    6 +
 .../crunch/types/writable/WritableType.java        |   17 ++-
 .../cloudera/crunch/types/writable/Writables.java  |   40 ++++-
 .../com/cloudera/crunch/lib/AggregateTest.java     |  104 +++++++++++
 .../crunch/types/avro/AvroDeepCopierTest.java      |   58 +++++++
 .../types/avro/AvroGroupedTableTypeTest.java       |   41 +++++
 .../crunch/types/avro/AvroTableTypeTest.java       |   35 ++++
 .../cloudera/crunch/types/avro/AvroTypeTest.java   |   52 ++++++
 .../com/cloudera/crunch/types/avro/AvrosTest.java  |   20 ++-
 .../writable/WritableGroupedTableTypeTest.java     |   37 ++++
 .../types/writable/WritableTableTypeTest.java      |   30 ++++
 .../crunch/types/writable/WritableTypeTest.java    |   29 +++
 .../crunch/types/writable/WritablesTest.java       |   13 ++-
 23 files changed, 744 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java b/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
index d41a52e..68ef054 100644
--- a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
+++ b/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
@@ -12,6 +12,10 @@ public class CrunchRuntimeException extends RuntimeException {
     super(e);
   }
   
+  public CrunchRuntimeException(String msg, Exception e) {
+    super(msg, e);
+  }
+
   public boolean wasLogged() {
     return logged;
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/main/java/com/cloudera/crunch/lib/Aggregate.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/lib/Aggregate.java b/src/main/java/com/cloudera/crunch/lib/Aggregate.java
index 1d8ebc5..4a2ff25 100644
--- a/src/main/java/com/cloudera/crunch/lib/Aggregate.java
+++ b/src/main/java/com/cloudera/crunch/lib/Aggregate.java
@@ -224,9 +224,14 @@ public class Aggregate {
   
   public static <K, V> PTable<K, Collection<V>> collectValues(PTable<K, V> collect) {
     PTypeFamily tf = collect.getTypeFamily();
+    final PType<V> valueType = collect.getValueType();
     return collect.groupByKey().parallelDo("collect", new MapValuesFn<K, Iterable<V>, Collection<V>>() {
-      public Collection<V> map(Iterable<V> v) {
-        return Lists.newArrayList(v);
+          public Collection<V> map(Iterable<V> values) {
+            List<V> collected = Lists.newArrayList();
+            for (V value : values) {
+              collected.add(valueType.getDetachedValue(value));
+            }
+            return collected;
       }
     }, tf.tableOf(collect.getKeyType(), tf.collections(collect.getValueType())));  
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/main/java/com/cloudera/crunch/lib/PTables.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/lib/PTables.java b/src/main/java/com/cloudera/crunch/lib/PTables.java
index 0c0f246..a68e5be 100644
--- a/src/main/java/com/cloudera/crunch/lib/PTables.java
+++ b/src/main/java/com/cloudera/crunch/lib/PTables.java
@@ -14,15 +14,23 @@
  */
 package com.cloudera.crunch.lib;
 
+import java.util.List;
+
+import org.apache.hadoop.thirdparty.guava.common.collect.Lists;
+
 import com.cloudera.crunch.DoFn;
 import com.cloudera.crunch.Emitter;
 import com.cloudera.crunch.PCollection;
+import com.cloudera.crunch.PGroupedTable;
 import com.cloudera.crunch.PTable;
 import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.types.PGroupedTableType;
+import com.cloudera.crunch.types.PTableType;
+import com.cloudera.crunch.types.PType;
 
 /**
  * Methods for performing common operations on PTables.
- *
+ * 
  */
 public class PTables {
 
@@ -31,16 +39,55 @@ public class PTables {
       @Override
       public void process(Pair<K, V> input, Emitter<K> emitter) {
         emitter.emit(input.first());
-      } 
+      }
     }, ptable.getKeyType());
   }
-  
+
   public static <K, V> PCollection<V> values(PTable<K, V> ptable) {
     return ptable.parallelDo("PTables.values", new DoFn<Pair<K, V>, V>() {
       @Override
       public void process(Pair<K, V> input, Emitter<V> emitter) {
         emitter.emit(input.second());
-      } 
+      }
     }, ptable.getValueType());
   }
+
+  /**
+   * Create a detached value for a table {@link Pair}.
+   * 
+   * @param tableType
+   *          The table type
+   * @param value
+   *          The value from which a detached value is to be created
+   * @return The detached value
+   * @see PType#getDetachedValue(Object)
+   */
+  public static <K, V> Pair<K, V> getDetachedValue(PTableType<K, V> tableType, Pair<K, V> value) {
+    return Pair.of(tableType.getKeyType().getDetachedValue(value.first()), tableType.getValueType()
+        .getDetachedValue(value.second()));
+  }
+
+  /**
+   * Created a detached value for a {@link PGroupedTable} value.
+   * 
+   * 
+   * @param groupedTableType
+   *          The grouped table type
+   * @param value
+   *          The value from which a detached value is to be created
+   * @return The detached value
+   * @see PType#getDetachedValue(Object)
+   */
+  public static <K, V> Pair<K, Iterable<V>> getGroupedDetachedValue(
+      PGroupedTableType<K, V> groupedTableType, Pair<K, Iterable<V>> value) {
+
+    PTableType<K, V> tableType = groupedTableType.getTableType();
+    List<V> detachedIterable = Lists.newArrayList();
+    PType<V> valueType = tableType.getValueType();
+    for (V v : value.second()) {
+      detachedIterable.add(valueType.getDetachedValue(v));
+    }
+    return Pair.of(tableType.getKeyType().getDetachedValue(value.first()),
+        (Iterable<V>) detachedIterable);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/main/java/com/cloudera/crunch/types/PType.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/types/PType.java b/src/main/java/com/cloudera/crunch/types/PType.java
index af4ef1b..d6730a3 100644
--- a/src/main/java/com/cloudera/crunch/types/PType.java
+++ b/src/main/java/com/cloudera/crunch/types/PType.java
@@ -15,10 +15,12 @@
 
 package com.cloudera.crunch.types;
 
+import java.io.Serializable;
 import java.util.List;
 
 import org.apache.hadoop.fs.Path;
 
+import com.cloudera.crunch.DoFn;
 import com.cloudera.crunch.MapFn;
 import com.cloudera.crunch.PCollection;
 import com.cloudera.crunch.SourceTarget;
@@ -29,34 +31,50 @@ import com.cloudera.crunch.SourceTarget;
  * read/write data from/to HDFS. Every {@link PCollection} has an associated
  * {@code PType} that tells Crunch how to read/write data from that
  * {@code PCollection}.
- *
+ * 
  */
-public interface PType<T> {
+public interface PType<T> extends Serializable {
   /**
    * Returns the Java type represented by this {@code PType}.
    */
   Class<T> getTypeClass();
-  
+
   /**
    * Returns the {@code PTypeFamily} that this {@code PType} belongs to.
    */
   PTypeFamily getFamily();
 
   MapFn<Object, T> getInputMapFn();
-  
+
   MapFn<T, Object> getOutputMapFn();
-  
+
   Converter getConverter();
-  
+
+  /**
+   * Returns a copy of a value (or the value itself) that can safely be
+   * retained.
+   * <p>
+   * This is useful when iterable values being processed in a DoFn (via a
+   * reducer) need to be held on to for more than the scope of a single
+   * iteration, as a reducer (and therefore also a DoFn that has an Iterable as
+   * input) re-use deserialized values. More information on object reuse is
+   * available in the {@link DoFn} class documentation.
+   * 
+   * @param value
+   *          The value to be deep-copied
+   * @return A deep copy of the input value
+   */
+  T getDetachedValue(T value);
+
   /**
    * Returns a {@code SourceTarget} that is able to read/write data using the
    * serialization format specified by this {@code PType}.
    */
   SourceTarget<T> getDefaultFileSource(Path path);
-  
+
   /**
-   * Returns the sub-types that make up this PType if it is a composite instance,
-   * such as a tuple.
+   * Returns the sub-types that make up this PType if it is a composite
+   * instance, such as a tuple.
    */
   List<PType> getSubTypes();
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/main/java/com/cloudera/crunch/types/avro/AvroDeepCopier.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/types/avro/AvroDeepCopier.java b/src/main/java/com/cloudera/crunch/types/avro/AvroDeepCopier.java
new file mode 100644
index 0000000..86f1edb
--- /dev/null
+++ b/src/main/java/com/cloudera/crunch/types/avro/AvroDeepCopier.java
@@ -0,0 +1,134 @@
+package com.cloudera.crunch.types.avro;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Serializable;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+
+import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException;
+
+/**
+ * Performs deep copies of Avro-serializable objects.
+ * <p>
+ * <b>Warning:</b> Methods in this class are not thread-safe. This shouldn't be
+ * a problem when running in a map-reduce context where each mapper/reducer is
+ * running in its own JVM, but it may well be a problem in any other kind of
+ * multi-threaded context.
+ */
+public abstract class AvroDeepCopier<T> implements Serializable {
+
+  private BinaryEncoder binaryEncoder;
+  private BinaryDecoder binaryDecoder;
+  protected DatumWriter<T> datumWriter;
+  protected DatumReader<T> datumReader;
+
+  protected AvroDeepCopier(DatumWriter<T> datumWriter, DatumReader<T> datumReader) {
+    this.datumWriter = datumWriter;
+    this.datumReader = datumReader;
+  }
+
+  protected abstract T createCopyTarget();
+
+  /**
+   * Deep copier for Avro specific data objects.
+   */
+  public static class AvroSpecificDeepCopier<T> extends AvroDeepCopier<T> {
+
+    private Class<T> valueClass;
+
+    public AvroSpecificDeepCopier(Class<T> valueClass, Schema schema) {
+      super(new SpecificDatumWriter<T>(schema), new SpecificDatumReader(schema));
+      this.valueClass = valueClass;
+    }
+
+    @Override
+    protected T createCopyTarget() {
+      return createNewInstance(valueClass);
+    }
+
+  }
+
+  /**
+   * Deep copier for Avro generic data objects.
+   */
+  public static class AvroGenericDeepCopier extends AvroDeepCopier<Record> {
+
+    private Schema schema;
+
+    public AvroGenericDeepCopier(Schema schema) {
+      super(new GenericDatumWriter<Record>(schema), new GenericDatumReader<Record>(schema));
+      this.schema = schema;
+    }
+
+    @Override
+    protected Record createCopyTarget() {
+      return new GenericData.Record(schema);
+    }
+  }
+
+  /**
+   * Deep copier for Avro reflect data objects.
+   */
+  public static class AvroReflectDeepCopier<T> extends AvroDeepCopier<T> {
+    private Class<T> valueClass;
+
+    public AvroReflectDeepCopier(Class<T> valueClass, Schema schema) {
+      super(new ReflectDatumWriter<T>(schema), new ReflectDatumReader<T>(schema));
+      this.valueClass = valueClass;
+    }
+
+    @Override
+    protected T createCopyTarget() {
+      return createNewInstance(valueClass);
+    }
+  }
+
+  /**
+   * Create a deep copy of an Avro value.
+   * 
+   * @param source
+   *          The value to be copied
+   * @return The deep copy of the value
+   */
+  public T deepCopy(T source) {
+    ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
+    binaryEncoder = EncoderFactory.get().binaryEncoder(byteOutStream, binaryEncoder);
+    T target = createCopyTarget();
+    try {
+      datumWriter.write(source, binaryEncoder);
+      binaryEncoder.flush();
+      binaryDecoder = DecoderFactory.get()
+          .binaryDecoder(byteOutStream.toByteArray(), binaryDecoder);
+      datumReader.read(target, binaryDecoder);
+    } catch (Exception e) {
+      throw new CrunchRuntimeException("Error while deep copying avro value " + source, e);
+    }
+
+    return target;
+  }
+
+  protected T createNewInstance(Class<T> targetClass) {
+    try {
+      return targetClass.newInstance();
+    } catch (InstantiationException e) {
+      throw new CrunchRuntimeException(e);
+    } catch (IllegalAccessException e) {
+      throw new CrunchRuntimeException(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/main/java/com/cloudera/crunch/types/avro/AvroGroupedTableType.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/types/avro/AvroGroupedTableType.java b/src/main/java/com/cloudera/crunch/types/avro/AvroGroupedTableType.java
index 091948e..0d53eaf 100644
--- a/src/main/java/com/cloudera/crunch/types/avro/AvroGroupedTableType.java
+++ b/src/main/java/com/cloudera/crunch/types/avro/AvroGroupedTableType.java
@@ -27,6 +27,7 @@ import com.cloudera.crunch.GroupingOptions;
 import com.cloudera.crunch.MapFn;
 import com.cloudera.crunch.Pair;
 import com.cloudera.crunch.fn.PairMapFn;
+import com.cloudera.crunch.lib.PTables;
 import com.cloudera.crunch.types.Converter;
 import com.cloudera.crunch.types.PGroupedTableType;
 
@@ -71,6 +72,11 @@ public class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V> {
   }
   
   @Override
+  public Pair<K, Iterable<V>> getDetachedValue(Pair<K, Iterable<V>> value) {
+    return PTables.getGroupedDetachedValue(this, value);
+  }
+
+  @Override
   public void configureShuffle(Job job, GroupingOptions options) {
     AvroTableType<K, V> att = (AvroTableType<K, V>) tableType;
     String schemaJson = att.getSchema().toString();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/main/java/com/cloudera/crunch/types/avro/AvroTableType.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/types/avro/AvroTableType.java b/src/main/java/com/cloudera/crunch/types/avro/AvroTableType.java
index 8d71b7f..b31ecec 100644
--- a/src/main/java/com/cloudera/crunch/types/avro/AvroTableType.java
+++ b/src/main/java/com/cloudera/crunch/types/avro/AvroTableType.java
@@ -20,6 +20,7 @@ import org.apache.hadoop.conf.Configuration;
 
 import com.cloudera.crunch.MapFn;
 import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.lib.PTables;
 import com.cloudera.crunch.types.PGroupedTableType;
 import com.cloudera.crunch.types.PTableType;
 import com.cloudera.crunch.types.PType;
@@ -155,4 +156,9 @@ public class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements
 	public PGroupedTableType<K, V> getGroupedTableType() {
 		return new AvroGroupedTableType<K, V>(this);
 	}
+
+  @Override
+  public Pair<K, V> getDetachedValue(Pair<K, V> value) {
+    return PTables.getDetachedValue(this, value);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/main/java/com/cloudera/crunch/types/avro/AvroType.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/types/avro/AvroType.java b/src/main/java/com/cloudera/crunch/types/avro/AvroType.java
index 3db00c0..0489fb6 100644
--- a/src/main/java/com/cloudera/crunch/types/avro/AvroType.java
+++ b/src/main/java/com/cloudera/crunch/types/avro/AvroType.java
@@ -41,10 +41,12 @@ public class AvroType<T> implements PType<T> {
 	private static final Converter AVRO_CONVERTER = new AvroKeyConverter();
 
 	private final Class<T> typeClass;
-	private final Schema schema;
+  private final String schemaString;
+  private transient Schema schema;
 	private final MapFn baseInputMapFn;
 	private final MapFn baseOutputMapFn;
 	private final List<PType> subTypes;
+  private AvroDeepCopier<T> deepCopier;
 
 	public AvroType(Class<T> typeClass, Schema schema, PType... ptypes) {
 		this(typeClass, schema, IdentityFn.getInstance(), IdentityFn
@@ -55,6 +57,7 @@ public class AvroType<T> implements PType<T> {
 			MapFn outputMapFn, PType... ptypes) {
 		this.typeClass = typeClass;
 		this.schema = Preconditions.checkNotNull(schema);
+    this.schemaString = schema.toString();
 		this.baseInputMapFn = inputMapFn;
 		this.baseOutputMapFn = outputMapFn;
 		this.subTypes = ImmutableList.<PType> builder().add(ptypes).build();
@@ -76,6 +79,9 @@ public class AvroType<T> implements PType<T> {
 	}
 
 	public Schema getSchema() {
+    if (schema == null) {
+      schema = new Schema.Parser().parse(schemaString);
+    }
 		return schema;
 	}
 
@@ -123,6 +129,26 @@ public class AvroType<T> implements PType<T> {
 		return new AvroFileSourceTarget<T>(path, this);
 	}
 
+  private AvroDeepCopier<T> getDeepCopier() {
+    if (deepCopier == null) {
+      if (isSpecific()) {
+        deepCopier = new AvroDeepCopier.AvroSpecificDeepCopier<T>(typeClass, getSchema());
+      } else if (isGeneric()) {
+        deepCopier = (AvroDeepCopier<T>) new AvroDeepCopier.AvroGenericDeepCopier(getSchema());
+      } else {
+        deepCopier = new AvroDeepCopier.AvroReflectDeepCopier<T>(typeClass, getSchema());
+      }
+    }
+    return deepCopier;
+  }
+
+  public T getDetachedValue(T value) {
+    if (this.baseInputMapFn instanceof IdentityFn && !Avros.isPrimitive(this)) {
+      return getDeepCopier().deepCopy(value);
+    }
+    return value;
+  }
+
 	@Override
 	public boolean equals(Object other) {
 		if (other == null || !(other instanceof AvroType)) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/main/java/com/cloudera/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/types/avro/Avros.java b/src/main/java/com/cloudera/crunch/types/avro/Avros.java
index 59237fc..7b85145 100644
--- a/src/main/java/com/cloudera/crunch/types/avro/Avros.java
+++ b/src/main/java/com/cloudera/crunch/types/avro/Avros.java
@@ -142,6 +142,10 @@ public class Avros {
 		return (PType<T>) PRIMITIVES.get(clazz);
 	}
 
+  static <T> boolean isPrimitive(AvroType<T> avroType) {
+    return PRIMITIVES.containsKey(avroType.getTypeClass());
+  }
+
 	private static <T> AvroType<T> create(Class<T> clazz, Schema.Type schemaType) {
 		return new AvroType<T>(clazz, Schema.create(schemaType));
 	}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/main/java/com/cloudera/crunch/types/writable/WritableGroupedTableType.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/types/writable/WritableGroupedTableType.java b/src/main/java/com/cloudera/crunch/types/writable/WritableGroupedTableType.java
index 87f222d..a93b7c2 100644
--- a/src/main/java/com/cloudera/crunch/types/writable/WritableGroupedTableType.java
+++ b/src/main/java/com/cloudera/crunch/types/writable/WritableGroupedTableType.java
@@ -19,6 +19,7 @@ import org.apache.hadoop.mapreduce.Job;
 import com.cloudera.crunch.GroupingOptions;
 import com.cloudera.crunch.MapFn;
 import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.lib.PTables;
 import com.cloudera.crunch.types.Converter;
 import com.cloudera.crunch.types.PGroupedTableType;
 
@@ -60,6 +61,11 @@ public class WritableGroupedTableType<K, V> extends PGroupedTableType<K, V> {
   }
   
   @Override
+  public Pair<K, Iterable<V>> getDetachedValue(Pair<K, Iterable<V>> value) {
+    return PTables.getGroupedDetachedValue(this, value);
+  }
+
+  @Override
   public void configureShuffle(Job job, GroupingOptions options) {
     if (options != null) {
       options.configure(job);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/main/java/com/cloudera/crunch/types/writable/WritableTableType.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/types/writable/WritableTableType.java b/src/main/java/com/cloudera/crunch/types/writable/WritableTableType.java
index 376af48..0400e0c 100644
--- a/src/main/java/com/cloudera/crunch/types/writable/WritableTableType.java
+++ b/src/main/java/com/cloudera/crunch/types/writable/WritableTableType.java
@@ -25,6 +25,7 @@ import com.cloudera.crunch.Pair;
 import com.cloudera.crunch.SourceTarget;
 import com.cloudera.crunch.fn.PairMapFn;
 import com.cloudera.crunch.io.seq.SeqFileTableSourceTarget;
+import com.cloudera.crunch.lib.PTables;
 import com.cloudera.crunch.types.Converter;
 import com.cloudera.crunch.types.PGroupedTableType;
 import com.cloudera.crunch.types.PTableType;
@@ -101,6 +102,11 @@ class WritableTableType<K, V> implements PTableType<K, V> {
   }
   
   @Override
+  public Pair<K, V> getDetachedValue(Pair<K, V> value) {
+    return PTables.getDetachedValue(this, value);
+  }
+
+  @Override
   public boolean equals(Object obj) {
 	if (obj == null || !(obj instanceof WritableTableType)) {
 	  return false;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/main/java/com/cloudera/crunch/types/writable/WritableType.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/types/writable/WritableType.java b/src/main/java/com/cloudera/crunch/types/writable/WritableType.java
index 8031e90..ff9b441 100644
--- a/src/main/java/com/cloudera/crunch/types/writable/WritableType.java
+++ b/src/main/java/com/cloudera/crunch/types/writable/WritableType.java
@@ -18,16 +18,18 @@ import java.util.List;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
 
 import com.cloudera.crunch.MapFn;
 import com.cloudera.crunch.SourceTarget;
+import com.cloudera.crunch.fn.IdentityFn;
 import com.cloudera.crunch.io.seq.SeqFileSourceTarget;
 import com.cloudera.crunch.types.Converter;
 import com.cloudera.crunch.types.PType;
 import com.cloudera.crunch.types.PTypeFamily;
 import com.google.common.collect.ImmutableList;
 
-public class WritableType<T, W> implements PType<T> {
+public class WritableType<T, W extends Writable> implements PType<T> {
 
   private final Class<T> typeClass;
   private final Class<W> writableClass;
@@ -95,6 +97,19 @@ public class WritableType<T, W> implements PType<T> {
 		subTypes.equals(wt.subTypes));
   }
   
+  // Unchecked warnings are suppressed because we know that W and T are the same
+  // type (due to the IdentityFn being used)
+  @SuppressWarnings("unchecked")
+  @Override
+  public T getDetachedValue(T value) {
+    if (this.inputFn.getClass().equals(IdentityFn.class)) {
+      W writableValue = (W) value;
+      return (T) Writables.deepCopy(writableValue, this.writableClass);
+    } else {
+      return value;
+    }
+  }
+
   @Override
   public int hashCode() {
 	HashCodeBuilder hcb = new HashCodeBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/main/java/com/cloudera/crunch/types/writable/Writables.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/types/writable/Writables.java b/src/main/java/com/cloudera/crunch/types/writable/Writables.java
index bb07833..2262946 100644
--- a/src/main/java/com/cloudera/crunch/types/writable/Writables.java
+++ b/src/main/java/com/cloudera/crunch/types/writable/Writables.java
@@ -14,6 +14,11 @@
  */
 package com.cloudera.crunch.types.writable;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.List;
@@ -39,6 +44,7 @@ import com.cloudera.crunch.Tuple4;
 import com.cloudera.crunch.TupleN;
 import com.cloudera.crunch.fn.CompositeMapFn;
 import com.cloudera.crunch.fn.IdentityFn;
+import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException;
 import com.cloudera.crunch.types.PType;
 import com.cloudera.crunch.types.TupleFactory;
 import com.cloudera.crunch.util.PTypes;
@@ -207,7 +213,7 @@ public class Writables {
     return (PType<T>) PRIMITIVES.get(clazz);
   }
   
-  public static <T> void register(Class<T> clazz, WritableType<T, ?> ptype) {
+  public static <T> void register(Class<T> clazz, WritableType<T, ? extends Writable> ptype) {
     EXTENSIONS.put(clazz, ptype);
   }
   
@@ -243,11 +249,11 @@ public class Writables {
     return bytes;
   }
   
-  public static final <T> WritableType<T, T> records(Class<T> clazz) {
+  public static final <T, W extends Writable> WritableType<T, W> records(Class<T> clazz) {
     if (EXTENSIONS.containsKey(clazz)) {
-      return (WritableType<T, T>) EXTENSIONS.get(clazz);
+      return (WritableType<T, W>) EXTENSIONS.get(clazz);
     }
-    return (WritableType<T, T>) writables(clazz.asSubclass(Writable.class));
+    return (WritableType<T, W>) writables(clazz.asSubclass(Writable.class));
   }
 
   public static <W extends Writable> WritableType<W, W> writables(Class<W> clazz) {
@@ -593,6 +599,32 @@ public class Writables {
     return PTypes.jsonString(clazz, WritableTypeFamily.getInstance());  
   }
   
+  /**
+   * Perform a deep copy of a writable value.
+   * 
+   * @param value
+   *          The value to be copied
+   * @param writableClass
+   *          The Writable class of the value to be copied
+   * @return A fully detached deep copy of the input value
+   */
+  public static <T extends Writable> T deepCopy(T value, Class<T> writableClass) {
+    ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
+    DataOutputStream dataOut = new DataOutputStream(byteOutStream);
+    T copiedValue = null;
+    try {
+      value.write(dataOut);
+      dataOut.flush();
+      ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
+      DataInput dataInput = new DataInputStream(byteInStream);
+      copiedValue = writableClass.newInstance();
+      copiedValue.readFields(dataInput);
+    } catch (Exception e) {
+      throw new CrunchRuntimeException("Error while deep copying " + value, e);
+    }
+    return copiedValue;
+  }
+
   // Not instantiable
   private Writables() {
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/test/java/com/cloudera/crunch/lib/AggregateTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/lib/AggregateTest.java b/src/test/java/com/cloudera/crunch/lib/AggregateTest.java
index 18b9a68..dc8bf9a 100644
--- a/src/test/java/com/cloudera/crunch/lib/AggregateTest.java
+++ b/src/test/java/com/cloudera/crunch/lib/AggregateTest.java
@@ -19,8 +19,11 @@ import static com.cloudera.crunch.types.writable.Writables.tableOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.util.Collection;
+import java.util.Map;
 
+import org.apache.hadoop.io.Text;
 import org.junit.Test;
 
 import com.cloudera.crunch.MapFn;
@@ -30,14 +33,17 @@ import com.cloudera.crunch.Pair;
 import com.cloudera.crunch.Pipeline;
 import com.cloudera.crunch.impl.mem.MemPipeline;
 import com.cloudera.crunch.impl.mr.MRPipeline;
+import com.cloudera.crunch.test.Employee;
 import com.cloudera.crunch.test.FileHelper;
 import com.cloudera.crunch.types.PTableType;
 import com.cloudera.crunch.types.PTypeFamily;
 import com.cloudera.crunch.types.avro.AvroTypeFamily;
 import com.cloudera.crunch.types.avro.Avros;
 import com.cloudera.crunch.types.writable.WritableTypeFamily;
+import com.cloudera.crunch.types.writable.Writables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 
 public class AggregateTest {
 
@@ -122,4 +128,102 @@ public class AggregateTest {
     PTable<String, Integer> bottom2 = Aggregate.top(counts, 2, false);
     assertEquals(ImmutableList.of(Pair.of("foo", 12), Pair.of("bar", 17)), bottom2.materialize());
   }
+
+  @Test
+  public void testCollectValues_Writables() throws IOException {
+    // Every line of set2.txt is keyed under 1, so collectValues() yields a single group.
+    PTableType<Integer, Text> intToTextType =
+        Writables.tableOf(Writables.ints(), Writables.writables(Text.class));
+    Pipeline pipeline = new MRPipeline(AggregateTest.class);
+    PTable<Integer, Text> keyedLines = pipeline
+        .readTextFile(FileHelper.createTempCopyOf("set2.txt"))
+        .parallelDo(new MapStringToTextPair(), intToTextType);
+    Map<Integer, Collection<Text>> collected = keyedLines.collectValues().materializeToMap();
+
+    assertEquals(1, collected.size());
+
+    // Distinct values are expected: a reused Writable instance must not be collected repeatedly.
+    Collection<Text> expected = Lists.newArrayList(new Text("c"), new Text("d"), new Text("a"));
+    assertEquals(expected, collected.get(1));
+  }
+
+  @Test
+  public void testCollectValues_Avro() throws IOException {
+    MapStringToEmployeePair toEmployee = new MapStringToEmployeePair();
+    PTableType<Integer, Employee> intToEmployeeType =
+        Avros.tableOf(Avros.ints(), Avros.records(Employee.class));
+    Pipeline pipeline = new MRPipeline(AggregateTest.class);
+    PTable<Integer, Employee> keyedEmployees = pipeline
+        .readTextFile(FileHelper.createTempCopyOf("set2.txt"))
+        .parallelDo(toEmployee, intToEmployeeType);
+    Map<Integer, Collection<Employee>> collected =
+        keyedEmployees.collectValues().materializeToMap();
+
+    assertEquals(1, collected.size());
+
+    // Rebuild the expected records with the same MapFn so they match field for field.
+    Collection<Employee> expected = Lists.newArrayList(
+        toEmployee.map("c").second(),
+        toEmployee.map("d").second(),
+        toEmployee.map("a").second());
+    assertEquals(expected, collected.get(1));
+  }
+
+  /** Emits every input line as a {@link Text} value under the constant key 1. */
+  private static class MapStringToTextPair extends MapFn<String, Pair<Integer, Text>> {
+    @Override
+    public Pair<Integer, Text> map(String input) {
+      Text line = new Text(input);
+      return Pair.of(1, line);
+    }
+  }
+
+  /** Turns each input line into an Employee of that name (salary 0, empty department) under key 1. */
+  private static class MapStringToEmployeePair extends MapFn<String, Pair<Integer, Employee>> {
+    @Override
+    public Pair<Integer, Employee> map(String input) {
+      Employee employee = new Employee();
+      employee.setDepartment("");
+      employee.setSalary(0);
+      employee.setName(input);
+      return Pair.of(1, employee);
+    }
+  }
+
+  /**
+   * Simple mutable POJO wrapping a single String value.
+   *
+   * <p>Equality is null-safe and based solely on {@code value}; {@link #hashCode()} is kept
+   * consistent with {@link #equals(Object)} so instances behave correctly in hash-based collections.
+   *
+   * <p>NOTE(review): not referenced by the tests added alongside it; consider removing if unused.
+   */
+  public static class PojoText {
+    private String value;
+
+    /** Creates an instance wrapping the empty string. */
+    public PojoText() {
+      this("");
+    }
+
+    /** @param value the wrapped value; may be null */
+    public PojoText(String value) {
+      this.value = value;
+    }
+
+    public String getValue() {
+      return value;
+    }
+
+    public void setValue(String value) {
+      this.value = value;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("PojoText<%s>", this.value);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      }
+      PojoText other = (PojoText) obj;
+      return value == null ? other.value == null : value.equals(other.value);
+    }
+
+    // equals() is overridden, so hashCode() must be too: equal objects need equal hash codes.
+    @Override
+    public int hashCode() {
+      return value == null ? 0 : value.hashCode();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/test/java/com/cloudera/crunch/types/avro/AvroDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/types/avro/AvroDeepCopierTest.java b/src/test/java/com/cloudera/crunch/types/avro/AvroDeepCopierTest.java
new file mode 100644
index 0000000..cce37cd
--- /dev/null
+++ b/src/test/java/com/cloudera/crunch/types/avro/AvroDeepCopierTest.java
@@ -0,0 +1,58 @@
+package com.cloudera.crunch.types.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+
+import org.apache.avro.generic.GenericData.Record;
+import org.junit.Test;
+
+import com.cloudera.crunch.test.Person;
+import com.cloudera.crunch.types.avro.AvroDeepCopier.AvroSpecificDeepCopier;
+import com.google.common.collect.Lists;
+
+public class AvroDeepCopierTest {
+
+  /** Builds the Person fixture shared by the specific and reflect copier tests. */
+  private static Person createPerson() {
+    Person person = new Person();
+    person.setName("John Doe");
+    person.setAge(42);
+    person.setSiblingnames(Lists.<CharSequence> newArrayList());
+    return person;
+  }
+
+  /** A specific-record copy must equal the original yet be a different instance. */
+  @Test
+  public void testDeepCopySpecific() {
+    Person original = createPerson();
+    AvroSpecificDeepCopier<Person> copier =
+        new AvroSpecificDeepCopier<Person>(Person.class, Person.SCHEMA$);
+
+    Person copy = copier.deepCopy(original);
+
+    assertEquals(original, copy);
+    assertNotSame(original, copy);
+  }
+
+  /** A generic-record copy must equal the original yet be a different instance. */
+  @Test
+  public void testDeepCopyGeneric() {
+    Record original = new Record(Person.SCHEMA$);
+    original.put("name", "John Doe");
+    original.put("age", 42);
+    original.put("siblingnames", Lists.newArrayList());
+
+    Record copy = new AvroDeepCopier.AvroGenericDeepCopier(Person.SCHEMA$).deepCopy(original);
+
+    assertEquals(original, copy);
+    assertNotSame(original, copy);
+  }
+
+  /** Copying through the reflect-based copier must also give an equal but distinct Person. */
+  @Test
+  public void testDeepCopyReflect() {
+    Person original = createPerson();
+    AvroDeepCopier.AvroReflectDeepCopier<Person> copier =
+        new AvroDeepCopier.AvroReflectDeepCopier<Person>(Person.class, Person.SCHEMA$);
+
+    Person copy = copier.deepCopy(original);
+
+    assertEquals(original, copy);
+    assertNotSame(original, copy);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/test/java/com/cloudera/crunch/types/avro/AvroGroupedTableTypeTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/types/avro/AvroGroupedTableTypeTest.java b/src/test/java/com/cloudera/crunch/types/avro/AvroGroupedTableTypeTest.java
new file mode 100644
index 0000000..134dd9d
--- /dev/null
+++ b/src/test/java/com/cloudera/crunch/types/avro/AvroGroupedTableTypeTest.java
@@ -0,0 +1,41 @@
+package com.cloudera.crunch.types.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import java.util.List;
+
+import org.junit.Test;
+
+import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.test.Person;
+import com.cloudera.crunch.types.PGroupedTableType;
+import com.google.common.collect.Lists;
+
+public class AvroGroupedTableTypeTest {
+
+  /**
+   * Detaching a grouped (key, values) pair should hand back the same key instance and a copy of
+   * each grouped value that is equal to, but not the same object as, the input.
+   */
+  @Test
+  public void testGetDetachedValue() {
+    PGroupedTableType<Integer, Person> groupedTableType =
+        Avros.tableOf(Avros.ints(), Avros.reflects(Person.class)).getGroupedTableType();
+
+    Integer key = 42;
+    Person person = new Person();
+    person.setName("John Doe");
+    person.setAge(42);
+    person.setSiblingnames(Lists.<CharSequence> newArrayList());
+    Iterable<Person> groupedValues = Lists.newArrayList(person);
+    Pair<Integer, Iterable<Person>> input = Pair.of(key, groupedValues);
+
+    Pair<Integer, Iterable<Person>> detached = groupedTableType.getDetachedValue(input);
+
+    assertSame(key, detached.first());
+    List<Person> detachedValues = Lists.newArrayList(detached.second());
+    assertEquals(groupedValues, detachedValues);
+    assertNotSame(person, detachedValues.get(0));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/test/java/com/cloudera/crunch/types/avro/AvroTableTypeTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/types/avro/AvroTableTypeTest.java b/src/test/java/com/cloudera/crunch/types/avro/AvroTableTypeTest.java
new file mode 100644
index 0000000..867ee6f
--- /dev/null
+++ b/src/test/java/com/cloudera/crunch/types/avro/AvroTableTypeTest.java
@@ -0,0 +1,35 @@
+package com.cloudera.crunch.types.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import org.junit.Test;
+
+import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.test.Person;
+import com.google.common.collect.Lists;
+
+public class AvroTableTypeTest {
+
+  /** Detaching a (key, value) pair should reuse the Integer key and copy the Person value. */
+  @Test
+  public void testGetDetachedValue() {
+    AvroTableType<Integer, Person> tableType =
+        Avros.tableOf(Avros.ints(), Avros.reflects(Person.class));
+
+    Integer key = 42;
+    Person person = new Person();
+    person.setName("John Doe");
+    person.setAge(42);
+    person.setSiblingnames(Lists.<CharSequence> newArrayList());
+    Pair<Integer, Person> input = Pair.of(key, person);
+
+    Pair<Integer, Person> detached = tableType.getDetachedValue(input);
+
+    assertSame(key, detached.first());
+    assertEquals(person, detached.second());
+    assertNotSame(person, detached.second());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/test/java/com/cloudera/crunch/types/avro/AvroTypeTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/types/avro/AvroTypeTest.java b/src/test/java/com/cloudera/crunch/types/avro/AvroTypeTest.java
index d2c2ab9..49760f4 100644
--- a/src/test/java/com/cloudera/crunch/types/avro/AvroTypeTest.java
+++ b/src/test/java/com/cloudera/crunch/types/avro/AvroTypeTest.java
@@ -1,8 +1,14 @@
 package com.cloudera.crunch.types.avro;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.hadoop.thirdparty.guava.common.collect.Lists;
 import org.junit.Test;
 
 import com.cloudera.crunch.test.Person;
@@ -63,4 +69,50 @@ public class AvroTypeTest {
 				Avros.generics(Person.SCHEMA$)).isGeneric());
 	}
 
+  @Test
+  public void testGetDetachedValue_AlreadyMappedAvroType() {
+    // For the built-in int type the detached value is expected to be the very same instance.
+    AvroType<Integer> intType = Avros.ints();
+    Integer value = 42;
+    assertSame(value, intType.getDetachedValue(value));
+  }
+
+  @Test
+  public void testGetDetachedValue_GenericAvroType() {
+    AvroType<Record> genericType = Avros.generics(Person.SCHEMA$);
+    GenericData.Record record = new GenericData.Record(Person.SCHEMA$);
+    record.put("name", "name value");
+    record.put("age", 42);
+    // Use the public Guava Lists, as the other tests in this package do, instead of Hadoop's
+    // internal repackaged copy (org.apache.hadoop.thirdparty.guava) imported by this file.
+    // NOTE(review): that import should be replaced with com.google.common.collect.Lists.
+    record.put("siblingnames", com.google.common.collect.Lists.newArrayList());
+
+    // A generic record must come back as an equal but distinct copy.
+    Record detachedRecord = genericType.getDetachedValue(record);
+    assertEquals(record, detachedRecord);
+    assertNotSame(record, detachedRecord);
+  }
+
+  @Test
+  public void testGetDetachedValue_SpecificAvroType() {
+    AvroType<Person> specificType = Avros.records(Person.class);
+    Person person = new Person();
+    person.setName("name value");
+    person.setAge(42);
+    // Use the public Guava Lists, as the other tests in this package do, instead of Hadoop's
+    // internal repackaged copy (org.apache.hadoop.thirdparty.guava) imported by this file.
+    // NOTE(review): that import should be replaced with com.google.common.collect.Lists.
+    person.setSiblingnames(com.google.common.collect.Lists.<CharSequence> newArrayList());
+
+    // A specific record must come back as an equal but distinct copy.
+    Person detachedPerson = specificType.getDetachedValue(person);
+    assertEquals(person, detachedPerson);
+    assertNotSame(person, detachedPerson);
+  }
+
+  @Test
+  public void testGetDetachedValue_ReflectAvroType() {
+    AvroType<Person> reflectType = Avros.reflects(Person.class);
+    Person person = new Person();
+    person.setName("name value");
+    person.setAge(42);
+    // Use the public Guava Lists, as the other tests in this package do, instead of Hadoop's
+    // internal repackaged copy (org.apache.hadoop.thirdparty.guava) imported by this file.
+    // NOTE(review): that import should be replaced with com.google.common.collect.Lists.
+    person.setSiblingnames(com.google.common.collect.Lists.<CharSequence> newArrayList());
+
+    // A reflect-mapped value must come back as an equal but distinct copy.
+    Person detachedPerson = reflectType.getDetachedValue(person);
+    assertEquals(person, detachedPerson);
+    assertNotSame(person, detachedPerson);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java b/src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
index c7e9908..8bdd084 100644
--- a/src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
+++ b/src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
@@ -15,7 +15,9 @@
 package com.cloudera.crunch.types.avro;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 import java.nio.ByteBuffer;
 import java.util.Collection;
@@ -31,6 +33,7 @@ import com.cloudera.crunch.Pair;
 import com.cloudera.crunch.Tuple3;
 import com.cloudera.crunch.Tuple4;
 import com.cloudera.crunch.TupleN;
+import com.cloudera.crunch.test.Person;
 import com.cloudera.crunch.types.PTableType;
 import com.cloudera.crunch.types.PType;
 import com.google.common.collect.ImmutableList;
@@ -104,9 +107,9 @@ public class AvrosTest {
   
   @Test
   public void testNestedTables() throws Exception {
-	PTableType<Long, Long> pll = Avros.tableOf(Avros.longs(), Avros.longs());
-	String schema = Avros.tableOf(pll, Avros.strings()).getSchema().toString();
-	assertNotNull(schema);
+    // A table type nested inside another table type must still yield a non-null Avro schema string.
+    PTableType<Long, Long> pll = Avros.tableOf(Avros.longs(), Avros.longs());
+    String schema = Avros.tableOf(pll, Avros.strings()).getSchema().toString();
+    assertNotNull(schema);
   }
   
   @Test
@@ -204,4 +207,15 @@ public class AvrosTest {
     assertEquals(java, ptype.getInputMapFn().map(avro));
     assertEquals(avro, ptype.getOutputMapFn().map(java));
   }
+
+  @Test
+  public void testIsPrimitive_True() {
+    // The built-in int type is expected to be reported as primitive.
+    AvroType<Integer> intType = Avros.ints();
+    assertTrue(Avros.isPrimitive(intType));
+  }
+
+  @Test
+  public void testIsPrimitive_False() {
+    // A reflect-based record type must not be reported as primitive.
+    AvroType<Person> reflectType = Avros.reflects(Person.class);
+    assertFalse(Avros.isPrimitive(reflectType));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/test/java/com/cloudera/crunch/types/writable/WritableGroupedTableTypeTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/types/writable/WritableGroupedTableTypeTest.java b/src/test/java/com/cloudera/crunch/types/writable/WritableGroupedTableTypeTest.java
new file mode 100644
index 0000000..2357393
--- /dev/null
+++ b/src/test/java/com/cloudera/crunch/types/writable/WritableGroupedTableTypeTest.java
@@ -0,0 +1,37 @@
+package com.cloudera.crunch.types.writable;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.types.PGroupedTableType;
+import com.google.common.collect.Lists;
+
+public class WritableGroupedTableTypeTest {
+
+  /**
+   * Detaching a grouped (key, values) pair should hand back the same key instance and a copy of
+   * each grouped Text value that is equal to, but not the same object as, the input.
+   */
+  @Test
+  public void testGetDetachedValue() {
+    PGroupedTableType<Integer, Text> groupedTableType =
+        Writables.tableOf(Writables.ints(), Writables.writables(Text.class)).getGroupedTableType();
+
+    Integer key = 42;
+    Text text = new Text("forty-two");
+    Iterable<Text> groupedValues = Lists.newArrayList(text);
+    Pair<Integer, Iterable<Text>> input = Pair.of(key, groupedValues);
+
+    Pair<Integer, Iterable<Text>> detached = groupedTableType.getDetachedValue(input);
+
+    assertSame(key, detached.first());
+    List<Text> detachedValues = Lists.newArrayList(detached.second());
+    assertEquals(groupedValues, detachedValues);
+    assertNotSame(text, detachedValues.get(0));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/test/java/com/cloudera/crunch/types/writable/WritableTableTypeTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/types/writable/WritableTableTypeTest.java b/src/test/java/com/cloudera/crunch/types/writable/WritableTableTypeTest.java
new file mode 100644
index 0000000..96015f6
--- /dev/null
+++ b/src/test/java/com/cloudera/crunch/types/writable/WritableTableTypeTest.java
@@ -0,0 +1,30 @@
+package com.cloudera.crunch.types.writable;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.cloudera.crunch.Pair;
+
+public class WritableTableTypeTest {
+
+  /** Detaching a (key, value) pair should reuse the Integer key and copy the Text value. */
+  @Test
+  public void testGetDetachedValue() {
+    WritableTableType<Integer, Text> tableType =
+        Writables.tableOf(Writables.ints(), Writables.writables(Text.class));
+
+    Integer key = 42;
+    Text text = new Text("forty-two");
+    Pair<Integer, Text> input = Pair.of(key, text);
+
+    Pair<Integer, Text> detached = tableType.getDetachedValue(input);
+
+    assertSame(key, detached.first());
+    assertEquals(text, detached.second());
+    assertNotSame(text, detached.second());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/test/java/com/cloudera/crunch/types/writable/WritableTypeTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/types/writable/WritableTypeTest.java b/src/test/java/com/cloudera/crunch/types/writable/WritableTypeTest.java
new file mode 100644
index 0000000..8dff574
--- /dev/null
+++ b/src/test/java/com/cloudera/crunch/types/writable/WritableTypeTest.java
@@ -0,0 +1,29 @@
+package com.cloudera.crunch.types.writable;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class WritableTypeTest {
+
+  /** For the built-in String type the detached value is expected to be the same instance. */
+  @Test
+  public void testGetDetachedValue_AlreadyMappedWritable() {
+    String value = "test";
+    WritableType<String, Text> stringType = Writables.strings();
+    String detached = stringType.getDetachedValue(value);
+    assertSame(value, detached);
+  }
+
+  /** A custom Writable must be copied: equal contents, different instance. */
+  @Test
+  public void testGetDetachedValue_CustomWritable() {
+    Text value = new Text("test");
+    WritableType<Text, Text> textType = Writables.writables(Text.class);
+    Text detached = textType.getDetachedValue(value);
+    assertEquals(value, detached);
+    assertNotSame(value, detached);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/test/java/com/cloudera/crunch/types/writable/WritablesTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/types/writable/WritablesTest.java b/src/test/java/com/cloudera/crunch/types/writable/WritablesTest.java
index ac0b3db..2be2c0b 100644
--- a/src/test/java/com/cloudera/crunch/types/writable/WritablesTest.java
+++ b/src/test/java/com/cloudera/crunch/types/writable/WritablesTest.java
@@ -16,7 +16,8 @@ package com.cloudera.crunch.types.writable;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -254,7 +255,7 @@ public class WritablesTest {
   public void testRegister() throws Exception {
     WritableType<TestWritable, TestWritable> wt = Writables.writables(TestWritable.class);
     Writables.register(TestWritable.class, wt);
-    assertTrue(Writables.records(TestWritable.class) == wt);
+    assertSame(Writables.records(TestWritable.class), wt);
   }
     
   @SuppressWarnings({"unchecked", "rawtypes"})
@@ -264,4 +265,12 @@ public class WritablesTest {
     assertEquals(java, ptype.getInputMapFn().map(writable));
     assertEquals(writable, ptype.getOutputMapFn().map(java));
   }
+
+  @Test
+  public void testDeepCopy() {
+    // The copy must hold the same contents but live in a separate object.
+    Text original = new Text("Test");
+    Text copy = Writables.deepCopy(original, Text.class);
+    assertEquals(original, copy);
+    assertNotSame(original, copy);
+  }
 }


Mime
View raw message