crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [27/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues
Date Tue, 23 Apr 2013 20:41:29 GMT
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/PTypeUtils.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/PTypeUtils.java b/crunch-core/src/main/java/org/apache/crunch/types/PTypeUtils.java
new file mode 100644
index 0000000..e61b98b
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/PTypeUtils.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+
+/**
+ * Utilities for converting between {@code PType}s from different
+ * {@code PTypeFamily} implementations.
+ * 
+ */
+public class PTypeUtils {
+
+  public static <T> PType<T> convert(PType<T> ptype, PTypeFamily tf) {
+    if (ptype instanceof PTableType) {
+      PTableType ptt = (PTableType) ptype;
+      return tf.tableOf(tf.as(ptt.getKeyType()), tf.as(ptt.getValueType()));
+    }
+    Class<T> typeClass = ptype.getTypeClass();
+    if (Tuple.class.isAssignableFrom(typeClass)) {
+      List<PType> subTypes = ptype.getSubTypes();
+      if (Pair.class.equals(typeClass)) {
+        return tf.pairs(tf.as(subTypes.get(0)), tf.as(subTypes.get(1)));
+      } else if (Tuple3.class.equals(typeClass)) {
+        return tf.triples(tf.as(subTypes.get(0)), tf.as(subTypes.get(1)), tf.as(subTypes.get(2)));
+      } else if (Tuple4.class.equals(typeClass)) {
+        return tf.quads(tf.as(subTypes.get(0)), tf.as(subTypes.get(1)), tf.as(subTypes.get(2)), tf.as(subTypes.get(3)));
+      } else if (TupleN.class.equals(typeClass)) {
+        PType[] newPTypes = subTypes.toArray(new PType[0]);
+        for (int i = 0; i < newPTypes.length; i++) {
+          newPTypes[i] = tf.as(subTypes.get(i));
+        }
+        return (PType<T>) tf.tuples(newPTypes);
+      }
+    }
+    if (Collection.class.isAssignableFrom(typeClass)) {
+      return tf.collections(tf.as(ptype.getSubTypes().get(0)));
+    }
+    return tf.records(typeClass);
+  }
+
+  private PTypeUtils() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java b/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java
new file mode 100644
index 0000000..546719c
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/PTypes.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.MapFn;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+
+/**
+ * Utility functions for creating common types of derived PTypes, e.g., for JSON
+ * data, protocol buffers, and Thrift records.
+ * 
+ */
+public class PTypes {
+
+  public static PType<BigInteger> bigInt(PTypeFamily typeFamily) {
+    return typeFamily.derived(BigInteger.class, BYTE_TO_BIGINT, BIGINT_TO_BYTE, typeFamily.bytes());
+  }
+
+  public static PType<UUID> uuid(PTypeFamily ptf) {
+    return ptf.derived(UUID.class, BYTE_TO_UUID, UUID_TO_BYTE, ptf.bytes());
+  }
+  
+  public static <T> PType<T> jsonString(Class<T> clazz, PTypeFamily typeFamily) {
+    return typeFamily
+        .derived(clazz, new JacksonInputMapFn<T>(clazz), new JacksonOutputMapFn<T>(), typeFamily.strings());
+  }
+
+  public static <T extends Message> PType<T> protos(Class<T> clazz, PTypeFamily typeFamily) {
+    return typeFamily.derived(clazz, new ProtoInputMapFn<T>(clazz), new ProtoOutputMapFn<T>(), typeFamily.bytes());
+  }
+
+  public static <T extends TBase> PType<T> thrifts(Class<T> clazz, PTypeFamily typeFamily) {
+    return typeFamily.derived(clazz, new ThriftInputMapFn<T>(clazz), new ThriftOutputMapFn<T>(), typeFamily.bytes());
+  }
+
+  public static final <T extends Enum> PType<T> enums(final Class<T> type, PTypeFamily typeFamily) {
+    return typeFamily.derived(type, new EnumInputMapper<T>(type), new EnumOutputMapper<T>(), typeFamily.strings());
+  }
+
+  public static MapFn<ByteBuffer, BigInteger> BYTE_TO_BIGINT = new MapFn<ByteBuffer, BigInteger>() {
+    public BigInteger map(ByteBuffer input) {
+      return input == null ? null : new BigInteger(input.array());
+    }
+  };
+
+  public static MapFn<BigInteger, ByteBuffer> BIGINT_TO_BYTE = new MapFn<BigInteger, ByteBuffer>() {
+    public ByteBuffer map(BigInteger input) {
+      return input == null ? null : ByteBuffer.wrap(input.toByteArray());
+    }
+  };
+
+  private static class JacksonInputMapFn<T> extends MapFn<String, T> {
+
+    private final Class<T> clazz;
+    private transient ObjectMapper mapper;
+
+    public JacksonInputMapFn(Class<T> clazz) {
+      this.clazz = clazz;
+    }
+
+    @Override
+    public void initialize() {
+      this.mapper = new ObjectMapper();
+    }
+
+    @Override
+    public T map(String input) {
+      try {
+        return mapper.readValue(input, clazz);
+      } catch (Exception e) {
+        throw new CrunchRuntimeException(e);
+      }
+    }
+  }
+
+  private static class JacksonOutputMapFn<T> extends MapFn<T, String> {
+
+    private transient ObjectMapper mapper;
+
+    @Override
+    public void initialize() {
+      this.mapper = new ObjectMapper();
+    }
+
+    @Override
+    public String map(T input) {
+      try {
+        return mapper.writeValueAsString(input);
+      } catch (Exception e) {
+        throw new CrunchRuntimeException(e);
+      }
+    }
+  }
+
+  private static class ProtoInputMapFn<T extends Message> extends MapFn<ByteBuffer, T> {
+
+    private final Class<T> clazz;
+    private transient T instance;
+
+    public ProtoInputMapFn(Class<T> clazz) {
+      this.clazz = clazz;
+    }
+
+    @Override
+    public void initialize() {
+      this.instance = Protos.getDefaultInstance(clazz);
+    }
+
+    @Override
+    public T map(ByteBuffer bb) {
+      try {
+        return (T) instance.newBuilderForType().mergeFrom(bb.array(), bb.position(), bb.limit()).build();
+      } catch (InvalidProtocolBufferException e) {
+        throw new CrunchRuntimeException(e);
+      }
+    }
+  }
+
+  private static class ProtoOutputMapFn<T extends Message> extends MapFn<T, ByteBuffer> {
+
+    public ProtoOutputMapFn() {
+    }
+
+    @Override
+    public ByteBuffer map(T proto) {
+      return ByteBuffer.wrap(proto.toByteArray());
+    }
+  }
+
+  private static class ThriftInputMapFn<T extends TBase> extends MapFn<ByteBuffer, T> {
+
+    private final Class<T> clazz;
+    private transient T instance;
+    private transient TDeserializer deserializer;
+    private transient byte[] bytes;
+
+    public ThriftInputMapFn(Class<T> clazz) {
+      this.clazz = clazz;
+    }
+
+    @Override
+    public void initialize() {
+      this.instance = ReflectionUtils.newInstance(clazz, null);
+      this.deserializer = new TDeserializer(new TBinaryProtocol.Factory());
+      this.bytes = new byte[0];
+    }
+
+    @Override
+    public T map(ByteBuffer bb) {
+      T next = (T) instance.deepCopy();
+      int len = bb.limit() - bb.position();
+      if (len != bytes.length) {
+        bytes = new byte[len];
+      }
+      System.arraycopy(bb.array(), bb.position(), bytes, 0, len);
+      try {
+        deserializer.deserialize(next, bytes);
+      } catch (TException e) {
+        throw new CrunchRuntimeException(e);
+      }
+      return next;
+    }
+  }
+
+  private static class ThriftOutputMapFn<T extends TBase> extends MapFn<T, ByteBuffer> {
+
+    private transient TSerializer serializer;
+
+    public ThriftOutputMapFn() {
+    }
+
+    @Override
+    public void initialize() {
+      this.serializer = new TSerializer(new TBinaryProtocol.Factory());
+    }
+
+    @Override
+    public ByteBuffer map(T t) {
+      try {
+        return ByteBuffer.wrap(serializer.serialize(t));
+      } catch (TException e) {
+        throw new CrunchRuntimeException(e);
+      }
+    }
+  }
+
+  private static class EnumInputMapper<T extends Enum> extends MapFn<String, T> {
+    private final Class<T> type;
+
+    public EnumInputMapper(Class<T> type) {
+      this.type = type;
+    }
+
+    @Override
+    public T map(String input) {
+      return (T) Enum.valueOf(type, input);
+    }
+  };
+
+  private static class EnumOutputMapper<T extends Enum> extends MapFn<T, String> {
+
+    @Override
+    public String map(T input) {
+      return input.name();
+    }
+  };
+  
+  private static MapFn<ByteBuffer, UUID> BYTE_TO_UUID = new MapFn<ByteBuffer, UUID>() {
+    @Override
+    public UUID map(ByteBuffer input) {
+      return new UUID(input.getLong(), input.getLong());
+    }
+  };
+  
+  private static MapFn<UUID, ByteBuffer> UUID_TO_BYTE = new MapFn<UUID, ByteBuffer>() {
+    @Override
+    public ByteBuffer map(UUID input) {
+      ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
+      bb.asLongBuffer().put(input.getMostSignificantBits()).put(input.getLeastSignificantBits());
+      return bb;
+    }
+  };
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/Protos.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/Protos.java b/crunch-core/src/main/java/org/apache/crunch/types/Protos.java
new file mode 100644
index 0000000..4cd5068
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/Protos.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.common.base.Splitter;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
+
+/**
+ * Utility functions for working with protocol buffers in Crunch.
+ */
+public class Protos {
+
+  /**
+   * Utility function for creating a default PB Messgae from a Class object that
+   * works with both protoc 2.3.0 and 2.4.x.
+   * @param clazz The class of the protocol buffer to create
+   * @return An instance of a protocol buffer
+   */
+  public static <M extends Message> M getDefaultInstance(Class<M> clazz) {
+    if (clazz.getConstructors().length > 0) {
+      // Protobuf 2.3.0
+      return ReflectionUtils.newInstance(clazz, null);
+    } else {
+      // Protobuf 2.4.x
+      try {
+        Message.Builder mb = (Message.Builder) clazz.getDeclaredMethod("newBuilder").invoke(null);
+        return (M) mb.getDefaultInstanceForType();
+      } catch (Exception e) {
+        throw new CrunchRuntimeException(e);
+      }  
+    }
+  }
+  
+  public static <M extends Message, K> MapFn<M, K> extractKey(String fieldName) {
+    return new ExtractKeyFn<M, K>(fieldName);
+  }
+
+  public static <M extends Message> DoFn<String, M> lineParser(String sep, Class<M> msgClass) {
+    return new TextToProtoFn<M>(sep, msgClass);
+  }
+
+  private static class ExtractKeyFn<M extends Message, K> extends MapFn<M, K> {
+
+    private final String fieldName;
+
+    private transient FieldDescriptor fd;
+
+    public ExtractKeyFn(String fieldName) {
+      this.fieldName = fieldName;
+    }
+
+    @Override
+    public K map(M input) {
+      if (input == null) {
+        throw new IllegalArgumentException("Null inputs not supported by Protos.ExtractKeyFn");
+      } else if (fd == null) {
+        fd = input.getDescriptorForType().findFieldByName(fieldName);
+        if (fd == null) {
+          throw new IllegalStateException("Could not find field: " + fieldName + " in message: " + input);
+        }
+      }
+      return (K) input.getField(fd);
+    }
+
+  }
+
+  private static class TextToProtoFn<M extends Message> extends DoFn<String, M> {
+
+    private final String sep;
+    private final Class<M> msgClass;
+
+    private transient M msgInstance;
+    private transient List<FieldDescriptor> fields;
+    private transient Splitter splitter;
+
+    enum ParseErrors {
+      TOTAL,
+      NUMBER_FORMAT
+    };
+
+    public TextToProtoFn(String sep, Class<M> msgClass) {
+      this.sep = sep;
+      this.msgClass = msgClass;
+    }
+
+    @Override
+    public void initialize() {
+      this.msgInstance = getDefaultInstance(msgClass);
+      this.fields = msgInstance.getDescriptorForType().getFields();
+      this.splitter = Splitter.on(sep);
+    }
+
+    @Override
+    public void process(String input, Emitter<M> emitter) {
+      if (input != null && !input.isEmpty()) {
+        Builder b = msgInstance.newBuilderForType();
+        Iterator<String> iter = splitter.split(input).iterator();
+        boolean parseError = false;
+        for (FieldDescriptor fd : fields) {
+          if (iter.hasNext()) {
+            String value = iter.next();
+            if (value != null && !value.isEmpty()) {
+              Object parsedValue = null;
+              try {
+                switch (fd.getJavaType()) {
+                case STRING:
+                  parsedValue = value;
+                  break;
+                case INT:
+                  parsedValue = Integer.valueOf(value);
+                  break;
+                case LONG:
+                  parsedValue = Long.valueOf(value);
+                  break;
+                case FLOAT:
+                  parsedValue = Float.valueOf(value);
+                  break;
+                case DOUBLE:
+                  parsedValue = Double.valueOf(value);
+                  break;
+                case BOOLEAN:
+                  parsedValue = Boolean.valueOf(value);
+                  break;
+                case ENUM:
+                  parsedValue = fd.getEnumType().findValueByName(value);
+                  break;
+                }
+                b.setField(fd, parsedValue);
+              } catch (NumberFormatException nfe) {
+                increment(ParseErrors.NUMBER_FORMAT);
+                parseError = true;
+                break;
+              }
+            }
+          }
+        }
+
+        if (parseError) {
+          increment(ParseErrors.TOTAL);
+        } else {
+          emitter.emit((M) b.build());
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/TupleDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/TupleDeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/TupleDeepCopier.java
new file mode 100644
index 0000000..a2ffae3
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/TupleDeepCopier.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.util.List;
+
+import org.apache.crunch.Tuple;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Performs deep copies (based on underlying PType deep copying) of Tuple-based objects.
+ * 
+ * @param <T> The type of Tuple implementation being copied
+ */
+public class TupleDeepCopier<T extends Tuple> implements DeepCopier<T> {
+
+  private final TupleFactory<T> tupleFactory;
+  private final List<PType> elementTypes;
+
+  public TupleDeepCopier(Class<T> tupleClass, PType... elementTypes) {
+    tupleFactory = TupleFactory.getTupleFactory(tupleClass);
+    this.elementTypes = Lists.newArrayList(elementTypes);
+  }
+
+  @Override
+  public void initialize(Configuration conf) {
+    for (PType elementType : elementTypes) {
+      elementType.initialize(conf);
+    }
+  }
+
+  @Override
+  public T deepCopy(T source) {
+    
+    if (source == null) {
+      return null;
+    }
+    
+    Object[] deepCopyValues = new Object[source.size()];
+
+    for (int valueIndex = 0; valueIndex < elementTypes.size(); valueIndex++) {
+      PType elementType = elementTypes.get(valueIndex);
+      deepCopyValues[valueIndex] = elementType.getDetachedValue(source.get(valueIndex));
+    }
+
+    return tupleFactory.makeTuple(deepCopyValues);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/TupleFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/TupleFactory.java b/crunch-core/src/main/java/org/apache/crunch/types/TupleFactory.java
new file mode 100644
index 0000000..73b47de
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/TupleFactory.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+
+import com.google.common.collect.Maps;
+
/**
 * Creates {@link Tuple} instances of a given arity from arrays of component values.
 *
 * <p>Factories are Serializable so they can travel inside serialized functions;
 * any transient state must be rebuilt in {@link #initialize()}.
 */
public abstract class TupleFactory<T extends Tuple> implements Serializable {

  /** Hook for rebuilding transient state (e.g. after deserialization); no-op by default. */
  public void initialize() {
  }

  /** Creates a tuple from the given component values, in order. */
  public abstract T makeTuple(Object... values);

  // Registry of factories for user-supplied Tuple subclasses, populated by create().
  // NOTE(review): plain HashMap with unsynchronized reads/writes — confirm that
  // registration only ever happens from a single thread (e.g. during job setup).
  private static final Map<Class, TupleFactory> customTupleFactories = Maps.newHashMap();

  /**
   * Get the {@link TupleFactory} for a given Tuple implementation.
   * 
   * @param tupleClass
   *          The class for which the factory is to be retrieved
   * @return The appropriate TupleFactory
   * @throws IllegalArgumentException if the class is neither a built-in tuple
   *           type nor previously registered via {@link #create}
   */
  public static <T extends Tuple> TupleFactory<T> getTupleFactory(Class<T> tupleClass) {
    if (tupleClass == Pair.class) {
      return (TupleFactory<T>) PAIR;
    } else if (tupleClass == Tuple3.class) {
      return (TupleFactory<T>) TUPLE3;
    } else if (tupleClass == Tuple4.class) {
      return (TupleFactory<T>) TUPLE4;
    } else if (tupleClass == TupleN.class) {
      return (TupleFactory<T>) TUPLEN;
    } else if (customTupleFactories.containsKey(tupleClass)) {
      return (TupleFactory<T>) customTupleFactories.get(tupleClass);
    } else {
      throw new IllegalArgumentException("Can't create TupleFactory for " + tupleClass);
    }
  }

  /** Factory for {@link Pair}s (arity 2). */
  public static final TupleFactory<Pair> PAIR = new TupleFactory<Pair>() {
    @Override
    public Pair makeTuple(Object... values) {
      return Pair.of(values[0], values[1]);
    }
  };

  /** Factory for {@link Tuple3}s (arity 3). */
  public static final TupleFactory<Tuple3> TUPLE3 = new TupleFactory<Tuple3>() {
    @Override
    public Tuple3 makeTuple(Object... values) {
      return Tuple3.of(values[0], values[1], values[2]);
    }
  };

  /** Factory for {@link Tuple4}s (arity 4). */
  public static final TupleFactory<Tuple4> TUPLE4 = new TupleFactory<Tuple4>() {
    @Override
    public Tuple4 makeTuple(Object... values) {
      return Tuple4.of(values[0], values[1], values[2], values[3]);
    }
  };

  /** Factory for arbitrary-arity {@link TupleN}s. */
  public static final TupleFactory<TupleN> TUPLEN = new TupleFactory<TupleN>() {
    @Override
    public TupleN makeTuple(Object... values) {
      return new TupleN(values);
    }
  };

  /**
   * Registers and returns a factory for a custom Tuple subclass, built
   * reflectively via the constructor matching {@code typeArgs}. Registration is
   * idempotent: a second call for the same class returns the cached factory.
   */
  public static <T extends Tuple> TupleFactory<T> create(Class<T> clazz, Class... typeArgs) {
    if (customTupleFactories.containsKey(clazz)) {
      return (TupleFactory<T>) customTupleFactories.get(clazz);
    }
    TupleFactory<T> custom = new CustomTupleFactory<T>(clazz, typeArgs);
    customTupleFactories.put(clazz, custom);
    return custom;
  }

  /** Reflection-based factory for user-defined Tuple subclasses. */
  private static class CustomTupleFactory<T extends Tuple> extends TupleFactory<T> {

    private final Class<T> clazz;
    private final Class[] typeArgs;

    // Constructor objects are not Serializable; re-resolved in initialize(),
    // which must be called before makeTuple().
    private transient Constructor<T> constructor;

    public CustomTupleFactory(Class<T> clazz, Class[] typeArgs) {
      this.clazz = clazz;
      this.typeArgs = typeArgs;
    }

    @Override
    public void initialize() {
      try {
        constructor = clazz.getConstructor(typeArgs);
      } catch (Exception e) {
        throw new CrunchRuntimeException(e);
      }
    }

    @Override
    public T makeTuple(Object... values) {
      try {
        return constructor.newInstance(values);
      } catch (Exception e) {
        throw new CrunchRuntimeException(e);
      }
    }
  }

}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroCapabilities.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroCapabilities.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroCapabilities.java
new file mode 100644
index 0000000..cc1636c
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroCapabilities.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Determines the capabilities of the Avro version that is currently being used.
+ */
+class AvroCapabilities {
+
+  public static class Record extends org.apache.avro.specific.SpecificRecordBase implements
+      org.apache.avro.specific.SpecificRecord {
+    public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser()
+        .parse("{\"type\":\"record\",\"name\":\"Record\",\"namespace\":\"org.apache.crunch.types.avro\",\"fields\":[{\"name\":\"subrecords\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}");
+    @Deprecated
+    public java.util.List<java.lang.CharSequence> subrecords;
+
+    public java.lang.Object get(int field$) {
+      switch (field$) {
+      case 0:
+        return subrecords;
+      default:
+        throw new org.apache.avro.AvroRuntimeException("Bad index");
+      }
+    }
+
+    // Used by DatumReader. Applications should not call.
+    @SuppressWarnings(value = "unchecked")
+    public void put(int field$, java.lang.Object value$) {
+      switch (field$) {
+      case 0:
+        subrecords = (java.util.List<java.lang.CharSequence>) value$;
+        break;
+      default:
+        throw new org.apache.avro.AvroRuntimeException("Bad index");
+      }
+    }
+
+    @Override
+    public Schema getSchema() {
+      return SCHEMA$;
+    }
+  }
+
+  /**
+   * Determine if the current Avro version can use the ReflectDatumReader to
+   * read SpecificData that includes an array. The inability to do this was a
+   * bug that was fixed in Avro 1.7.0.
+   * 
+   * @return true if SpecificData can be properly read using a
+   *         ReflectDatumReader
+   */
+  static boolean canDecodeSpecificSchemaWithReflectDatumReader() {
+    ReflectDatumReader<Record> datumReader = new ReflectDatumReader(Record.SCHEMA$);
+    ReflectDatumWriter<Record> datumWriter = new ReflectDatumWriter(Record.SCHEMA$);
+
+    Record record = new Record();
+    record.subrecords = Lists.<CharSequence> newArrayList("a", "b");
+
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
+
+    try {
+      datumWriter.write(record, encoder);
+      encoder.flush();
+      BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(
+          byteArrayOutputStream.toByteArray(), null);
+      datumReader.read(record, decoder);
+    } catch (IOException ioe) {
+      throw new RuntimeException("Error performing specific schema test", ioe);
+    } catch (ClassCastException cce) {
+      // This indicates that we're using a pre-1.7.0 version of Avro, as the
+      // ReflectDatumReader in those versions could not correctly handle an
+      // array in a SpecificData value
+      return false;
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
new file mode 100644
index 0000000..0fe9288
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Serializable;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.types.DeepCopier;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Performs deep copies of Avro-serializable objects.
+ * <p>
+ * <b>Warning:</b> Methods in this class are not thread-safe. This shouldn't be a problem when
+ * running in a map-reduce context where each mapper/reducer is running in its own JVM, but it may
+ * well be a problem in any other kind of multi-threaded context.
+ */
+abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable {
+
+  private String jsonSchema;
+  private transient Configuration conf;
+  private transient Schema schema;
+  private BinaryEncoder binaryEncoder;
+  private BinaryDecoder binaryDecoder;
+
+  private transient DatumWriter<T> datumWriter;
+  private transient DatumReader<T> datumReader;
+
+  public AvroDeepCopier(Schema schema) {
+    this.jsonSchema = schema.toString();
+  }
+
+  protected Schema getSchema() {
+    if (schema == null) {
+      schema = new Schema.Parser().parse(jsonSchema);
+    }
+    return schema;
+  }
+
+  @Override
+  public void initialize(Configuration conf) {
+    this.conf = conf;
+  }
+
+  protected abstract T createCopyTarget();
+
+  protected abstract DatumWriter<T> createDatumWriter(Configuration conf);
+
+  protected abstract DatumReader<T> createDatumReader(Configuration conf);
+
+  /**
+   * Deep copier for Avro specific data objects.
+   */
+  public static class AvroSpecificDeepCopier<T> extends AvroDeepCopier<T> {
+
+    private Class<T> valueClass;
+
+    public AvroSpecificDeepCopier(Class<T> valueClass, Schema schema) {
+      super(schema);
+      this.valueClass = valueClass;
+    }
+
+    @Override
+    protected T createCopyTarget() {
+      return createNewInstance(valueClass);
+    }
+
+    @Override
+    protected DatumWriter<T> createDatumWriter(Configuration conf) {
+      return new SpecificDatumWriter<T>(getSchema());
+    }
+
+    @Override
+    protected DatumReader<T> createDatumReader(Configuration conf) {
+      return new SpecificDatumReader<T>(getSchema());
+    }
+
+  }
+
+  /**
+   * Deep copier for Avro generic data objects.
+   */
+  public static class AvroGenericDeepCopier extends AvroDeepCopier<Record> {
+
+    private transient Schema schema;
+
+    public AvroGenericDeepCopier(Schema schema) {
+      super(schema);
+    }
+
+    @Override
+    protected Record createCopyTarget() {
+      return new GenericData.Record(getSchema());
+    }
+
+    @Override
+    protected DatumReader<Record> createDatumReader(Configuration conf) {
+      return new GenericDatumReader<Record>(getSchema());
+    }
+
+    @Override
+    protected DatumWriter<Record> createDatumWriter(Configuration conf) {
+      return new GenericDatumWriter<Record>(getSchema());
+    }
+  }
+
+  /**
+   * Deep copier for Avro reflect data objects.
+   */
+  public static class AvroReflectDeepCopier<T> extends AvroDeepCopier<T> {
+
+    private Class<T> valueClass;
+
+    public AvroReflectDeepCopier(Class<T> valueClass, Schema schema) {
+      super(schema);
+      this.valueClass = valueClass;
+    }
+
+    @Override
+    protected T createCopyTarget() {
+      return createNewInstance(valueClass);
+    }
+
+    @Override
+    protected DatumReader<T> createDatumReader(Configuration conf) {
+      return Avros.getReflectDataFactory(conf).getReader(getSchema());
+    }
+
+    @Override
+    protected DatumWriter<T> createDatumWriter(Configuration conf) {
+      return Avros.getReflectDataFactory(conf).getWriter(getSchema());
+    }
+  }
+
+  /**
+   * Create a deep copy of an Avro value.
+   * 
+   * @param source The value to be copied
+   * @return The deep copy of the value
+   */
+  @Override
+  public T deepCopy(T source) {
+    
+    if (source == null) {
+      return null;
+    }
+    
+    if (datumReader == null) {
+      datumReader = createDatumReader(conf);
+    }
+    if (datumWriter == null) {
+      datumWriter = createDatumWriter(conf);
+    }
+    ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
+    binaryEncoder = EncoderFactory.get().binaryEncoder(byteOutStream, binaryEncoder);
+    T target = createCopyTarget();
+    try {
+      datumWriter.write(source, binaryEncoder);
+      binaryEncoder.flush();
+      binaryDecoder = DecoderFactory.get()
+          .binaryDecoder(byteOutStream.toByteArray(), binaryDecoder);
+      datumReader.read(target, binaryDecoder);
+    } catch (Exception e) {
+      throw new CrunchRuntimeException("Error while deep copying avro value " + source, e);
+    }
+
+    return target;
+  }
+
+  protected T createNewInstance(Class<T> targetClass) {
+    try {
+      return targetClass.newInstance();
+    } catch (InstantiationException e) {
+      throw new CrunchRuntimeException(e);
+    } catch (IllegalAccessException e) {
+      throw new CrunchRuntimeException(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
new file mode 100644
index 0000000..598868f
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import java.util.Collection;
+
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroKeyComparator;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.fn.PairMapFn;
+import org.apache.crunch.lib.PTables;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PGroupedTableType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * The Avro implementation of {@code PGroupedTableType}: configures the
+ * shuffle phase of a MapReduce job so that Avro-serialized key/value pairs
+ * are sorted with {@code AvroKeyComparator} and grouped by key.
+ */
+class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V> {
+
+  // Stateless, so a single shared converter instance is used for all types.
+  private static final AvroPairConverter CONVERTER = new AvroPairConverter();
+  private final MapFn inputFn;
+  private final MapFn outputFn;
+
+  public AvroGroupedTableType(AvroTableType<K, V> tableType) {
+    super(tableType);
+    AvroType keyType = (AvroType) tableType.getKeyType();
+    AvroType valueType = (AvroType) tableType.getValueType();
+    // Compose the key/value map functions of the underlying table type into
+    // pair-shaped input/output functions for the grouped representation.
+    this.inputFn = new PairIterableMapFn(keyType.getInputMapFn(), valueType.getInputMapFn());
+    this.outputFn = new PairMapFn(keyType.getOutputMapFn(), valueType.getOutputMapFn());
+  }
+
+  @Override
+  public Class<Pair<K, Iterable<V>>> getTypeClass() {
+    // Generics are erased at runtime, so the raw Pair class is the best we can do.
+    return (Class<Pair<K, Iterable<V>>>) Pair.of(null, null).getClass();
+  }
+
+  @Override
+  public Converter getGroupingConverter() {
+    return CONVERTER;
+  }
+
+  @Override
+  public MapFn getInputMapFn() {
+    return inputFn;
+  }
+
+  @Override
+  public MapFn getOutputMapFn() {
+    return outputFn;
+  }
+
+  @Override
+  public void initialize(Configuration conf) {
+    getTableType().initialize(conf);
+  }
+
+  @Override
+  public Pair<K, Iterable<V>> getDetachedValue(Pair<K, Iterable<V>> value) {
+    return PTables.getGroupedDetachedValue(this, value);
+  }
+
+  /**
+   * Configures the given job's shuffle to use Avro map-output serialization
+   * for this table's key/value schema.
+   */
+  @Override
+  public void configureShuffle(Job job, GroupingOptions options) {
+    AvroTableType<K, V> att = (AvroTableType<K, V>) tableType;
+    String schemaJson = att.getSchema().toString();
+    Configuration conf = job.getConfiguration();
+
+    if (att.hasReflect()) {
+      if (att.hasSpecific()) {
+        // NOTE(review): presumably validates that specific and reflect schemas
+        // may be combined on the Avro version in use — confirm in Avros.
+        Avros.checkCombiningSpecificAndReflectionSchemas();
+      }
+      conf.setBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, true);
+    }
+    conf.set(AvroJob.MAP_OUTPUT_SCHEMA, schemaJson);
+    job.setSortComparatorClass(AvroKeyComparator.class);
+    job.setMapOutputKeyClass(AvroKey.class);
+    job.setMapOutputValueClass(AvroValue.class);
+    if (options != null) {
+      options.configure(job);
+    }
+
+    Avros.configureReflectDataFactory(conf);
+
+    // Register SafeAvroSerialization with Hadoop if it isn't already present.
+    Collection<String> serializations = job.getConfiguration().getStringCollection(
+        "io.serializations");
+    if (!serializations.contains(SafeAvroSerialization.class.getName())) {
+      serializations.add(SafeAvroSerialization.class.getName());
+      job.getConfiguration().setStrings("io.serializations", serializations.toArray(new String[0]));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java
new file mode 100644
index 0000000..b8bbebd
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/** An {@link org.apache.hadoop.mapreduce.InputFormat} for Avro data files. */
+public class AvroInputFormat<T> extends FileInputFormat<AvroWrapper<T>, NullWritable> {
+  @Override
+  public RecordReader<AvroWrapper<T>, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    context.setStatus(split.toString());
+    // Rebuild the input schema from the JSON form stored in the job configuration.
+    String schemaJson = context.getConfiguration().get(AvroJob.INPUT_SCHEMA);
+    return new AvroRecordReader<T>(new Schema.Parser().parse(schemaJson));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java
new file mode 100644
index 0000000..68b717d
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.crunch.types.Converter;
+import org.apache.hadoop.io.NullWritable;
+
+class AvroKeyConverter<K> implements Converter<AvroWrapper<K>, NullWritable, K, Iterable<K>> {
+
+  // Lazily created and reused for every output key to avoid per-record allocation.
+  private transient AvroWrapper<K> wrapper = null;
+
+  @Override
+  public K convertInput(AvroWrapper<K> key, NullWritable value) {
+    return key.datum();
+  }
+
+  @Override
+  public AvroWrapper<K> outputKey(K value) {
+    AvroWrapper<K> w = wrapper();
+    w.datum(value);
+    return w;
+  }
+
+  @Override
+  public NullWritable outputValue(K value) {
+    return NullWritable.get();
+  }
+
+  @Override
+  public Class<AvroWrapper<K>> getKeyClass() {
+    return (Class<AvroWrapper<K>>) wrapper().getClass();
+  }
+
+  @Override
+  public Class<NullWritable> getValueClass() {
+    return NullWritable.class;
+  }
+
+  @Override
+  public Iterable<K> convertIterableInput(AvroWrapper<K> key, Iterable<NullWritable> value) {
+    throw new UnsupportedOperationException("Should not be possible");
+  }
+
+  /** Returns the shared wrapper, creating it on first use. */
+  private AvroWrapper<K> wrapper() {
+    if (wrapper == null) {
+      wrapper = new AvroWrapper<K>();
+    }
+    return wrapper;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
new file mode 100644
index 0000000..98d3f50
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/** An {@link org.apache.hadoop.mapreduce.OutputFormat} for Avro data files. */
+/** An {@link org.apache.hadoop.mapreduce.OutputFormat} for Avro data files. */
+public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
+
+  @Override
+  public RecordWriter<AvroWrapper<T>, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException,
+      InterruptedException {
+
+    Configuration conf = context.getConfiguration();
+    // A named output carries its own schema under "avro.output.schema.<name>";
+    // otherwise, fall back to the job-wide Avro output schema.
+    String outputName = conf.get("crunch.namedoutput");
+    Schema schema;
+    if (outputName == null || outputName.isEmpty()) {
+      schema = AvroJob.getOutputSchema(context.getConfiguration());
+    } else {
+      schema = (new Schema.Parser()).parse(conf.get("avro.output.schema." + outputName));
+    }
+
+    ReflectDataFactory factory = Avros.getReflectDataFactory(conf);
+    final DataFileWriter<T> writer = new DataFileWriter<T>(factory.<T> getWriter(schema));
+
+    /* copied from org.apache.avro.mapred.AvroOutputFormat */
+    JobConf jc = new JobConf(conf);
+    if (org.apache.hadoop.mapred.FileOutputFormat.getCompressOutput(jc)) {
+      int level = conf.getInt(org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY,
+          org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL);
+      String codecName = conf.get(AvroJob.OUTPUT_CODEC,
+          org.apache.avro.file.DataFileConstants.DEFLATE_CODEC);
+      if (codecName.equals(org.apache.avro.file.DataFileConstants.DEFLATE_CODEC)) {
+        writer.setCodec(CodecFactory.deflateCodec(level));
+      } else {
+        writer.setCodec(CodecFactory.fromString(codecName));
+      }
+    }
+
+    writer.setSyncInterval(jc.getInt(org.apache.avro.mapred.AvroOutputFormat.SYNC_INTERVAL_KEY,
+        org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL));
+
+    Path path = getDefaultWorkFile(context, org.apache.avro.mapred.AvroOutputFormat.EXT);
+    writer.create(schema, path.getFileSystem(context.getConfiguration()).create(path));
+
+    return new RecordWriter<AvroWrapper<T>, NullWritable>() {
+      @Override
+      public void write(AvroWrapper<T> wrapper, NullWritable ignore) throws IOException {
+        writer.append(wrapper.datum());
+      }
+
+      @Override
+      public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+        writer.close();
+      }
+    };
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java
new file mode 100644
index 0000000..d1d2627
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import java.util.Iterator;
+
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.Converter;
+
+class AvroPairConverter<K, V> implements Converter<AvroKey<K>, AvroValue<V>, Pair<K, V>, Pair<K, Iterable<V>>> {
+
+  // Lazily created wrapper instances, reused across calls to avoid
+  // allocating a new wrapper for every output record.
+  private transient AvroKey<K> reusableKey = null;
+  private transient AvroValue<V> reusableValue = null;
+
+  @Override
+  public Pair<K, V> convertInput(AvroKey<K> key, AvroValue<V> value) {
+    return Pair.of(key.datum(), value.datum());
+  }
+
+  public Pair<K, Iterable<V>> convertIterableInput(AvroKey<K> key, Iterable<AvroValue<V>> iter) {
+    // Wrap the AvroValue iterable so callers see the plain datum values.
+    Iterable<V> values = new AvroWrappedIterable<V>(iter);
+    return Pair.of(key.datum(), values);
+  }
+
+  @Override
+  public AvroKey<K> outputKey(Pair<K, V> value) {
+    AvroKey<K> k = keyWrapper();
+    k.datum(value.first());
+    return k;
+  }
+
+  @Override
+  public AvroValue<V> outputValue(Pair<K, V> value) {
+    AvroValue<V> v = valueWrapper();
+    v.datum(value.second());
+    return v;
+  }
+
+  @Override
+  public Class<AvroKey<K>> getKeyClass() {
+    return (Class<AvroKey<K>>) keyWrapper().getClass();
+  }
+
+  @Override
+  public Class<AvroValue<V>> getValueClass() {
+    return (Class<AvroValue<V>>) valueWrapper().getClass();
+  }
+
+  /** Returns the shared key wrapper, creating it on first use. */
+  private AvroKey<K> keyWrapper() {
+    if (reusableKey == null) {
+      reusableKey = new AvroKey<K>();
+    }
+    return reusableKey;
+  }
+
+  /** Returns the shared value wrapper, creating it on first use. */
+  private AvroValue<V> valueWrapper() {
+    if (reusableValue == null) {
+      reusableValue = new AvroValue<V>();
+    }
+    return reusableValue;
+  }
+
+  /** View of an Iterable of AvroValue wrappers as an Iterable of plain values. */
+  private static class AvroWrappedIterable<V> implements Iterable<V> {
+
+    private final Iterable<AvroValue<V>> delegate;
+
+    public AvroWrappedIterable(Iterable<AvroValue<V>> delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public Iterator<V> iterator() {
+      final Iterator<AvroValue<V>> inner = delegate.iterator();
+      return new Iterator<V>() {
+        @Override
+        public boolean hasNext() {
+          return inner.hasNext();
+        }
+
+        @Override
+        public V next() {
+          return inner.next().datum();
+        }
+
+        @Override
+        public void remove() {
+          inner.remove();
+        }
+      };
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
new file mode 100644
index 0000000..9c7578c
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.mapred.FsInput;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/** A {@link RecordReader} for Avro data files. */
+class AvroRecordReader<T> extends RecordReader<AvroWrapper<T>, NullWritable> {
+
+  private FileReader<T> reader;
+  private long start; // position of the first sync point at or after the split start
+  private long end;   // byte offset at which this split ends
+  private AvroWrapper<T> key;
+  private NullWritable value;
+  private Schema schema;
+
+  public AvroRecordReader(Schema schema) {
+    this.schema = schema;
+  }
+
+  @Override
+  public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
+    FileSplit split = (FileSplit) genericSplit;
+    Configuration conf = context.getConfiguration();
+    SeekableInput in = new FsInput(split.getPath(), conf);
+    DatumReader<T> datumReader = null;
+    // Reflect-based reading is the default (note the 'true' default below);
+    // specific reading must be requested explicitly via AvroJob.INPUT_IS_REFLECT.
+    if (context.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, true)) {
+      ReflectDataFactory factory = Avros.getReflectDataFactory(conf);
+      datumReader = factory.getReader(schema);
+    } else {
+      datumReader = new SpecificDatumReader<T>(schema);
+    }
+    this.reader = DataFileReader.openReader(in, datumReader);
+    reader.sync(split.getStart()); // sync to start
+    this.start = reader.tell();
+    this.end = split.getStart() + split.getLength();
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    // Stop at end-of-file or once we've read past this split's boundary.
+    if (!reader.hasNext() || reader.pastSync(end)) {
+      key = null;
+      value = null;
+      return false;
+    }
+    if (key == null) {
+      key = new AvroWrapper<T>();
+    }
+    if (value == null) {
+      value = NullWritable.get();
+    }
+    // Reuse the previously-read datum instance where the reader supports it.
+    key.datum(reader.next(key.datum()));
+    return true;
+  }
+
+  @Override
+  public AvroWrapper<T> getCurrentKey() throws IOException, InterruptedException {
+    return key;
+  }
+
+  @Override
+  public NullWritable getCurrentValue() throws IOException, InterruptedException {
+    return value;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    // Guard against division by zero for an empty split.
+    if (end == start) {
+      return 0.0f;
+    } else {
+      return Math.min(1.0f, (getPos() - start) / (float) (end - start));
+    }
+  }
+
+  public long getPos() throws IOException {
+    return reader.tell();
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
new file mode 100644
index 0000000..86613df
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.lib.PTables;
+import org.apache.crunch.types.PGroupedTableType;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.TupleDeepCopier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * The implementation of the PTableType interface for Avro-based serialization.
+ * 
+ */
+class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements PTableType<K, V> {
+
+  /**
+   * Maps a Crunch Pair to an Avro mapred Pair for output. Schemas are held
+   * as JSON strings (Schema is not serializable); the Schema objects are
+   * rebuilt in initialize() and lazily in map().
+   */
+  private static class PairToAvroPair extends MapFn<Pair, org.apache.avro.mapred.Pair> {
+    private final MapFn keyMapFn;
+    private final MapFn valueMapFn;
+    private final String firstJson;
+    private final String secondJson;
+
+    // Built in initialize(); the parsed pair Schema is cached on first map() call.
+    private String pairSchemaJson;
+    private transient Schema pairSchema;
+
+    public PairToAvroPair(AvroType keyType, AvroType valueType) {
+      this.keyMapFn = keyType.getOutputMapFn();
+      this.firstJson = keyType.getSchema().toString();
+      this.valueMapFn = valueType.getOutputMapFn();
+      this.secondJson = valueType.getSchema().toString();
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      keyMapFn.configure(conf);
+      valueMapFn.configure(conf);
+    }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      keyMapFn.setContext(context);
+      valueMapFn.setContext(context);
+    }
+    
+    @Override
+    public void initialize() {
+      keyMapFn.initialize();
+      valueMapFn.initialize();
+      // Compose the key/value schemas into the Avro mapred Pair schema; kept
+      // as JSON so this MapFn stays serializable after initialization.
+      pairSchemaJson = org.apache.avro.mapred.Pair.getPairSchema(
+          new Schema.Parser().parse(firstJson), new Schema.Parser().parse(secondJson)).toString();
+    }
+
+    @Override
+    public org.apache.avro.mapred.Pair map(Pair input) {
+      if (pairSchema == null) {
+        // Parse lazily on the worker; the transient field is null after deserialization.
+        pairSchema = new Schema.Parser().parse(pairSchemaJson);
+      }
+      org.apache.avro.mapred.Pair avroPair = new org.apache.avro.mapred.Pair(pairSchema);
+      avroPair.key(keyMapFn.map(input.first()));
+      avroPair.value(valueMapFn.map(input.second()));
+      return avroPair;
+    }
+  }
+
+  /**
+   * Maps an Avro IndexedRecord (field 0 = key, field 1 = value) back to a
+   * Crunch Pair on input.
+   */
+  private static class IndexedRecordToPair extends MapFn<IndexedRecord, Pair> {
+
+    private final MapFn firstMapFn;
+    private final MapFn secondMapFn;
+
+    public IndexedRecordToPair(MapFn firstMapFn, MapFn secondMapFn) {
+      this.firstMapFn = firstMapFn;
+      this.secondMapFn = secondMapFn;
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      firstMapFn.configure(conf);
+      secondMapFn.configure(conf);
+    }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      firstMapFn.setContext(context);
+      secondMapFn.setContext(context);
+    }
+    
+    @Override
+    public void initialize() {
+      firstMapFn.initialize();
+      secondMapFn.initialize();
+    }
+
+    @Override
+    public Pair map(IndexedRecord input) {
+      return Pair.of(firstMapFn.map(input.get(0)), secondMapFn.map(input.get(1)));
+    }
+  }
+
+  private final AvroType<K> keyType;
+  private final AvroType<V> valueType;
+
+  public AvroTableType(AvroType<K> keyType, AvroType<V> valueType, Class<Pair<K, V>> pairClass) {
+    // The table's schema is the Avro mapred Pair schema of the key and value schemas.
+    super(pairClass, org.apache.avro.mapred.Pair.getPairSchema(keyType.getSchema(),
+        valueType.getSchema()), new IndexedRecordToPair(keyType.getInputMapFn(),
+        valueType.getInputMapFn()), new PairToAvroPair(keyType, valueType), new TupleDeepCopier(
+        Pair.class, keyType, valueType), keyType, valueType);
+    this.keyType = keyType;
+    this.valueType = valueType;
+  }
+
+  @Override
+  public PType<K> getKeyType() {
+    return keyType;
+  }
+
+  @Override
+  public PType<V> getValueType() {
+    return valueType;
+  }
+
+  @Override
+  public PGroupedTableType<K, V> getGroupedTableType() {
+    return new AvroGroupedTableType<K, V>(this);
+  }
+
+  @Override
+  public Pair<K, V> getDetachedValue(Pair<K, V> value) {
+    return PTables.getDetachedValue(this, value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTextOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTextOutputFormat.java
new file mode 100644
index 0000000..4930235
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTextOutputFormat.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+public class AvroTextOutputFormat<K, V> extends TextOutputFormat<K, V> {
+
+  /**
+   * RecordWriter that unwraps {@code AvroWrapper} keys and values to their
+   * underlying datum before delegating to the wrapped text line writer;
+   * non-wrapper objects are passed through unchanged.
+   */
+  class DatumRecordTextWriter extends RecordWriter<K, V> {
+    // Raw type: the delegate writes the unwrapped datum objects, whose types
+    // need not match K/V.
+    private RecordWriter lineRecordWriter;
+
+    public DatumRecordTextWriter(RecordWriter recordWriter) {
+      this.lineRecordWriter = recordWriter;
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+      lineRecordWriter.close(context);
+    }
+
+    @Override
+    public void write(K arg0, V arg1) throws IOException, InterruptedException {
+      // Unwrap Avro wrappers (if any) so the text writer sees the raw datum.
+      lineRecordWriter.write(getData(arg0), getData(arg1));
+    }
+
+    // Returns the wrapped datum when o is an AvroWrapper, otherwise o itself.
+    private Object getData(Object o) {
+      Object data = o;
+      if (o instanceof AvroWrapper) {
+        data = ((AvroWrapper) o).datum();
+      }
+      return data;
+    }
+  }
+
+  @Override
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+    // Wrap the standard TextOutputFormat writer so Avro-wrapped records are
+    // written as plain text.
+    RecordWriter<K, V> recordWriter = super.getRecordWriter(context);
+    return new DatumRecordTextWriter(recordWriter);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
new file mode 100644
index 0000000..a92b0d0
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.crunch.io.avro.AvroFileSourceTarget;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.DeepCopier;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+/**
+ * The implementation of the PType interface for Avro-based serialization.
+ * 
+ */
+public class AvroType<T> implements PType<T> {
+
+  // Stateless converter shared by all AvroType instances.
+  private static final Converter AVRO_CONVERTER = new AvroKeyConverter();
+
+  private final Class<T> typeClass;
+  // The schema is retained as its String form for Java serialization; the
+  // parsed Schema is transient and lazily re-parsed in getSchema().
+  private final String schemaString;
+  private transient Schema schema;
+  private final MapFn baseInputMapFn;
+  private final MapFn baseOutputMapFn;
+  private final List<PType> subTypes;
+  private DeepCopier<T> deepCopier;
+  // Set by initialize(Configuration); getDetachedValue() fails fast until then.
+  private boolean initialized = false;
+
+  /** Creates an AvroType whose input and output map functions are the identity. */
+  public AvroType(Class<T> typeClass, Schema schema, DeepCopier<T> deepCopier, PType... ptypes) {
+    this(typeClass, schema, IdentityFn.getInstance(), IdentityFn.getInstance(), deepCopier, ptypes);
+  }
+
+  /**
+   * Creates an AvroType.
+   *
+   * @param typeClass the Java class this PType describes
+   * @param schema the Avro schema of the serialized form (must be non-null)
+   * @param inputMapFn maps from the Avro representation to {@code T}
+   * @param outputMapFn maps from {@code T} to the Avro representation
+   * @param deepCopier used by {@link #getDetachedValue(Object)}
+   * @param ptypes component PTypes for composite types (empty for leaf types)
+   */
+  public AvroType(Class<T> typeClass, Schema schema, MapFn inputMapFn, MapFn outputMapFn,
+      DeepCopier<T> deepCopier, PType... ptypes) {
+    this.typeClass = typeClass;
+    this.schema = Preconditions.checkNotNull(schema);
+    this.schemaString = schema.toString();
+    this.baseInputMapFn = inputMapFn;
+    this.baseOutputMapFn = outputMapFn;
+    this.deepCopier = deepCopier;
+    this.subTypes = ImmutableList.<PType> builder().add(ptypes).build();
+  }
+
+  @Override
+  public Class<T> getTypeClass() {
+    return typeClass;
+  }
+
+  @Override
+  public PTypeFamily getFamily() {
+    return AvroTypeFamily.getInstance();
+  }
+
+  @Override
+  public List<PType> getSubTypes() {
+    // Copy into a fresh list so callers cannot mutate the internal subTypes.
+    return Lists.<PType> newArrayList(subTypes);
+  }
+
+  /** Returns the Avro schema, re-parsing the serialized string form if needed. */
+  public Schema getSchema() {
+    if (schema == null) {
+      schema = new Schema.Parser().parse(schemaString);
+    }
+    return schema;
+  }
+
+  /**
+   * Determine if the wrapped type is a specific data avro type or wraps one.
+   * 
+   * @return true if the wrapped type is a specific data type or wraps one
+   */
+  public boolean hasSpecific() {
+    // Primitive Avro types are never specific records.
+    if (Avros.isPrimitive(this)) {
+      return false;
+    }
+
+    // A composite type is specific if any of its component types is.
+    if (!this.subTypes.isEmpty()) {
+      for (PType<?> subType : this.subTypes) {
+        AvroType<?> atype = (AvroType<?>) subType;
+        if (atype.hasSpecific()) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    return SpecificRecord.class.isAssignableFrom(typeClass);
+  }
+
+  /**
+   * Determine if the wrapped type is a generic data avro type.
+   * 
+   * @return true if the wrapped type is a generic type
+   */
+  public boolean isGeneric() {
+    return GenericData.Record.class.equals(typeClass);
+  }
+
+  /**
+   * Determine if the wrapped type is a reflection-based avro type or wraps one.
+   * 
+   * @return true if the wrapped type is a reflection-based type or wraps one.
+   */
+  public boolean hasReflect() {
+    if (Avros.isPrimitive(this)) {
+      return false;
+    }
+
+    // A composite type is reflect-based if any of its component types is.
+    if (!this.subTypes.isEmpty()) {
+      for (PType<?> subType : this.subTypes) {
+        if (((AvroType<?>) subType).hasReflect()) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    // A leaf type is reflect-based when it is neither a generic record nor a
+    // specific record.
+    return !(typeClass.equals(GenericData.Record.class) || SpecificRecord.class
+        .isAssignableFrom(typeClass));
+  }
+
+  public MapFn<Object, T> getInputMapFn() {
+    return baseInputMapFn;
+  }
+
+  public MapFn<T, Object> getOutputMapFn() {
+    return baseOutputMapFn;
+  }
+
+  @Override
+  public Converter getConverter() {
+    return AVRO_CONVERTER;
+  }
+
+  @Override
+  public ReadableSourceTarget<T> getDefaultFileSource(Path path) {
+    return new AvroFileSourceTarget<T>(path, this);
+  }
+
+  @Override
+  public void initialize(Configuration conf) {
+    deepCopier.initialize(conf);
+    initialized = true;
+  }
+
+  @Override
+  public T getDetachedValue(T value) {
+    // Deep copies require an initialized DeepCopier; fail fast otherwise.
+    if (!initialized) {
+      throw new IllegalStateException("Cannot call getDetachedValue on an uninitialized PType");
+    }
+    return deepCopier.deepCopy(value);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    // NOTE(review): equality is based only on typeClass and subTypes; the
+    // schema itself is not compared -- confirm this is intentional.
+    if (other == null || !(other instanceof AvroType)) {
+      return false;
+    }
+    AvroType at = (AvroType) other;
+    return (typeClass.equals(at.typeClass) && subTypes.equals(at.subTypes));
+
+  }
+
+  @Override
+  public int hashCode() {
+    // Consistent with equals: hashes only typeClass and subTypes.
+    HashCodeBuilder hcb = new HashCodeBuilder();
+    hcb.append(typeClass).append(subTypes);
+    return hcb.toHashCode();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
new file mode 100644
index 0000000..e09e173
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.types.PGroupedTableType;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.PTypeUtils;
+
+/**
+ * The Avro implementation of {@code PTypeFamily}: a singleton whose factory
+ * methods delegate to the static helpers on {@code Avros}.
+ */
+public class AvroTypeFamily implements PTypeFamily {
+
+  private static final AvroTypeFamily INSTANCE = new AvroTypeFamily();
+
+  public static AvroTypeFamily getInstance() {
+    return INSTANCE;
+  }
+
+  // There can only be one instance.
+  private AvroTypeFamily() {
+  }
+
+  @Override
+  public PType<Void> nulls() {
+    return Avros.nulls();
+  }
+
+  @Override
+  public PType<String> strings() {
+    return Avros.strings();
+  }
+
+  @Override
+  public PType<Long> longs() {
+    return Avros.longs();
+  }
+
+  @Override
+  public PType<Integer> ints() {
+    return Avros.ints();
+  }
+
+  @Override
+  public PType<Float> floats() {
+    return Avros.floats();
+  }
+
+  @Override
+  public PType<Double> doubles() {
+    return Avros.doubles();
+  }
+
+  @Override
+  public PType<Boolean> booleans() {
+    return Avros.booleans();
+  }
+
+  @Override
+  public PType<ByteBuffer> bytes() {
+    return Avros.bytes();
+  }
+
+  @Override
+  public <T> PType<T> records(Class<T> clazz) {
+    return Avros.records(clazz);
+  }
+
+  // Avro-specific factory: a PType for generic records with the given schema.
+  public PType<GenericData.Record> generics(Schema schema) {
+    return Avros.generics(schema);
+  }
+
+  public <T> PType<T> containers(Class<T> clazz) {
+    return Avros.containers(clazz);
+  }
+
+  @Override
+  public <T> PType<Collection<T>> collections(PType<T> ptype) {
+    return Avros.collections(ptype);
+  }
+
+  @Override
+  public <T> PType<Map<String, T>> maps(PType<T> ptype) {
+    return Avros.maps(ptype);
+  }
+
+  @Override
+  public <V1, V2> PType<Pair<V1, V2>> pairs(PType<V1> p1, PType<V2> p2) {
+    return Avros.pairs(p1, p2);
+  }
+
+  @Override
+  public <V1, V2, V3> PType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2, PType<V3> p3) {
+    return Avros.triples(p1, p2, p3);
+  }
+
+  @Override
+  public <V1, V2, V3, V4> PType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, PType<V2> p2, PType<V3> p3, PType<V4> p4) {
+    return Avros.quads(p1, p2, p3, p4);
+  }
+
+  @Override
+  public PType<TupleN> tuples(PType<?>... ptypes) {
+    return Avros.tuples(ptypes);
+  }
+
+  @Override
+  public <K, V> PTableType<K, V> tableOf(PType<K> key, PType<V> value) {
+    return Avros.tableOf(key, value);
+  }
+
+  @Override
+  public <T> PType<T> as(PType<T> ptype) {
+    // Already an Avro-family type: nothing to convert.
+    if (ptype instanceof AvroType || ptype instanceof AvroGroupedTableType) {
+      return ptype;
+    }
+    // Grouped table: convert the underlying table type, then re-wrap it.
+    if (ptype instanceof PGroupedTableType) {
+      PTableType ptt = ((PGroupedTableType) ptype).getTableType();
+      return new AvroGroupedTableType((AvroTableType) as(ptt));
+    }
+    // Primitives have canonical Avro equivalents; anything else is converted
+    // generically via PTypeUtils.
+    Class<T> typeClass = ptype.getTypeClass();
+    PType<T> prim = Avros.getPrimitiveType(typeClass);
+    if (prim != null) {
+      return prim;
+    }
+    return PTypeUtils.convert(ptype, this);
+  }
+
+  @Override
+  public <T extends Tuple> PType<T> tuples(Class<T> clazz, PType<?>... ptypes) {
+    return Avros.tuples(clazz, ptypes);
+  }
+
+  @Override
+  public <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base) {
+    return Avros.derived(clazz, inputFn, outputFn, base);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java
new file mode 100644
index 0000000..9460fa5
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
+
+/**
+ * An {@link org.apache.hadoop.mapred.InputFormat} for text files. Each line is
+ * a {@link Utf8} key; values are null.
+ */
+public class AvroUtf8InputFormat extends FileInputFormat<AvroWrapper<Utf8>, NullWritable> {
+
+  /**
+   * Adapts a LineRecordReader so that each text line is exposed as an
+   * {@code AvroWrapper<Utf8>} key with a NullWritable value.
+   */
+  static class Utf8LineRecordReader extends RecordReader<AvroWrapper<Utf8>, NullWritable> {
+
+    private LineRecordReader lineRecordReader;
+
+    // Reused across records; only its datum is replaced per line.
+    private AvroWrapper<Utf8> currentKey = new AvroWrapper<Utf8>();
+
+    public Utf8LineRecordReader() throws IOException {
+      this.lineRecordReader = new LineRecordReader();
+    }
+
+    public void close() throws IOException {
+      lineRecordReader.close();
+    }
+
+    public float getProgress() throws IOException {
+      return lineRecordReader.getProgress();
+    }
+
+    @Override
+    public AvroWrapper<Utf8> getCurrentKey() throws IOException, InterruptedException {
+      // The line text is LineRecordReader's *value*; re-expose it as a Utf8 key.
+      Text txt = lineRecordReader.getCurrentValue();
+      currentKey.datum(new Utf8(txt.toString()));
+      return currentKey;
+    }
+
+    @Override
+    public NullWritable getCurrentValue() throws IOException, InterruptedException {
+      // Values carry no information; every record's value is null.
+      return NullWritable.get();
+    }
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+      lineRecordReader.initialize(split, context);
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      return lineRecordReader.nextKeyValue();
+    }
+  }
+
+  private CompressionCodecFactory compressionCodecs = null;
+
+  // NOTE(review): configure(Configuration) matches the old mapred-API contract
+  // and carries no @Override here; the new mapreduce framework may never call
+  // it, leaving compressionCodecs null -- TODO confirm.
+  public void configure(Configuration conf) {
+    compressionCodecs = new CompressionCodecFactory(conf);
+  }
+
+  // NOTE(review): the new-API hook is isSplitable(JobContext, Path); this
+  // old-API signature does not override it, so compressed inputs may still be
+  // treated as splittable -- confirm against the Hadoop version in use.
+  protected boolean isSplitable(FileSystem fs, Path file) {
+    return compressionCodecs.getCodec(file) == null;
+  }
+
+  @Override
+  public RecordReader<AvroWrapper<Utf8>, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    return new Utf8LineRecordReader();
+  }
+}


Mime
View raw message