crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [26/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues
Date Tue, 23 Apr 2013 20:41:28 GMT
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
new file mode 100644
index 0000000..fc30eaf
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -0,0 +1,709 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.fn.CompositeMapFn;
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.types.CollectionDeepCopier;
+import org.apache.crunch.types.DeepCopier;
+import org.apache.crunch.types.MapDeepCopier;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypes;
+import org.apache.crunch.types.TupleDeepCopier;
+import org.apache.crunch.types.TupleFactory;
+import org.apache.crunch.types.writable.WritableDeepCopier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Defines static methods that are analogous to the methods defined in
+ * {@link AvroTypeFamily} for convenient static importing.
+ * 
+ */
+public class Avros {
+
+  /**
+   * Older versions of Avro (i.e., before 1.7.0) do not support schemas that are
+   * composed of a mix of specific and reflection-based schemas. This bit
+   * controls whether or not we allow Crunch jobs to be created that involve
+   * mixing specific and reflection-based schemas and can be overridden by the
+   * client developer.
+   */
+  public static final boolean CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS;
+
+  static {
+    CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS = AvroCapabilities.canDecodeSpecificSchemaWithReflectDatumReader();
+  }
+
+  /**
+   * The instance we use for generating reflected schemas. May be modified by
+   * clients (e.g., Scrunch.)
+   */
+  public static ReflectDataFactory REFLECT_DATA_FACTORY = new ReflectDataFactory();
+
+  /**
+   * The name of the configuration parameter that tracks which reflection
+   * factory to use.
+   */
+  public static final String REFLECT_DATA_FACTORY_CLASS = "crunch.reflectdatafactory";
+
+  public static void configureReflectDataFactory(Configuration conf) {
+    conf.setClass(REFLECT_DATA_FACTORY_CLASS, REFLECT_DATA_FACTORY.getClass(), ReflectDataFactory.class);
+  }
+
+  public static ReflectDataFactory getReflectDataFactory(Configuration conf) {
+    return (ReflectDataFactory) ReflectionUtils.newInstance(
+        conf.getClass(REFLECT_DATA_FACTORY_CLASS, ReflectDataFactory.class), conf);
+  }
+
+  public static void checkCombiningSpecificAndReflectionSchemas() {
+    if (!CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS) {
+      throw new IllegalStateException("Crunch does not support running jobs that"
+          + " contain a mixture of reflection-based and avro-generated data types."
+          + " Please consider turning your reflection-based type into an avro-generated"
+          + " type and using that generated type instead."
+          + " If the version of Avro you are using is 1.7.0 or greater, you can enable"
+          + " combined schemas by setting the Avros.CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS" + " field to 'true'.");
+    }
+  }
+
+  public static MapFn<CharSequence, String> UTF8_TO_STRING = new MapFn<CharSequence, String>() {
+    @Override
+    public String map(CharSequence input) {
+      return input.toString();
+    }
+  };
+
+  public static MapFn<String, Utf8> STRING_TO_UTF8 = new MapFn<String, Utf8>() {
+    @Override
+    public Utf8 map(String input) {
+      return new Utf8(input);
+    }
+  };
+
+  public static MapFn<Object, ByteBuffer> BYTES_IN = new MapFn<Object, ByteBuffer>() {
+    @Override
+    public ByteBuffer map(Object input) {
+      if (input instanceof ByteBuffer) {
+        return (ByteBuffer) input;
+      }
+      return ByteBuffer.wrap((byte[]) input);
+    }
+  };
+
+  private static final AvroType<String> strings = new AvroType<String>(String.class, Schema.create(Schema.Type.STRING),
+      UTF8_TO_STRING, STRING_TO_UTF8, new DeepCopier.NoOpDeepCopier<String>());
+  private static final AvroType<Void> nulls = create(Void.class, Schema.Type.NULL);
+  private static final AvroType<Long> longs = create(Long.class, Schema.Type.LONG);
+  private static final AvroType<Integer> ints = create(Integer.class, Schema.Type.INT);
+  private static final AvroType<Float> floats = create(Float.class, Schema.Type.FLOAT);
+  private static final AvroType<Double> doubles = create(Double.class, Schema.Type.DOUBLE);
+  private static final AvroType<Boolean> booleans = create(Boolean.class, Schema.Type.BOOLEAN);
+  private static final AvroType<ByteBuffer> bytes = new AvroType<ByteBuffer>(ByteBuffer.class,
+      Schema.create(Schema.Type.BYTES), BYTES_IN, IdentityFn.getInstance(), new DeepCopier.NoOpDeepCopier<ByteBuffer>());
+
+  private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap.<Class<?>, PType<?>> builder()
+      .put(String.class, strings).put(Long.class, longs).put(Integer.class, ints).put(Float.class, floats)
+      .put(Double.class, doubles).put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build();
+
+  private static final Map<Class<?>, AvroType<?>> EXTENSIONS = Maps.newHashMap();
+
+  public static <T> void register(Class<T> clazz, AvroType<T> ptype) {
+    EXTENSIONS.put(clazz, ptype);
+  }
+
+  public static <T> PType<T> getPrimitiveType(Class<T> clazz) {
+    return (PType<T>) PRIMITIVES.get(clazz);
+  }
+
+  static <T> boolean isPrimitive(AvroType<T> avroType) {
+    return avroType.getTypeClass().isPrimitive() || PRIMITIVES.containsKey(avroType.getTypeClass());
+  }
+
+  private static <T> AvroType<T> create(Class<T> clazz, Schema.Type schemaType) {
+    return new AvroType<T>(clazz, Schema.create(schemaType), new DeepCopier.NoOpDeepCopier<T>());
+  }
+
+  public static final AvroType<Void> nulls() {
+    return nulls;
+  }
+
+  public static final AvroType<String> strings() {
+    return strings;
+  }
+
+  public static final AvroType<Long> longs() {
+    return longs;
+  }
+
+  public static final AvroType<Integer> ints() {
+    return ints;
+  }
+
+  public static final AvroType<Float> floats() {
+    return floats;
+  }
+
+  public static final AvroType<Double> doubles() {
+    return doubles;
+  }
+
+  public static final AvroType<Boolean> booleans() {
+    return booleans;
+  }
+
+  public static final AvroType<ByteBuffer> bytes() {
+    return bytes;
+  }
+
+  public static final <T> AvroType<T> records(Class<T> clazz) {
+    if (EXTENSIONS.containsKey(clazz)) {
+      return (AvroType<T>) EXTENSIONS.get(clazz);
+    }
+    return containers(clazz);
+  }
+
+  public static final AvroType<GenericData.Record> generics(Schema schema) {
+    return new AvroType<GenericData.Record>(GenericData.Record.class, schema, new AvroDeepCopier.AvroGenericDeepCopier(
+        schema));
+  }
+
+  public static final <T> AvroType<T> containers(Class<T> clazz) {
+    if (SpecificRecord.class.isAssignableFrom(clazz)) {
+      return (AvroType<T>) specifics((Class<SpecificRecord>) clazz);
+    }
+    return reflects(clazz);
+  }
+
+  public static final <T extends SpecificRecord> AvroType<T> specifics(Class<T> clazz) {
+    T t = ReflectionUtils.newInstance(clazz, null);
+    Schema schema = t.getSchema();
+    return new AvroType<T>(clazz, schema, new AvroDeepCopier.AvroSpecificDeepCopier<T>(clazz, schema));
+  }
+
+  public static final <T> AvroType<T> reflects(Class<T> clazz) {
+    Schema schema = REFLECT_DATA_FACTORY.getReflectData().getSchema(clazz);
+    return new AvroType<T>(clazz, schema, new AvroDeepCopier.AvroReflectDeepCopier<T>(clazz, schema));
+  }
+
+  private static class BytesToWritableMapFn<T extends Writable> extends MapFn<Object, T> {
+    private static final Log LOG = LogFactory.getLog(BytesToWritableMapFn.class);
+
+    private final Class<T> writableClazz;
+
+    public BytesToWritableMapFn(Class<T> writableClazz) {
+      this.writableClazz = writableClazz;
+    }
+
+    @Override
+    public T map(Object input) {
+      ByteBuffer byteBuffer = BYTES_IN.map(input);
+      T instance = ReflectionUtils.newInstance(writableClazz, null);
+      try {
+        instance.readFields(new DataInputStream(new ByteArrayInputStream(byteBuffer.array(),
+            byteBuffer.arrayOffset(), byteBuffer.limit())));
+      } catch (IOException e) {
+        LOG.error("Exception thrown reading instance of: " + writableClazz, e);
+      }
+      return instance;
+    }
+  }
+
+  private static class WritableToBytesMapFn<T extends Writable> extends MapFn<T, ByteBuffer> {
+    private static final Log LOG = LogFactory.getLog(WritableToBytesMapFn.class);
+
+    @Override
+    public ByteBuffer map(T input) {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      DataOutputStream das = new DataOutputStream(baos);
+      try {
+        input.write(das);
+      } catch (IOException e) {
+        LOG.error("Exception thrown converting Writable to bytes", e);
+      }
+      return ByteBuffer.wrap(baos.toByteArray());
+    }
+  }
+
+  public static final <T extends Writable> AvroType<T> writables(Class<T> clazz) {
+    return new AvroType<T>(clazz, Schema.create(Schema.Type.BYTES), new BytesToWritableMapFn<T>(clazz),
+        new WritableToBytesMapFn<T>(), new WritableDeepCopier<T>(clazz));
+  }
+
+  private static class GenericDataArrayToCollection<T> extends MapFn<Object, Collection<T>> {
+
+    private final MapFn<Object, T> mapFn;
+
+    public GenericDataArrayToCollection(MapFn<Object, T> mapFn) {
+      this.mapFn = mapFn;
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      mapFn.configure(conf);
+    }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      mapFn.setContext(context);
+    }
+    
+    @Override
+    public void initialize() {
+      mapFn.initialize();
+    }
+
+    @Override
+    public Collection<T> map(Object input) {
+      Collection<T> ret = Lists.newArrayList();
+      if (input instanceof Collection) {
+        for (Object in : (Collection<Object>) input) {
+          ret.add(mapFn.map(in));
+        }
+      } else {
+        // Assume it is an array
+        Object[] arr = (Object[]) input;
+        for (Object in : arr) {
+          ret.add(mapFn.map(in));
+        }
+      }
+      return ret;
+    }
+  }
+
+  private static class CollectionToGenericDataArray extends MapFn<Collection<?>, GenericData.Array<?>> {
+
+    private final MapFn mapFn;
+    private final String jsonSchema;
+    private transient Schema schema;
+
+    public CollectionToGenericDataArray(Schema schema, MapFn mapFn) {
+      this.mapFn = mapFn;
+      this.jsonSchema = schema.toString();
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      mapFn.configure(conf);
+    }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      mapFn.setContext(context);
+    }
+    
+    @Override
+    public void initialize() {
+      mapFn.initialize();
+    }
+
+    @Override
+    public GenericData.Array<?> map(Collection<?> input) {
+      if (schema == null) {
+        schema = new Schema.Parser().parse(jsonSchema);
+      }
+      GenericData.Array array = new GenericData.Array(input.size(), schema);
+      for (Object in : input) {
+        array.add(mapFn.map(in));
+      }
+      return array;
+    }
+  }
+
+  public static final <T> AvroType<Collection<T>> collections(PType<T> ptype) {
+    AvroType<T> avroType = (AvroType<T>) ptype;
+    Schema collectionSchema = Schema.createArray(allowNulls(avroType.getSchema()));
+    GenericDataArrayToCollection<T> input = new GenericDataArrayToCollection<T>(avroType.getInputMapFn());
+    CollectionToGenericDataArray output = new CollectionToGenericDataArray(collectionSchema, avroType.getOutputMapFn());
+    return new AvroType(Collection.class, collectionSchema, input, output, new CollectionDeepCopier<T>(ptype), ptype);
+  }
+
+  private static class AvroMapToMap<T> extends MapFn<Map<CharSequence, Object>, Map<String, T>> {
+    private final MapFn<Object, T> mapFn;
+
+    public AvroMapToMap(MapFn<Object, T> mapFn) {
+      this.mapFn = mapFn;
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      mapFn.configure(conf);
+    }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      mapFn.setContext(context);
+    }
+    
+    @Override
+    public void initialize() {
+      mapFn.initialize();
+    }
+
+    @Override
+    public Map<String, T> map(Map<CharSequence, Object> input) {
+      Map<String, T> out = Maps.newHashMap();
+      for (Map.Entry<CharSequence, Object> e : input.entrySet()) {
+        out.put(e.getKey().toString(), mapFn.map(e.getValue()));
+      }
+      return out;
+    }
+  }
+
+  private static class MapToAvroMap<T> extends MapFn<Map<String, T>, Map<Utf8, Object>> {
+    private final MapFn<T, Object> mapFn;
+
+    public MapToAvroMap(MapFn<T, Object> mapFn) {
+      this.mapFn = mapFn;
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      mapFn.configure(conf);
+    }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      mapFn.setContext(context);
+    }
+    
+    @Override
+    public void initialize() {
+      this.mapFn.initialize();
+    }
+
+    @Override
+    public Map<Utf8, Object> map(Map<String, T> input) {
+      Map<Utf8, Object> out = Maps.newHashMap();
+      for (Map.Entry<String, T> e : input.entrySet()) {
+        out.put(new Utf8(e.getKey()), mapFn.map(e.getValue()));
+      }
+      return out;
+    }
+  }
+
+  public static final <T> AvroType<Map<String, T>> maps(PType<T> ptype) {
+    AvroType<T> avroType = (AvroType<T>) ptype;
+    Schema mapSchema = Schema.createMap(allowNulls(avroType.getSchema()));
+    AvroMapToMap<T> inputFn = new AvroMapToMap<T>(avroType.getInputMapFn());
+    MapToAvroMap<T> outputFn = new MapToAvroMap<T>(avroType.getOutputMapFn());
+    return new AvroType(Map.class, mapSchema, inputFn, outputFn, new MapDeepCopier<T>(ptype), ptype);
+  }
+
+  private static class GenericRecordToTuple extends MapFn<GenericRecord, Tuple> {
+    private final TupleFactory<?> tupleFactory;
+    private final List<MapFn> fns;
+
+    private transient Object[] values;
+
+    public GenericRecordToTuple(TupleFactory<?> tupleFactory, PType<?>... ptypes) {
+      this.tupleFactory = tupleFactory;
+      this.fns = Lists.newArrayList();
+      for (PType<?> ptype : ptypes) {
+        AvroType atype = (AvroType) ptype;
+        fns.add(atype.getInputMapFn());
+      }
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      for (MapFn fn : fns) {
+        fn.configure(conf);
+      }
+    }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      for (MapFn fn : fns) {
+        fn.setContext(context);
+      }
+    }
+    
+    @Override
+    public void initialize() {
+      for (MapFn fn : fns) {
+        fn.initialize();
+      }
+      this.values = new Object[fns.size()];
+      tupleFactory.initialize();
+    }
+
+    @Override
+    public Tuple map(GenericRecord input) {
+      for (int i = 0; i < values.length; i++) {
+        Object v = input.get(i);
+        if (v == null) {
+          values[i] = null;
+        } else {
+          values[i] = fns.get(i).map(v);
+        }
+      }
+      return tupleFactory.makeTuple(values);
+    }
+  }
+
+  private static class TupleToGenericRecord extends MapFn<Tuple, GenericRecord> {
+    private final List<MapFn> fns;
+    private final List<AvroType> avroTypes;
+    private final String jsonSchema;
+    private final boolean isReflect;
+    private transient Schema schema;
+
+    public TupleToGenericRecord(Schema schema, PType<?>... ptypes) {
+      this.fns = Lists.newArrayList();
+      this.avroTypes = Lists.newArrayList();
+      this.jsonSchema = schema.toString();
+      boolean reflectFound = false;
+      boolean specificFound = false;
+      for (PType ptype : ptypes) {
+        AvroType atype = (AvroType) ptype;
+        fns.add(atype.getOutputMapFn());
+        avroTypes.add(atype);
+        if (atype.hasReflect()) {
+          reflectFound = true;
+        }
+        if (atype.hasSpecific()) {
+          specificFound = true;
+        }
+      }
+      if (specificFound && reflectFound) {
+        checkCombiningSpecificAndReflectionSchemas();
+      }
+      this.isReflect = reflectFound;
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      for (MapFn fn : fns) {
+        fn.configure(conf);
+      }
+    }
+ 
+    /**
+     * Hands the task context to every per-field output function.
+     *
+     * @param context the context to propagate to the wrapped functions
+     */
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      for (MapFn fn : fns) {
+        // Forward the argument itself; this override never stores the context on this
+        // instance, so getContext() is not a reliable source for it here.
+        fn.setContext(context);
+      }
+    }
+    
+    @Override
+    public void initialize() {
+      this.schema = new Schema.Parser().parse(jsonSchema);
+      for (MapFn fn : fns) {
+        fn.initialize();
+      }
+    }
+
+    private GenericRecord createRecord() {
+      if (isReflect) {
+        return new ReflectGenericRecord(schema);
+      } else {
+        return new GenericData.Record(schema);
+      }
+    }
+
+    @Override
+    public GenericRecord map(Tuple input) {
+      GenericRecord record = createRecord();
+      for (int i = 0; i < input.size(); i++) {
+        Object v = input.get(i);
+        if (v == null) {
+          record.put(i, null);
+        } else {
+          record.put(i, fns.get(i).map(v));
+        }
+      }
+      return record;
+    }
+  }
+
+  public static final <V1, V2> AvroType<Pair<V1, V2>> pairs(PType<V1> p1, PType<V2> p2) {
+    Schema schema = createTupleSchema(p1, p2);
+    GenericRecordToTuple input = new GenericRecordToTuple(TupleFactory.PAIR, p1, p2);
+    TupleToGenericRecord output = new TupleToGenericRecord(schema, p1, p2);
+    return new AvroType(Pair.class, schema, input, output, new TupleDeepCopier(Pair.class, p1, p2), p1, p2);
+  }
+
+  public static final <V1, V2, V3> AvroType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2, PType<V3> p3) {
+    Schema schema = createTupleSchema(p1, p2, p3);
+    return new AvroType(Tuple3.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE3, p1, p2, p3),
+        new TupleToGenericRecord(schema, p1, p2, p3), new TupleDeepCopier(Tuple3.class, p1, p2, p3), p1, p2, p3);
+  }
+
+  public static final <V1, V2, V3, V4> AvroType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, PType<V2> p2, PType<V3> p3,
+      PType<V4> p4) {
+    Schema schema = createTupleSchema(p1, p2, p3, p4);
+    return new AvroType(Tuple4.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE4, p1, p2, p3, p4),
+        new TupleToGenericRecord(schema, p1, p2, p3, p4), new TupleDeepCopier(Tuple4.class, p1, p2, p3, p4), p1, p2,
+        p3, p4);
+  }
+
+  public static final AvroType<TupleN> tuples(PType... ptypes) {
+    Schema schema = createTupleSchema(ptypes);
+    return new AvroType(TupleN.class, schema, new GenericRecordToTuple(TupleFactory.TUPLEN, ptypes),
+        new TupleToGenericRecord(schema, ptypes), new TupleDeepCopier(TupleN.class, ptypes), ptypes);
+  }
+
+  public static <T extends Tuple> AvroType<T> tuples(Class<T> clazz, PType... ptypes) {
+    Schema schema = createTupleSchema(ptypes);
+    Class[] typeArgs = new Class[ptypes.length];
+    for (int i = 0; i < typeArgs.length; i++) {
+      typeArgs[i] = ptypes[i].getTypeClass();
+    }
+    TupleFactory<T> factory = TupleFactory.create(clazz, typeArgs);
+    return new AvroType<T>(clazz, schema, new GenericRecordToTuple(factory, ptypes), new TupleToGenericRecord(schema,
+        ptypes), new TupleDeepCopier(clazz, ptypes), ptypes);
+  }
+
+  private static Schema createTupleSchema(PType<?>... ptypes) {
+    // Guarantee each tuple schema has a globally unique name
+    String tupleName = "tuple" + UUID.randomUUID().toString().replace('-', 'x');
+    Schema schema = Schema.createRecord(tupleName, "", "crunch", false);
+    List<Schema.Field> fields = Lists.newArrayList();
+    for (int i = 0; i < ptypes.length; i++) {
+      AvroType atype = (AvroType) ptypes[i];
+      Schema fieldSchema = allowNulls(atype.getSchema());
+      fields.add(new Schema.Field("v" + i, fieldSchema, "", null));
+    }
+    schema.setFields(fields);
+    return schema;
+  }
+
+  public static final <S, T> AvroType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn,
+      PType<S> base) {
+    AvroType<S> abase = (AvroType<S>) base;
+    return new AvroType<T>(clazz, abase.getSchema(), new CompositeMapFn(abase.getInputMapFn(), inputFn),
+        new CompositeMapFn(outputFn, abase.getOutputMapFn()), new DeepCopier.NoOpDeepCopier<T>(), base.getSubTypes()
+            .toArray(new PType[0]));
+  }
+
+  public static <T> PType<T> jsons(Class<T> clazz) {
+    return PTypes.jsonString(clazz, AvroTypeFamily.getInstance());
+  }
+
+  public static final <K, V> AvroTableType<K, V> tableOf(PType<K> key, PType<V> value) {
+    if (key instanceof PTableType) {
+      PTableType ptt = (PTableType) key;
+      key = Avros.pairs(ptt.getKeyType(), ptt.getValueType());
+    }
+    if (value instanceof PTableType) {
+      PTableType ptt = (PTableType) value;
+      value = Avros.pairs(ptt.getKeyType(), ptt.getValueType());
+    }
+    AvroType<K> avroKey = (AvroType<K>) key;
+    AvroType<V> avroValue = (AvroType<V>) value;
+    return new AvroTableType(avroKey, avroValue, Pair.class);
+  }
+
+  private static final Schema NULL_SCHEMA = Schema.create(Type.NULL);
+
+  /**
+   * Returns a schema that admits {@code null} in addition to the values of {@code base}.
+   *
+   * @param base the schema to make nullable
+   * @return {@code base} itself when it is the null schema or a union that already has a
+   *         null branch; otherwise a union of {@code base} (or its branches) and null
+   */
+  private static Schema allowNulls(Schema base) {
+    if (NULL_SCHEMA.equals(base)) {
+      return base;
+    }
+    if (base.getType() == Type.UNION) {
+      // Avro rejects a union nested directly inside another union and rejects duplicate
+      // branches, so an existing union is extended in place rather than wrapped.
+      List<Schema> branches = base.getTypes();
+      if (branches.contains(NULL_SCHEMA)) {
+        return base;
+      }
+      List<Schema> widened = Lists.newArrayList(branches);
+      widened.add(NULL_SCHEMA);
+      return Schema.createUnion(widened);
+    }
+    return Schema.createUnion(ImmutableList.of(base, NULL_SCHEMA));
+  }
+
+  private static class ReflectGenericRecord extends GenericData.Record {
+
+    public ReflectGenericRecord(Schema schema) {
+      super(schema);
+    }
+
+    @Override
+    public int hashCode() {
+      return reflectAwareHashCode(this, getSchema());
+    }
+  }
+
+  /*
+   * TODO: Remove this once we no longer have to support 1.5.4.
+   *
+   * Computes a hash code for datum o by walking it according to schema s.
+   * Record field values are read through ReflectData, and composite values
+   * are combined with hashCodeAdd (31 * running hash + element hash).
+   * A null datum hashes to 0.
+   * NOTE(review): "1.5.4" presumably refers to an Avro release -- confirm.
+   */
+  private static int reflectAwareHashCode(Object o, Schema s) {
+    if (o == null)
+      return 0; // incomplete datum
+    int hashCode = 1;
+    switch (s.getType()) {
+    case RECORD:
+      // Fold in each field in schema order, skipping fields whose sort order is IGNORE.
+      for (Schema.Field f : s.getFields()) {
+        if (f.order() == Schema.Field.Order.IGNORE)
+          continue;
+        hashCode = hashCodeAdd(hashCode, ReflectData.get().getField(o, f.name(), f.pos()), f.schema());
+      }
+      return hashCode;
+    case ARRAY:
+      // The datum is treated as a Collection; elements are folded in iteration order.
+      Collection<?> a = (Collection<?>) o;
+      Schema elementType = s.getElementType();
+      for (Object e : a)
+        hashCode = hashCodeAdd(hashCode, e, elementType);
+      return hashCode;
+    case UNION:
+      // Hash against whichever branch ReflectData resolves for this datum.
+      return reflectAwareHashCode(o, s.getTypes().get(ReflectData.get().resolveUnion(s, o)));
+    case ENUM:
+      // The hash is the ordinal the schema assigns to the datum's string form.
+      return s.getEnumOrdinal(o.toString());
+    case NULL:
+      return 0;
+    case STRING:
+      // Hash the Utf8 form, converting through toString() when the datum is not already a Utf8.
+      return (o instanceof Utf8 ? o : new Utf8(o.toString())).hashCode();
+    default:
+      // Every other schema type defers to the datum's own hashCode().
+      return o.hashCode();
+    }
+  }
+
+  /** Add the hash code for an object into an accumulated hash code. */
+  private static int hashCodeAdd(int hashCode, Object o, Schema s) {
+    return 31 * hashCode + reflectAwareHashCode(o, s);
+  }
+
+  private Avros() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
new file mode 100644
index 0000000..e973cca
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+
+/**
+ * A Factory class for constructing Avro reflection-related objects.
+ */
+public class ReflectDataFactory {
+
+  public ReflectData getReflectData() {
+    return ReflectData.AllowNull.get();
+  }
+
+  public <T> ReflectDatumReader<T> getReader(Schema schema) {
+    return new ReflectDatumReader<T>(schema);
+  }
+
+  public <T> ReflectDatumWriter<T> getWriter(Schema schema) {
+    return new ReflectDatumWriter<T>(schema);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
new file mode 100644
index 0000000..8bd18b0
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.mapred.Pair;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/** The {@link Serialization} used by jobs configured with {@link AvroJob}. */
+class SafeAvroSerialization<T> extends Configured implements Serialization<AvroWrapper<T>> {
+
+  public boolean accept(Class<?> c) {
+    return AvroWrapper.class.isAssignableFrom(c);
+  }
+
+  /**
+   * Returns the map output deserializer for the given wrapper class.
+   * <p>
+   * The key or value half of the job's map output schema is selected depending on whether
+   * {@code c} is an {@link AvroKey}. When {@link AvroJob#MAP_OUTPUT_IS_REFLECT} is set, the
+   * reader comes from the configured {@link ReflectDataFactory}; otherwise a
+   * {@link SpecificDatumReader} is used.
+   *
+   * @param c the wrapper class being deserialized
+   * @return a deserializer that produces {@link AvroKey} or {@link AvroValue} wrappers
+   */
+  public Deserializer<AvroWrapper<T>> getDeserializer(Class<AvroWrapper<T>> c) {
+    boolean isKey = AvroKey.class.isAssignableFrom(c);
+    Configuration conf = getConf();
+    Schema mapOutputSchema = AvroJob.getMapOutputSchema(conf);
+    Schema schema = isKey ? Pair.getKeySchema(mapOutputSchema) : Pair.getValueSchema(mapOutputSchema);
+
+    DatumReader<T> datumReader;
+    if (conf.getBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, false)) {
+      // Use the same factory lookup as getSerializer instead of repeating the config key here.
+      ReflectDataFactory factory = Avros.getReflectDataFactory(conf);
+      datumReader = factory.getReader(schema);
+    } else {
+      datumReader = new SpecificDatumReader<T>(schema);
+    }
+    return new AvroWrapperDeserializer(datumReader, isKey);
+  }
+
+  private static final DecoderFactory FACTORY = DecoderFactory.get();
+
+  /**
+   * Decodes datums with the supplied reader and wraps each one in an {@link AvroKey} or
+   * {@link AvroValue}, reusing the caller's wrapper when one is passed in.
+   */
+  private class AvroWrapperDeserializer implements Deserializer<AvroWrapper<T>> {
+
+    private final DatumReader<T> reader;
+    private final boolean isKey;
+    // Assigned in open(); stays null until then.
+    private BinaryDecoder decoder;
+
+    public AvroWrapperDeserializer(DatumReader<T> reader, boolean isKey) {
+      this.reader = reader;
+      this.isKey = isKey;
+    }
+
+    public void open(InputStream in) {
+      // The previous decoder (if any) is offered back to the factory for reuse.
+      this.decoder = FACTORY.directBinaryDecoder(in, decoder);
+    }
+
+    /**
+     * Reads the next datum from the decoder.
+     *
+     * @param wrapper an existing wrapper to refill, or {@code null} to allocate a new one
+     * @return the wrapper holding the datum that was read
+     * @throws IOException if the reader fails to decode a datum
+     */
+    public AvroWrapper<T> deserialize(AvroWrapper<T> wrapper) throws IOException {
+      T datum = reader.read(wrapper == null ? null : wrapper.datum(), decoder);
+      if (wrapper == null) {
+        wrapper = isKey ? new AvroKey<T>(datum) : new AvroValue<T>(datum);
+      } else {
+        wrapper.datum(datum);
+      }
+      return wrapper;
+    }
+
+    public void close() throws IOException {
+      // Nothing was opened if open() was never called, so there is nothing to close.
+      if (decoder != null) {
+        decoder.inputStream().close();
+      }
+    }
+  }
+
+  /**
+   * Returns the output serializer for the given wrapper class.
+   *
+   * @param c {@link AvroWrapper} itself for final output, or an {@link AvroKey} /
+   *          {@link AvroValue} class for map output
+   * @return a serializer writing with the configured {@link ReflectDataFactory}'s writer
+   */
+  public Serializer<AvroWrapper<T>> getSerializer(Class<AvroWrapper<T>> c) {
+    Configuration conf = getConf();
+
+    Schema schema;
+    if (c.equals(AvroWrapper.class)) {
+      // A plain AvroWrapper marks final job output.
+      schema = AvroJob.getOutputSchema(conf);
+    } else if (AvroKey.class.isAssignableFrom(c)) {
+      schema = Pair.getKeySchema(AvroJob.getMapOutputSchema(conf));
+    } else {
+      schema = Pair.getValueSchema(AvroJob.getMapOutputSchema(conf));
+    }
+
+    ReflectDatumWriter<T> datumWriter = Avros.getReflectDataFactory(conf).getWriter(schema);
+    return new AvroWrapperSerializer(datumWriter);
+  }
+
+  private class AvroWrapperSerializer implements Serializer<AvroWrapper<T>> {
+    private DatumWriter<T> writer;
+    private OutputStream out;
+    private BinaryEncoder encoder;
+
+    public AvroWrapperSerializer(DatumWriter<T> writer) {
+      this.writer = writer;
+    }
+
+    public void open(OutputStream out) {
+      this.out = out;
+      this.encoder = new EncoderFactory().configureBlockSize(512).binaryEncoder(out, null);
+    }
+
+    public void serialize(AvroWrapper<T> wrapper) throws IOException {
+      writer.write(wrapper.datum(), encoder);
+      // would be a lot faster if the Serializer interface had a flush()
+      // method and the Hadoop framework called it when needed rather
+      // than for every record.
+      encoder.flush();
+    }
+
+    public void close() throws IOException {
+      out.close();
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/avro/package-info.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/package-info.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/package-info.java
new file mode 100644
index 0000000..abaf60f
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Business object serialization using Apache Avro.
+ */
+package org.apache.crunch.types.avro;

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/package-info.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/package-info.java b/crunch-core/src/main/java/org/apache/crunch/types/package-info.java
new file mode 100644
index 0000000..b420b03
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Common functionality for business object serialization.
+ */
+package org.apache.crunch.types;

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
new file mode 100644
index 0000000..8b54008
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * A {@link Writable} for marshalling/unmarshalling Collections held in an
+ * array. Null entries are only counted on the wire and the non-null entries
+ * are read back into the leading slots, so element order is
+ * <em>undefined</em>!
+ *
+ * @param <T> The value type
+ */
+class GenericArrayWritable<T> implements Writable {
+  private Writable[] values;
+  private Class<? extends Writable> valueClass;
+
+  /**
+   * Creates an instance for deserialization; the value class is then taken
+   * from the class name found in the stream.
+   */
+  public GenericArrayWritable() {
+  }
+
+  public GenericArrayWritable(Class<? extends Writable> valueClass) {
+    this.valueClass = valueClass;
+  }
+
+  public Writable[] get() {
+    return values;
+  }
+
+  public void set(Writable[] values) {
+    this.values = values;
+  }
+
+  /**
+   * Reads the array length, the null count and, when any non-null value
+   * follows, the value class name and the values themselves.
+   */
+  public void readFields(DataInput in) throws IOException {
+    final int length = WritableUtils.readVInt(in);
+    values = new Writable[length];
+    if (length == 0) {
+      return;
+    }
+
+    final int nulls = WritableUtils.readVInt(in);
+    if (nulls == length) {
+      // Nothing but nulls was written, so no type name or payload follows.
+      return;
+    }
+
+    setValueType(Text.readString(in));
+    final int present = length - nulls;
+    for (int slot = 0; slot < present; slot++) {
+      Writable element = WritableFactories.newInstance(valueClass);
+      element.readFields(in);
+      values[slot] = element;
+    }
+  }
+
+  /**
+   * Adopts the named class as value class when none is set yet; otherwise
+   * checks that the name matches the value class already in use.
+   *
+   * @throws CrunchRuntimeException if the named class cannot be loaded
+   * @throws IllegalStateException if the name differs from the current value class
+   */
+  protected void setValueType(String valueType) {
+    if (valueClass != null) {
+      if (!valueType.equals(valueClass.getName())) {
+        throw new IllegalStateException("Incoming " + valueType + " is not " + valueClass);
+      }
+      return;
+    }
+    try {
+      valueClass = Class.forName(valueType).asSubclass(Writable.class);
+    } catch (ClassNotFoundException e) {
+      throw new CrunchRuntimeException(e);
+    }
+  }
+
+  /**
+   * Writes the array length, the null count and, when at least one value is
+   * non-null, the value class name followed by every non-null value.
+   *
+   * @throws IllegalStateException if a value must be written but no value class is known
+   */
+  public void write(DataOutput out) throws IOException {
+    final int length = values.length;
+    WritableUtils.writeVInt(out, length);
+    if (length == 0) {
+      return;
+    }
+
+    int nulls = 0;
+    for (Writable element : values) {
+      if (element == null) {
+        nulls++;
+      }
+    }
+    WritableUtils.writeVInt(out, nulls);
+    if (nulls == length) {
+      return;
+    }
+
+    if (valueClass == null) {
+      throw new IllegalStateException("Value class not set by constructor or read");
+    }
+    Text.writeString(out, valueClass.getName());
+    for (Writable element : values) {
+      if (element != null) {
+        element.write(out);
+      }
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(values).toHashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    GenericArrayWritable<?> that = (GenericArrayWritable<?>) obj;
+    return Arrays.equals(values, that.values);
+  }
+
+  @Override
+  public String toString() {
+    return Arrays.toString(values);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java
new file mode 100644
index 0000000..1ab51df
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+import com.google.common.collect.Maps;
+
+/**
+ * A {@link Writable} map from {@link Text} keys to values of a single
+ * {@link Writable} class. The serialized form is the value class name, the
+ * number of entries, and then each key followed by its value.
+ */
+class TextMapWritable<T extends Writable> implements Writable {
+
+  private final Map<Text, T> instance = Maps.newHashMap();
+  private Class<T> valueClazz;
+
+  public TextMapWritable() {
+  }
+
+  public TextMapWritable(Class<T> valueClazz) {
+    this.valueClazz = valueClazz;
+  }
+
+  public void put(Text txt, T value) {
+    instance.put(txt, value);
+  }
+
+  public Set<Map.Entry<Text, T>> entrySet() {
+    return instance.entrySet();
+  }
+
+  /**
+   * Replaces the current content with the entries read from {@code in}; the
+   * value class is loaded from the class name at the start of the stream.
+   *
+   * @throws IOException if the value class cannot be loaded or instantiated
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    instance.clear();
+    try {
+      this.valueClazz = (Class<T>) Class.forName(Text.readString(in));
+      final int entries = WritableUtils.readVInt(in);
+      for (int read = 0; read < entries; read++) {
+        Text key = new Text();
+        key.readFields(in);
+        T value = valueClazz.newInstance();
+        value.readFields(in);
+        instance.put(key, value);
+      }
+    } catch (ClassNotFoundException e) {
+      throw initFailure(e);
+    } catch (IllegalAccessException e) {
+      throw initFailure(e);
+    } catch (InstantiationException e) {
+      throw initFailure(e);
+    }
+  }
+
+  /** Wraps a reflection failure in the IOException reported by readFields. */
+  private static IOException initFailure(Exception cause) {
+    return new IOException("Failed map init", cause);
+  }
+
+  /** Writes the value class name, the entry count and every key/value pair. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, valueClazz.getName());
+    WritableUtils.writeVInt(out, instance.size());
+    for (Map.Entry<Text, T> entry : instance.entrySet()) {
+      Text key = entry.getKey();
+      T value = entry.getValue();
+      key.write(out);
+      value.write(out);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
new file mode 100644
index 0000000..1c3536b
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * A straight copy of the TupleWritable implementation in the join package,
+ * added here because of its package visibility restrictions.
+ * <p>
+ * Which positions currently hold a value is tracked in a {@code long} bit
+ * mask with one bit per position, so at most 64 positions can be tracked.
+ */
+public class TupleWritable implements WritableComparable<TupleWritable> {
+
+  private long written;
+  private Writable[] values;
+
+  /**
+   * Create an empty tuple with no allocated storage for writables.
+   */
+  public TupleWritable() {
+  }
+
+  /**
+   * Initialize tuple with storage; unknown whether any of them contain
+   * &quot;written&quot; values.
+   */
+  public TupleWritable(Writable[] vals) {
+    written = 0L;
+    values = vals;
+  }
+
+  /**
+   * Bit of the {@code written} mask that tracks position {@code i}. The shift
+   * is performed on a {@code long}: an {@code int} shift would wrap around
+   * for positions above 31 and sign-extend into the upper bits for position
+   * 31.
+   */
+  private static long mask(int i) {
+    return 1L << i;
+  }
+
+  /**
+   * Return true if tuple has an element at the position provided.
+   */
+  public boolean has(int i) {
+    return 0 != (mask(i) & written);
+  }
+
+  /**
+   * Get ith Writable from Tuple.
+   */
+  public Writable get(int i) {
+    return values[i];
+  }
+
+  /**
+   * The number of children in this Tuple.
+   */
+  public int size() {
+    return values.length;
+  }
+
+  /**
+   * Two tuples are equal when they have the same size, the same set of
+   * written positions, and equal values at every written position. Values at
+   * unwritten positions are ignored.
+   */
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof TupleWritable)) {
+      return false;
+    }
+    TupleWritable that = (TupleWritable) other;
+    if (this.size() != that.size() || this.written != that.written) {
+      return false;
+    }
+    for (int i = 0; i < values.length; ++i) {
+      if (has(i) && !values[i].equals(that.get(i))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Hashes the written mask and the value at every position, treating
+   * unwritten positions as null so that the result agrees with
+   * {@link #equals(Object)}.
+   */
+  @Override
+  public int hashCode() {
+    HashCodeBuilder builder = new HashCodeBuilder();
+    builder.append(written);
+    for (int i = 0; i < values.length; ++i) {
+      // equals() ignores unwritten slots, so a stale object left in one must
+      // not influence the hash either.
+      Writable slot = has(i) ? values[i] : null;
+      builder.append(slot);
+    }
+    return builder.toHashCode();
+  }
+
+  /**
+   * Convert Tuple to String as in the following.
+   * {@code [<child1>,<child2>,...,<childn>]}
+   * Unwritten positions are rendered as empty strings.
+   */
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder("[");
+    for (int i = 0; i < values.length; ++i) {
+      buf.append(has(i) ? values[i].toString() : "");
+      buf.append(",");
+    }
+    if (values.length != 0) {
+      // Turn the trailing comma into the closing bracket.
+      buf.setCharAt(buf.length() - 1, ']');
+    } else {
+      buf.append(']');
+    }
+    return buf.toString();
+  }
+
+  /**
+   * Writes each Writable to <code>out</code>. TupleWritable format:
+   * {@code
+   *  <count><written><type1><type2>...<typen><obj1><obj2>...<objn>
+   * }
+   * where {@code written} is the bit mask of written positions and only
+   * written positions contribute a type name and an object.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVInt(out, values.length);
+    WritableUtils.writeVLong(out, written);
+    for (int i = 0; i < values.length; ++i) {
+      if (has(i)) {
+        Text.writeString(out, values[i].getClass().getName());
+      }
+    }
+    for (int i = 0; i < values.length; ++i) {
+      if (has(i)) {
+        values[i].write(out);
+      }
+    }
+  }
+
+  /**
+   * Reads the format produced by {@link #write(DataOutput)}, instantiating
+   * every written value from the class name stored for its position.
+   *
+   * @throws IOException if a value class cannot be loaded or instantiated
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  // No static typeinfo on Tuples
+  public void readFields(DataInput in) throws IOException {
+    int card = WritableUtils.readVInt(in);
+    values = new Writable[card];
+    written = WritableUtils.readVLong(in);
+    Class<? extends Writable>[] cls = new Class[card];
+    try {
+      for (int i = 0; i < card; ++i) {
+        if (has(i)) {
+          cls[i] = Class.forName(Text.readString(in)).asSubclass(Writable.class);
+        }
+      }
+      for (int i = 0; i < card; ++i) {
+        if (has(i)) {
+          values[i] = cls[i].newInstance();
+          values[i].readFields(in);
+        }
+      }
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Failed tuple init", e);
+    } catch (IllegalAccessException e) {
+      throw new IOException("Failed tuple init", e);
+    } catch (InstantiationException e) {
+      throw new IOException("Failed tuple init", e);
+    }
+  }
+
+  /**
+   * Record that the tuple contains an element at the position provided.
+   */
+  public void setWritten(int i) {
+    written |= mask(i);
+  }
+
+  /**
+   * Record that the tuple does not contain an element at the position provided.
+   */
+  public void clearWritten(int i) {
+    written &= ~mask(i);
+  }
+
+  /**
+   * Clear any record of which writables have been written to, without releasing
+   * storage.
+   */
+  public void clearWritten() {
+    written = 0L;
+  }
+
+  /**
+   * Orders tuples position by position over the positions both tuples have
+   * storage for: a written position sorts after an unwritten one, positions
+   * written in both tuples are ordered by their values, and positions
+   * written in neither are skipped. Tuples that tie on all shared positions
+   * are ordered by length.
+   */
+  @Override
+  public int compareTo(TupleWritable o) {
+    // Only index positions that exist in both tuples; a length difference is
+    // settled by the final subtraction.
+    int shared = Math.min(values.length, o.values.length);
+    for (int i = 0; i < shared; ++i) {
+      boolean mine = has(i);
+      boolean theirs = o.has(i);
+      if (mine != theirs) {
+        return mine ? 1 : -1;
+      }
+      if (!mine) {
+        // Written in neither tuple: equals() ignores the slot, so do we.
+        continue;
+      }
+      int cmp = compareValues(values[i], o.values[i]);
+      if (cmp != 0) {
+        return cmp;
+      }
+    }
+    // Array lengths are non-negative, so this subtraction cannot overflow.
+    return values.length - o.values.length;
+  }
+
+  /**
+   * Compares two values from the same position. A null value sorts before a
+   * non-null one, equal values compare as 0, comparable values use their own
+   * ordering and anything else falls back to comparing hash codes.
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  private static int compareValues(Writable v1, Writable v2) {
+    if (v1 == v2) {
+      return 0;
+    }
+    if (v1 == null) {
+      return -1;
+    }
+    if (v2 == null) {
+      return 1;
+    }
+    if (v1.equals(v2)) {
+      return 0;
+    }
+    if (v1 instanceof WritableComparable && v2 instanceof WritableComparable) {
+      return ((WritableComparable) v1).compareTo((WritableComparable) v2);
+    }
+    // Compare rather than subtract: the difference of two hash codes can
+    // overflow and flip the sign of the result.
+    int h1 = v1.hashCode();
+    int h2 = v2.hashCode();
+    return (h1 < h2) ? -1 : ((h1 == h2) ? 0 : 1);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java
new file mode 100644
index 0000000..7b6e11b
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.types.DeepCopier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Performs deep copies of Writable values by serializing the source into an
+ * in-memory buffer and reading that buffer back into a new instance.
+ * 
+ * @param <T> The type of Writable that can be copied
+ */
+public class WritableDeepCopier<T extends Writable> implements DeepCopier<T> {
+
+  private Class<T> writableClass;
+
+  public WritableDeepCopier(Class<T> writableClass) {
+    this.writableClass = writableClass;
+  }
+
+  /** Does nothing. */
+  @Override
+  public void initialize(Configuration conf) {
+  }
+
+  /**
+   * Returns a new instance of the writable class holding the state that
+   * {@code source} serializes, or null when {@code source} is null.
+   *
+   * @throws CrunchRuntimeException if writing, instantiating or reading fails
+   */
+  @Override
+  public T deepCopy(T source) {
+    if (source == null) {
+      return null;
+    }
+
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    DataOutputStream sink = new DataOutputStream(buffer);
+    try {
+      source.write(sink);
+      sink.flush();
+      DataInputStream replay = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
+      T copy = writableClass.newInstance();
+      copy.readFields(replay);
+      return copy;
+    } catch (Exception e) {
+      throw new CrunchRuntimeException("Error while deep copying " + source, e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
new file mode 100644
index 0000000..84318d3
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.lib.PTables;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PGroupedTableType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Grouped table type built from a {@link WritableTableType}. The map output
+ * key and value classes used for the shuffle are the serialization classes
+ * of the table's key and value types.
+ */
+class WritableGroupedTableType<K, V> extends PGroupedTableType<K, V> {
+
+  private final MapFn inputFn;
+  private final MapFn outputFn;
+  private final Converter converter;
+
+  public WritableGroupedTableType(WritableTableType<K, V> tableType) {
+    super(tableType);
+    WritableType keys = (WritableType) tableType.getKeyType();
+    WritableType vals = (WritableType) tableType.getValueType();
+    MapFn keyInputFn = keys.getInputMapFn();
+    MapFn valueInputFn = vals.getInputMapFn();
+    this.inputFn = new PairIterableMapFn(keyInputFn, valueInputFn);
+    this.outputFn = tableType.getOutputMapFn();
+    this.converter = new WritablePairConverter(keys.getSerializationClass(),
+        vals.getSerializationClass());
+  }
+
+  @Override
+  public Class<Pair<K, Iterable<V>>> getTypeClass() {
+    // The class object is taken from an actual Pair instance.
+    Pair<Object, Object> sample = Pair.of(null, null);
+    return (Class<Pair<K, Iterable<V>>>) sample.getClass();
+  }
+
+  @Override
+  public Converter getGroupingConverter() {
+    return converter;
+  }
+
+  @Override
+  public MapFn getInputMapFn() {
+    return inputFn;
+  }
+
+  @Override
+  public MapFn getOutputMapFn() {
+    return outputFn;
+  }
+
+  /** Initializes the underlying table type with the given configuration. */
+  @Override
+  public void initialize(Configuration conf) {
+    this.tableType.initialize(conf);
+  }
+
+  @Override
+  public Pair<K, Iterable<V>> getDetachedValue(Pair<K, Iterable<V>> value) {
+    return PTables.getGroupedDetachedValue(this, value);
+  }
+
+  /**
+   * Applies the grouping options, when given, and then sets the map output
+   * key and value classes on the job.
+   */
+  @Override
+  public void configureShuffle(Job job, GroupingOptions options) {
+    if (options != null) {
+      options.configure(job);
+    }
+    WritableType keys = (WritableType) tableType.getKeyType();
+    WritableType vals = (WritableType) tableType.getValueType();
+    job.setMapOutputKeyClass(keys.getSerializationClass());
+    job.setMapOutputValueClass(vals.getSerializationClass());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/writable/WritablePairConverter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritablePairConverter.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritablePairConverter.java
new file mode 100644
index 0000000..2db0238
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritablePairConverter.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.Converter;
+
+class WritablePairConverter<K, V> implements Converter<K, V, Pair<K, V>, Pair<K, Iterable<V>>> {
+
+  private final Class<K> keyClass;
+  private final Class<V> valueClass;
+
+  public WritablePairConverter(Class<K> keyClass, Class<V> valueClass) {
+    this.keyClass = keyClass;
+    this.valueClass = valueClass;
+  }
+
+  @Override
+  public Pair<K, V> convertInput(K key, V value) {
+    return Pair.of(key, value);
+  }
+
+  @Override
+  public K outputKey(Pair<K, V> value) {
+    return value.first();
+  }
+
+  @Override
+  public V outputValue(Pair<K, V> value) {
+    return value.second();
+  }
+
+  @Override
+  public Class<K> getKeyClass() {
+    return keyClass;
+  }
+
+  @Override
+  public Class<V> getValueClass() {
+    return valueClass;
+  }
+
+  @Override
+  public Pair<K, Iterable<V>> convertIterableInput(K key, Iterable<V> value) {
+    return Pair.of(key, value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
new file mode 100644
index 0000000..93e0fd6
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import java.util.List;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.fn.PairMapFn;
+import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.crunch.io.seq.SeqFileTableSourceTarget;
+import org.apache.crunch.lib.PTables;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PGroupedTableType;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * {@link PTableType} composed of a {@link WritableType} for the keys and one
+ * for the values. Two instances are equal when both their key types and
+ * their value types are equal.
+ */
+class WritableTableType<K, V> implements PTableType<K, V> {
+
+  private final WritableType<K, Writable> keyType;
+  private final WritableType<V, Writable> valueType;
+  private final MapFn inputFn;
+  private final MapFn outputFn;
+  private final Converter converter;
+
+  public WritableTableType(WritableType<K, Writable> keyType, WritableType<V, Writable> valueType) {
+    this.keyType = keyType;
+    this.valueType = valueType;
+    // Input and output functions apply the key function to the first element
+    // of a pair and the value function to the second.
+    MapFn keyIn = keyType.getInputMapFn();
+    MapFn valueIn = valueType.getInputMapFn();
+    this.inputFn = new PairMapFn(keyIn, valueIn);
+    MapFn keyOut = keyType.getOutputMapFn();
+    MapFn valueOut = valueType.getOutputMapFn();
+    this.outputFn = new PairMapFn(keyOut, valueOut);
+    this.converter = new WritablePairConverter(keyType.getSerializationClass(),
+        valueType.getSerializationClass());
+  }
+
+  @Override
+  public Class<Pair<K, V>> getTypeClass() {
+    // The class object is taken from an actual Pair instance.
+    Pair<Object, Object> sample = Pair.of(null, null);
+    return (Class<Pair<K, V>>) sample.getClass();
+  }
+
+  /** Returns the key type followed by the value type. */
+  @Override
+  public List<PType> getSubTypes() {
+    return ImmutableList.<PType> of(keyType, valueType);
+  }
+
+  @Override
+  public MapFn getInputMapFn() {
+    return inputFn;
+  }
+
+  @Override
+  public MapFn getOutputMapFn() {
+    return outputFn;
+  }
+
+  @Override
+  public Converter getConverter() {
+    return converter;
+  }
+
+  @Override
+  public PTypeFamily getFamily() {
+    return WritableTypeFamily.getInstance();
+  }
+
+  public PType<K> getKeyType() {
+    return keyType;
+  }
+
+  public PType<V> getValueType() {
+    return valueType;
+  }
+
+  @Override
+  public PGroupedTableType<K, V> getGroupedTableType() {
+    return new WritableGroupedTableType<K, V>(this);
+  }
+
+  @Override
+  public ReadableSourceTarget<Pair<K, V>> getDefaultFileSource(Path path) {
+    return new SeqFileTableSourceTarget<K, V>(path, this);
+  }
+
+  /** Initializes the key type and then the value type. */
+  @Override
+  public void initialize(Configuration conf) {
+    keyType.initialize(conf);
+    valueType.initialize(conf);
+  }
+
+  @Override
+  public Pair<K, V> getDetachedValue(Pair<K, V> value) {
+    return PTables.getDetachedValue(this, value);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    // instanceof is false for null, so no separate null check is needed.
+    if (!(obj instanceof WritableTableType)) {
+      return false;
+    }
+    WritableTableType other = (WritableTableType) obj;
+    return keyType.equals(other.keyType) && valueType.equals(other.valueType);
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(keyType).append(valueType).toHashCode();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java
new file mode 100644
index 0000000..734946c
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableType.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import java.util.List;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.crunch.io.seq.SeqFileSourceTarget;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.DeepCopier;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * A {@link PType} for values of type {@code T} that are serialized through a
+ * {@link Writable} class {@code W}. An input function maps {@code W} to
+ * {@code T} and an output function maps {@code T} back to {@code W}.
+ */
+public class WritableType<T, W extends Writable> implements PType<T> {
+
+  private final Class<T> typeClass;
+  private final Class<W> writableClass;
+  private final Converter converter;
+  private final MapFn<W, T> inputFn;
+  private final MapFn<T, W> outputFn;
+  private final DeepCopier<W> deepCopier;
+  private final List<PType> subTypes;
+  /** Set to {@code true} by {@link #initialize(Configuration)}. */
+  private boolean initialized;
+
+  /**
+   * Creates a new type description.
+   *
+   * @param typeClass the class of the in-memory values
+   * @param writableClass the {@link Writable} class used for serialization; also
+   *          used to build the converter and the deep copier
+   * @param inputDoFn maps a {@code W} to a {@code T}
+   * @param outputDoFn maps a {@code T} to a {@code W}
+   * @param subTypes the component types, stored as an immutable list
+   */
+  public WritableType(Class<T> typeClass, Class<W> writableClass, MapFn<W, T> inputDoFn,
+      MapFn<T, W> outputDoFn, PType... subTypes) {
+    this.typeClass = typeClass;
+    this.writableClass = writableClass;
+    this.inputFn = inputDoFn;
+    this.outputFn = outputDoFn;
+    this.converter = new WritableValueConverter(writableClass);
+    this.deepCopier = new WritableDeepCopier<W>(writableClass);
+    this.subTypes = ImmutableList.<PType> copyOf(subTypes);
+  }
+
+  /** Returns the instance provided by {@code WritableTypeFamily.getInstance()}. */
+  @Override
+  public PTypeFamily getFamily() {
+    return WritableTypeFamily.getInstance();
+  }
+
+  /** Returns the class of the in-memory values given at construction. */
+  @Override
+  public Class<T> getTypeClass() {
+    return this.typeClass;
+  }
+
+  /** Returns the converter created for the writable class at construction. */
+  @Override
+  public Converter getConverter() {
+    return this.converter;
+  }
+
+  /** Returns the function that maps a {@code W} to a {@code T}. */
+  @Override
+  public MapFn getInputMapFn() {
+    return this.inputFn;
+  }
+
+  /** Returns the function that maps a {@code T} to a {@code W}. */
+  @Override
+  public MapFn getOutputMapFn() {
+    return this.outputFn;
+  }
+
+  /** Returns the immutable list of component types given at construction. */
+  @Override
+  public List<PType> getSubTypes() {
+    return this.subTypes;
+  }
+
+  /** Returns the {@link Writable} class given at construction. */
+  public Class<W> getSerializationClass() {
+    return this.writableClass;
+  }
+
+  /**
+   * Creates the default file-based source/target for this type.
+   *
+   * @param path the location passed to the {@code SeqFileSourceTarget}
+   * @return a new {@code SeqFileSourceTarget} built from {@code path} and this type
+   */
+  @Override
+  public ReadableSourceTarget<T> getDefaultFileSource(Path path) {
+    SeqFileSourceTarget<T> seqFileTarget = new SeqFileSourceTarget<T>(path, this);
+    return seqFileTarget;
+  }
+
+  /**
+   * Two {@code WritableType}s are equal when their type class, writable class
+   * and sub-type list are all equal. The map functions are not compared.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    // instanceof is false for null, so no separate null test is required.
+    if (!(obj instanceof WritableType)) {
+      return false;
+    }
+    WritableType other = (WritableType) obj;
+    return typeClass.equals(other.typeClass)
+        && writableClass.equals(other.writableClass)
+        && subTypes.equals(other.subTypes);
+  }
+
+  /**
+   * Initializes the input function, the output function and then every
+   * sub-type, and finally marks this type as initialized.
+   *
+   * @param conf the configuration forwarded to each sub-type
+   */
+  @Override
+  public void initialize(Configuration conf) {
+    inputFn.initialize();
+    outputFn.initialize();
+    for (PType component : subTypes) {
+      component.initialize(conf);
+    }
+    initialized = true;
+  }
+
+  /**
+   * Returns a detached version of {@code value}: the value is mapped to its
+   * writable form, deep-copied, and mapped back.
+   *
+   * @param value the value to detach
+   * @return the result of mapping the deep copy back through the input function
+   * @throws IllegalStateException if {@link #initialize(Configuration)} has not
+   *           been called on this instance
+   */
+  @Override
+  public T getDetachedValue(T value) {
+    if (!initialized) {
+      throw new IllegalStateException("Cannot call getDetachedValue on an uninitialized PType");
+    }
+    return inputFn.map(deepCopier.deepCopy(outputFn.map(value)));
+  }
+
+  /**
+   * Computes a hash code from the type class, writable class and sub-type
+   * list, the same fields that {@code equals} compares.
+   */
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder()
+        .append(typeClass)
+        .append(writableClass)
+        .append(subTypes)
+        .toHashCode();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java
new file mode 100644
index 0000000..a94db96
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.types.PGroupedTableType;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.PTypeUtils;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * The {@link Writable}-based implementation of the
+ * {@link org.apache.crunch.types.PTypeFamily} interface.
+ */
+public class WritableTypeFamily implements PTypeFamily {
+
+  /** The shared instance handed out by {@link #getInstance()}. */
+  private static final WritableTypeFamily INSTANCE = new WritableTypeFamily();
+
+  /** Private so that instances can only be created inside this class. */
+  private WritableTypeFamily() {
+  }
+
+  /** Returns the shared {@code WritableTypeFamily} instance. */
+  public static WritableTypeFamily getInstance() {
+    return INSTANCE;
+  }
+
+  /** Delegates to {@code Writables.nulls()}. */
+  public PType<Void> nulls() {
+    return Writables.nulls();
+  }
+
+  /** Delegates to {@code Writables.strings()}. */
+  public PType<String> strings() {
+    return Writables.strings();
+  }
+
+  /** Delegates to {@code Writables.longs()}. */
+  public PType<Long> longs() {
+    return Writables.longs();
+  }
+
+  /** Delegates to {@code Writables.ints()}. */
+  public PType<Integer> ints() {
+    return Writables.ints();
+  }
+
+  /** Delegates to {@code Writables.floats()}. */
+  public PType<Float> floats() {
+    return Writables.floats();
+  }
+
+  /** Delegates to {@code Writables.doubles()}. */
+  public PType<Double> doubles() {
+    return Writables.doubles();
+  }
+
+  /** Delegates to {@code Writables.booleans()}. */
+  public PType<Boolean> booleans() {
+    return Writables.booleans();
+  }
+
+  /** Delegates to {@code Writables.bytes()}. */
+  public PType<ByteBuffer> bytes() {
+    return Writables.bytes();
+  }
+
+  /** Delegates to {@code Writables.records(clazz)}. */
+  public <T> PType<T> records(Class<T> clazz) {
+    return Writables.records(clazz);
+  }
+
+  /** Delegates to {@code Writables.writables(clazz)}. */
+  public <W extends Writable> PType<W> writables(Class<W> clazz) {
+    return Writables.writables(clazz);
+  }
+
+  /** Delegates to {@code Writables.tableOf(key, value)}. */
+  public <K, V> PTableType<K, V> tableOf(PType<K> key, PType<V> value) {
+    return Writables.tableOf(key, value);
+  }
+
+  /** Delegates to {@code Writables.pairs(p1, p2)}. */
+  public <V1, V2> PType<Pair<V1, V2>> pairs(PType<V1> p1, PType<V2> p2) {
+    return Writables.pairs(p1, p2);
+  }
+
+  /** Delegates to {@code Writables.triples(p1, p2, p3)}. */
+  public <V1, V2, V3> PType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2, PType<V3> p3) {
+    return Writables.triples(p1, p2, p3);
+  }
+
+  /** Delegates to {@code Writables.quads(p1, p2, p3, p4)}. */
+  public <V1, V2, V3, V4> PType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, PType<V2> p2, PType<V3> p3, PType<V4> p4) {
+    return Writables.quads(p1, p2, p3, p4);
+  }
+
+  /** Delegates to {@code Writables.tuples(ptypes)}. */
+  public PType<TupleN> tuples(PType<?>... ptypes) {
+    return Writables.tuples(ptypes);
+  }
+
+  /** Delegates to {@code Writables.collections(ptype)}. */
+  public <T> PType<Collection<T>> collections(PType<T> ptype) {
+    return Writables.collections(ptype);
+  }
+
+  /** Delegates to {@code Writables.maps(ptype)}. */
+  public <T> PType<Map<String, T>> maps(PType<T> ptype) {
+    return Writables.maps(ptype);
+  }
+
+  /**
+   * Returns a type from this family that corresponds to {@code ptype}.
+   *
+   * <p>The checks are applied in this order:
+   * <ol>
+   * <li>a {@code WritableType}, {@code WritableTableType} or
+   * {@code WritableGroupedTableType} is returned unchanged;</li>
+   * <li>any other {@link PGroupedTableType} has its table type converted by a
+   * recursive call and is re-wrapped in a new
+   * {@code WritableGroupedTableType};</li>
+   * <li>if {@code Writables.getPrimitiveType} returns a non-null type for the
+   * type class of {@code ptype}, that type is returned;</li>
+   * <li>otherwise the result of {@code PTypeUtils.convert(ptype, this)} is
+   * returned.</li>
+   * </ol>
+   *
+   * @param ptype the type to convert
+   * @return the corresponding type as described above
+   */
+  @Override
+  public <T> PType<T> as(PType<T> ptype) {
+    boolean alreadyWritable = ptype instanceof WritableType
+        || ptype instanceof WritableTableType
+        || ptype instanceof WritableGroupedTableType;
+    if (alreadyWritable) {
+      return ptype;
+    }
+    if (ptype instanceof PGroupedTableType) {
+      PGroupedTableType grouped = (PGroupedTableType) ptype;
+      PTableType tableType = grouped.getTableType();
+      WritableTableType writableTable = (WritableTableType) as(tableType);
+      return new WritableGroupedTableType(writableTable);
+    }
+    PType<T> primitive = Writables.getPrimitiveType(ptype.getTypeClass());
+    if (primitive == null) {
+      return PTypeUtils.convert(ptype, this);
+    }
+    return primitive;
+  }
+
+  /** Delegates to {@code Writables.tuples(clazz, ptypes)}. */
+  @Override
+  public <T extends Tuple> PType<T> tuples(Class<T> clazz, PType<?>... ptypes) {
+    return Writables.tuples(clazz, ptypes);
+  }
+
+  /** Delegates to {@code Writables.derived(clazz, inputFn, outputFn, base)}. */
+  @Override
+  public <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base) {
+    return Writables.derived(clazz, inputFn, outputFn, base);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableValueConverter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableValueConverter.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableValueConverter.java
new file mode 100644
index 0000000..3670b90
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableValueConverter.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import org.apache.crunch.types.Converter;
+import org.apache.hadoop.io.NullWritable;
+
+/**
+ * A {@link Converter} for value-only data. Values pass through unchanged in
+ * both directions; incoming keys are ignored and the outgoing key is always
+ * the {@link NullWritable} singleton.
+ *
+ * @param <W> the serialized value type
+ */
+class WritableValueConverter<W> implements Converter<Object, W, W, Iterable<W>> {
+
+  private final Class<W> serializationClass;
+
+  /**
+   * @param serializationClass the class reported by {@link #getValueClass()}
+   */
+  public WritableValueConverter(Class<W> serializationClass) {
+    this.serializationClass = serializationClass;
+  }
+
+  /**
+   * Returns {@code value} unchanged; {@code key} is ignored.
+   */
+  @Override
+  public W convertInput(Object key, W value) {
+    return value;
+  }
+
+  /**
+   * Returns {@code NullWritable.get()} regardless of {@code value}.
+   */
+  @Override
+  public Object outputKey(W value) {
+    return NullWritable.get();
+  }
+
+  /**
+   * Returns {@code value} unchanged.
+   */
+  @Override
+  public W outputValue(W value) {
+    return value;
+  }
+
+  /**
+   * Returns {@code NullWritable.class}, typed as {@code Class<Object>} to match
+   * this converter's key type parameter.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public Class<Object> getKeyClass() {
+    // Class<NullWritable> is not a Class<Object>, so the cast has to go through
+    // Class<?> and is unchecked; the warning is suppressed for this method only.
+    return (Class<Object>) (Class<?>) NullWritable.class;
+  }
+
+  /**
+   * Returns the serialization class given at construction.
+   */
+  @Override
+  public Class<W> getValueClass() {
+    return serializationClass;
+  }
+
+  /**
+   * Returns the {@code value} iterable unchanged; {@code key} is ignored.
+   */
+  @Override
+  public Iterable<W> convertIterableInput(Object key, Iterable<W> value) {
+    return value;
+  }
+}
\ No newline at end of file


Mime
View raw message