incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject [1/2] git commit: Correct use of Avro reflection values in reducers
Date Thu, 12 Jul 2012 07:06:44 GMT
Updated Branches:
  refs/heads/master 204aeb1dd -> f699409f3


Correct use of Avro reflection values in reducers

Correct issues with invalid GenericData.Record objects containing
Avro reflection-based values in a reducer.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/f699409f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/f699409f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/f699409f

Branch: refs/heads/master
Commit: f699409f388ea20a551e007796e9addf631d1a29
Parents: 204aeb1
Author: Gabriel Reid <gabriel.reid@gmail.com>
Authored: Wed Jul 11 09:26:55 2012 +0200
Committer: Gabriel Reid <gabriel.reid@gmail.com>
Committed: Thu Jul 12 08:36:01 2012 +0200

----------------------------------------------------------------------
 .../crunch/io/avro/AvroFileReaderFactory.java      |   88 +-
 .../org/apache/crunch/io/avro/AvroFileSource.java  |   13 +-
 .../crunch/types/avro/AvroGroupedTableType.java    |   38 +-
 .../apache/crunch/types/avro/AvroTableType.java    |  239 ++--
 .../org/apache/crunch/types/avro/AvroType.java     |  218 ++--
 .../apache/crunch/types/avro/AvroTypeFamily.java   |   29 +-
 .../java/org/apache/crunch/types/avro/Avros.java   | 1126 +++++++--------
 .../apache/crunch/io/avro/AvroFileSourceTest.java  |   79 +-
 .../test/java/org/apache/crunch/lib/SortTest.java  |  261 ++--
 .../java/org/apache/crunch/test/StringWrapper.java |   85 ++
 .../crunch/types/avro/AvroTableTypeTest.java       |   22 +-
 .../org/apache/crunch/types/avro/AvroTypeTest.java |  155 ++-
 .../org/apache/crunch/types/avro/AvrosTest.java    |  125 ++-
 13 files changed, 1366 insertions(+), 1112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f699409f/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
index c92c3ee..220b134 100644
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
@@ -28,63 +28,61 @@ import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.types.avro.AvroType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-import org.apache.crunch.MapFn;
-import org.apache.crunch.io.FileReaderFactory;
-import org.apache.crunch.types.avro.AvroType;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.UnmodifiableIterator;
 
 public class AvroFileReaderFactory<T> implements FileReaderFactory<T> {
 
-	private static final Log LOG = LogFactory
-			.getLog(AvroFileReaderFactory.class);
+  private static final Log LOG = LogFactory.getLog(AvroFileReaderFactory.class);
 
-	private final DatumReader<T> recordReader;
-	private final MapFn<T, T> mapFn;
-	private final Configuration conf;
+  private final DatumReader<T> recordReader;
+  private final MapFn<T, T> mapFn;
+  private final Configuration conf;
 
-	public AvroFileReaderFactory(AvroType<T> atype, Configuration conf) {
-		this.recordReader = createDatumReader(atype);
-		this.mapFn = (MapFn<T, T>) atype.getInputMapFn();
-		this.conf = conf;
-	}
+  public AvroFileReaderFactory(AvroType<T> atype, Configuration conf) {
+    this.recordReader = createDatumReader(atype);
+    this.mapFn = (MapFn<T, T>) atype.getInputMapFn();
+    this.conf = conf;
+  }
 
-	private DatumReader<T> createDatumReader(AvroType<T> avroType) {
-		if (avroType.isSpecific()) {
-			return new SpecificDatumReader<T>(avroType.getSchema());
-		} else if (avroType.isGeneric()) {
-			return new GenericDatumReader<T>(avroType.getSchema());
-		} else {
-			return new ReflectDatumReader<T>(avroType.getSchema());
-		}
-	}
+  private DatumReader<T> createDatumReader(AvroType<T> avroType) {
+    if (avroType.isSpecific()) {
+      return new SpecificDatumReader<T>(avroType.getSchema());
+    } else if (avroType.isGeneric()) {
+      return new GenericDatumReader<T>(avroType.getSchema());
+    } else {
+      return new ReflectDatumReader<T>(avroType.getSchema());
+    }
+  }
 
-	@Override
-	public Iterator<T> read(FileSystem fs, final Path path) {
-		this.mapFn.setConfigurationForTest(conf);
-		this.mapFn.initialize();
-		try {
-			FsInput fsi = new FsInput(path, fs.getConf());
-			final DataFileReader<T> reader = new DataFileReader<T>(fsi,
-					recordReader);
-			return new UnmodifiableIterator<T>() {
-				@Override
-				public boolean hasNext() {
-					return reader.hasNext();
-				}
+  @Override
+  public Iterator<T> read(FileSystem fs, final Path path) {
+    this.mapFn.setConfigurationForTest(conf);
+    this.mapFn.initialize();
+    try {
+      FsInput fsi = new FsInput(path, fs.getConf());
+      final DataFileReader<T> reader = new DataFileReader<T>(fsi, recordReader);
+      return new UnmodifiableIterator<T>() {
+        @Override
+        public boolean hasNext() {
+          return reader.hasNext();
+        }
 
-				@Override
-				public T next() {
-					return mapFn.map(reader.next());
-				}
-			};
-		} catch (IOException e) {
-			LOG.info("Could not read avro file at path: " + path, e);
-			return Iterators.emptyIterator();
-		}
-	}
+        @Override
+        public T next() {
+          return mapFn.map(reader.next());
+        }
+      };
+    } catch (IOException e) {
+      LOG.info("Could not read avro file at path: " + path, e);
+      return Iterators.emptyIterator();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f699409f/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
index 3cbe924..b2689dd 100644
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
+++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
@@ -20,10 +20,6 @@ package org.apache.crunch.io.avro;
 import java.io.IOException;
 
 import org.apache.avro.mapred.AvroJob;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
 import org.apache.crunch.io.CompositePathIterable;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.impl.FileSourceImpl;
@@ -31,12 +27,15 @@ import org.apache.crunch.io.impl.InputBundle;
 import org.apache.crunch.types.avro.AvroInputFormat;
 import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
 
   public AvroFileSource(Path path, AvroType<T> ptype) {
     super(path, ptype, new InputBundle(AvroInputFormat.class)
-        .set(AvroJob.INPUT_IS_REFLECT, String.valueOf(!ptype.isSpecific()))
+        .set(AvroJob.INPUT_IS_REFLECT, String.valueOf(ptype.isReflect()))
         .set(AvroJob.INPUT_SCHEMA, ptype.getSchema().toString())
         .set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName()));
   }
@@ -49,7 +48,7 @@ public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSour
   @Override
   public Iterable<T> read(Configuration conf) throws IOException {
     FileSystem fs = FileSystem.get(path.toUri(), conf);
-    return CompositePathIterable.create(fs, path, new AvroFileReaderFactory<T>(
-        (AvroType<T>) ptype, conf));
+    return CompositePathIterable.create(fs, path, new AvroFileReaderFactory<T>((AvroType<T>) ptype,
+        conf));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f699409f/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
index 5ce970f..3b0c3ec 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
@@ -23,9 +23,6 @@ import org.apache.avro.mapred.AvroJob;
 import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapred.AvroKeyComparator;
 import org.apache.avro.mapred.AvroValue;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-
 import org.apache.crunch.GroupingOptions;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.Pair;
@@ -33,6 +30,8 @@ import org.apache.crunch.fn.PairMapFn;
 import org.apache.crunch.lib.PTables;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PGroupedTableType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
 
 /**
  *
@@ -43,20 +42,18 @@ public class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V> {
   private static final AvroPairConverter CONVERTER = new AvroPairConverter();
   private final MapFn inputFn;
   private final MapFn outputFn;
-  
+
   public AvroGroupedTableType(AvroTableType<K, V> tableType) {
     super(tableType);
     AvroType keyType = (AvroType) tableType.getKeyType();
     AvroType valueType = (AvroType) tableType.getValueType();
-    this.inputFn =  new PairIterableMapFn(keyType.getInputMapFn(),
-        valueType.getInputMapFn());
-    this.outputFn = new PairMapFn(keyType.getOutputMapFn(),
-        valueType.getOutputMapFn());
+    this.inputFn = new PairIterableMapFn(keyType.getInputMapFn(), valueType.getInputMapFn());
+    this.outputFn = new PairMapFn(keyType.getOutputMapFn(), valueType.getOutputMapFn());
   }
 
   @Override
   public Class<Pair<K, Iterable<V>>> getTypeClass() {
-    return (Class<Pair<K, Iterable<V>>>) Pair.of(null, null).getClass();  
+    return (Class<Pair<K, Iterable<V>>>) Pair.of(null, null).getClass();
   }
 
   @Override
@@ -68,12 +65,12 @@ public class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V> {
   public MapFn getInputMapFn() {
     return inputFn;
   }
-  
+
   @Override
   public MapFn getOutputMapFn() {
     return outputFn;
   }
-  
+
   @Override
   public Pair<K, Iterable<V>> getDetachedValue(Pair<K, Iterable<V>> value) {
     return PTables.getGroupedDetachedValue(this, value);
@@ -84,9 +81,9 @@ public class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V> {
     AvroTableType<K, V> att = (AvroTableType<K, V>) tableType;
     String schemaJson = att.getSchema().toString();
     Configuration conf = job.getConfiguration();
-    
-    if (!att.isSpecific()) {
-        conf.setBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, true);
+
+    if (att.isReflect()) {
+      conf.setBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, true);
     }
     conf.set(AvroJob.MAP_OUTPUT_SCHEMA, schemaJson);
     job.setSortComparatorClass(AvroKeyComparator.class);
@@ -95,16 +92,15 @@ public class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V> {
     if (options != null) {
       options.configure(job);
     }
-    
+
     Avros.configureReflectDataFactory(conf);
-    
-    Collection<String> serializations =
-        job.getConfiguration().getStringCollection("io.serializations");
+
+    Collection<String> serializations = job.getConfiguration().getStringCollection(
+        "io.serializations");
     if (!serializations.contains(SafeAvroSerialization.class.getName())) {
       serializations.add(SafeAvroSerialization.class.getName());
-      job.getConfiguration().setStrings("io.serializations",
-          serializations.toArray(new String[0]));
+      job.getConfiguration().setStrings("io.serializations", serializations.toArray(new String[0]));
     }
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f699409f/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
index 6d21122..a7a2d0a 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
@@ -19,146 +19,127 @@ package org.apache.crunch.types.avro;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
-
 import org.apache.crunch.MapFn;
 import org.apache.crunch.Pair;
 import org.apache.crunch.lib.PTables;
 import org.apache.crunch.types.PGroupedTableType;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
 
 /**
  * The implementation of the PTableType interface for Avro-based serialization.
  * 
  */
-public class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements
-		PTableType<K, V> {
-
-	private static class PairToAvroPair extends
-			MapFn<Pair, org.apache.avro.mapred.Pair> {
-		private final MapFn keyMapFn;
-		private final MapFn valueMapFn;
-		private final String firstJson;
-		private final String secondJson;
-
-		private String pairSchemaJson;
-		private transient Schema pairSchema;
-
-		public PairToAvroPair(AvroType keyType, AvroType valueType) {
-			this.keyMapFn = keyType.getOutputMapFn();
-			this.firstJson = keyType.getSchema().toString();
-			this.valueMapFn = valueType.getOutputMapFn();
-			this.secondJson = valueType.getSchema().toString();
-		}
-
-		@Override
-		public void configure(Configuration conf) {
-			keyMapFn.configure(conf);
-			valueMapFn.configure(conf);
-		}
-
-		@Override
-		public void setConfigurationForTest(Configuration conf) {
-			keyMapFn.setConfigurationForTest(conf);
-			valueMapFn.setConfigurationForTest(conf);
-		}
-
-		@Override
-		public void initialize() {
-			keyMapFn.setContext(getContext());
-			valueMapFn.setContext(getContext());
-			pairSchemaJson = org.apache.avro.mapred.Pair.getPairSchema(
-					new Schema.Parser().parse(firstJson),
-					new Schema.Parser().parse(secondJson)).toString();
-		}
-
-		@Override
-		public org.apache.avro.mapred.Pair map(Pair input) {
-			if (pairSchema == null) {
-				pairSchema = new Schema.Parser().parse(pairSchemaJson);
-			}
-			org.apache.avro.mapred.Pair avroPair = new org.apache.avro.mapred.Pair(
-					pairSchema);
-			avroPair.key(keyMapFn.map(input.first()));
-			avroPair.value(valueMapFn.map(input.second()));
-			return avroPair;
-		}
-	}
-
-	private static class IndexedRecordToPair extends MapFn<IndexedRecord, Pair> {
-
-		private final MapFn firstMapFn;
-		private final MapFn secondMapFn;
-
-		public IndexedRecordToPair(MapFn firstMapFn, MapFn secondMapFn) {
-			this.firstMapFn = firstMapFn;
-			this.secondMapFn = secondMapFn;
-		}
-
-		@Override
-		public void configure(Configuration conf) {
-			firstMapFn.configure(conf);
-			secondMapFn.configure(conf);
-		}
-
-		@Override
-		public void setConfigurationForTest(Configuration conf) {
-			firstMapFn.setConfigurationForTest(conf);
-			secondMapFn.setConfigurationForTest(conf);
-		}
-
-		@Override
-		public void initialize() {
-			firstMapFn.setContext(getContext());
-			secondMapFn.setContext(getContext());
-		}
-
-		@Override
-		public Pair map(IndexedRecord input) {
-			return Pair.of(firstMapFn.map(input.get(0)),
-					secondMapFn.map(input.get(1)));
-		}
-	}
-
-	private final AvroType<K> keyType;
-	private final AvroType<V> valueType;
-
-	public AvroTableType(AvroType<K> keyType, AvroType<V> valueType,
-			Class<Pair<K, V>> pairClass) {
-		super(pairClass, org.apache.avro.mapred.Pair.getPairSchema(
-				keyType.getSchema(), valueType.getSchema()),
-				new IndexedRecordToPair(keyType.getInputMapFn(),
-						valueType.getInputMapFn()), new PairToAvroPair(keyType,
-						valueType), keyType, valueType);
-		this.keyType = keyType;
-		this.valueType = valueType;
-	}
-
-	@Override
-	public boolean isSpecific() {
-		return keyType.isSpecific() || valueType.isSpecific();
-	}
-
-	@Override
-	public boolean isGeneric() {
-		return keyType.isGeneric() || valueType.isGeneric();
-	}
-
-	@Override
-	public PType<K> getKeyType() {
-		return keyType;
-	}
-
-	@Override
-	public PType<V> getValueType() {
-		return valueType;
-	}
-
-	@Override
-	public PGroupedTableType<K, V> getGroupedTableType() {
-		return new AvroGroupedTableType<K, V>(this);
-	}
+public class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements PTableType<K, V> {
+
+  private static class PairToAvroPair extends MapFn<Pair, org.apache.avro.mapred.Pair> {
+    private final MapFn keyMapFn;
+    private final MapFn valueMapFn;
+    private final String firstJson;
+    private final String secondJson;
+
+    private String pairSchemaJson;
+    private transient Schema pairSchema;
+
+    public PairToAvroPair(AvroType keyType, AvroType valueType) {
+      this.keyMapFn = keyType.getOutputMapFn();
+      this.firstJson = keyType.getSchema().toString();
+      this.valueMapFn = valueType.getOutputMapFn();
+      this.secondJson = valueType.getSchema().toString();
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      keyMapFn.configure(conf);
+      valueMapFn.configure(conf);
+    }
+
+    @Override
+    public void setConfigurationForTest(Configuration conf) {
+      keyMapFn.setConfigurationForTest(conf);
+      valueMapFn.setConfigurationForTest(conf);
+    }
+
+    @Override
+    public void initialize() {
+      keyMapFn.setContext(getContext());
+      valueMapFn.setContext(getContext());
+      pairSchemaJson = org.apache.avro.mapred.Pair.getPairSchema(
+          new Schema.Parser().parse(firstJson), new Schema.Parser().parse(secondJson)).toString();
+    }
+
+    @Override
+    public org.apache.avro.mapred.Pair map(Pair input) {
+      if (pairSchema == null) {
+        pairSchema = new Schema.Parser().parse(pairSchemaJson);
+      }
+      org.apache.avro.mapred.Pair avroPair = new org.apache.avro.mapred.Pair(pairSchema);
+      avroPair.key(keyMapFn.map(input.first()));
+      avroPair.value(valueMapFn.map(input.second()));
+      return avroPair;
+    }
+  }
+
+  private static class IndexedRecordToPair extends MapFn<IndexedRecord, Pair> {
+
+    private final MapFn firstMapFn;
+    private final MapFn secondMapFn;
+
+    public IndexedRecordToPair(MapFn firstMapFn, MapFn secondMapFn) {
+      this.firstMapFn = firstMapFn;
+      this.secondMapFn = secondMapFn;
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      firstMapFn.configure(conf);
+      secondMapFn.configure(conf);
+    }
+
+    @Override
+    public void setConfigurationForTest(Configuration conf) {
+      firstMapFn.setConfigurationForTest(conf);
+      secondMapFn.setConfigurationForTest(conf);
+    }
+
+    @Override
+    public void initialize() {
+      firstMapFn.setContext(getContext());
+      secondMapFn.setContext(getContext());
+    }
+
+    @Override
+    public Pair map(IndexedRecord input) {
+      return Pair.of(firstMapFn.map(input.get(0)), secondMapFn.map(input.get(1)));
+    }
+  }
+
+  private final AvroType<K> keyType;
+  private final AvroType<V> valueType;
+
+  public AvroTableType(AvroType<K> keyType, AvroType<V> valueType, Class<Pair<K, V>> pairClass) {
+    super(pairClass, org.apache.avro.mapred.Pair.getPairSchema(keyType.getSchema(),
+        valueType.getSchema()), new IndexedRecordToPair(keyType.getInputMapFn(),
+        valueType.getInputMapFn()), new PairToAvroPair(keyType, valueType), keyType, valueType);
+    this.keyType = keyType;
+    this.valueType = valueType;
+  }
+
+  @Override
+  public PType<K> getKeyType() {
+    return keyType;
+  }
+
+  @Override
+  public PType<V> getValueType() {
+    return valueType;
+  }
+
+  @Override
+  public PGroupedTableType<K, V> getGroupedTableType() {
+    return new AvroGroupedTableType<K, V>(this);
+  }
 
   @Override
   public Pair<K, V> getDetachedValue(Pair<K, V> value) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f699409f/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
index d5d22a8..32a4334 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
@@ -23,8 +23,6 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.fs.Path;
-
 import org.apache.crunch.MapFn;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.fn.IdentityFn;
@@ -32,8 +30,11 @@ import org.apache.crunch.io.avro.AvroFileSourceTarget;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
+import org.apache.hadoop.fs.Path;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 
 /**
  * The implementation of the PType interface for Avro-based serialization.
@@ -41,96 +42,111 @@ import com.google.common.collect.ImmutableList;
  */
 public class AvroType<T> implements PType<T> {
 
-	private static final Converter AVRO_CONVERTER = new AvroKeyConverter();
+  private static final Converter AVRO_CONVERTER = new AvroKeyConverter();
 
-	private final Class<T> typeClass;
+  private final Class<T> typeClass;
   private final String schemaString;
   private transient Schema schema;
-	private final MapFn baseInputMapFn;
-	private final MapFn baseOutputMapFn;
-	private final List<PType> subTypes;
+  private final MapFn baseInputMapFn;
+  private final MapFn baseOutputMapFn;
+  private final List<PType> subTypes;
   private AvroDeepCopier<T> deepCopier;
 
-	public AvroType(Class<T> typeClass, Schema schema, PType... ptypes) {
-		this(typeClass, schema, IdentityFn.getInstance(), IdentityFn
-				.getInstance(), ptypes);
-	}
-
-	public AvroType(Class<T> typeClass, Schema schema, MapFn inputMapFn,
-			MapFn outputMapFn, PType... ptypes) {
-		this.typeClass = typeClass;
-		this.schema = Preconditions.checkNotNull(schema);
-		this.schemaString = schema.toString();
-		this.baseInputMapFn = inputMapFn;
-		this.baseOutputMapFn = outputMapFn;
-		this.subTypes = ImmutableList.<PType> builder().add(ptypes).build();
-	}
-
-	@Override
-	public Class<T> getTypeClass() {
-		return typeClass;
-	}
-
-	@Override
-	public PTypeFamily getFamily() {
-		return AvroTypeFamily.getInstance();
-	}
-
-	@Override
-	public List<PType> getSubTypes() {
-		return subTypes;
-	}
-
-	public Schema getSchema() {
-	  if (schema == null){
-	    schema = new Schema.Parser().parse(schemaString);
-	  }
-		return schema;
-	}
-
-	/**
-	 * Determine if the wrapped type is a specific data avro type.
-	 * 
-	 * @return true if the wrapped type is a specific data type
-	 */
-	public boolean isSpecific() {
-		if (SpecificRecord.class.isAssignableFrom(typeClass)) {
-			return true;
-		}
-		for (PType ptype : subTypes) {
-			if (SpecificRecord.class.isAssignableFrom(ptype.getTypeClass())) {
-				return true;
-			}
-		}
-		return false;
-	}
-
-	/**
-	 * Determine if the wrapped type is a generic data avro type.
-	 * 
-	 * @return true if the wrapped type is a generic type
-	 */
-	public boolean isGeneric() {
-		return GenericData.Record.class.equals(typeClass);
-	}
-
-	public MapFn<Object, T> getInputMapFn() {
-		return baseInputMapFn;
-	}
-
-	public MapFn<T, Object> getOutputMapFn() {
-		return baseOutputMapFn;
-	}
-
-	@Override
-	public Converter getConverter() {
-		return AVRO_CONVERTER;
-	}
-
-	@Override
-	public SourceTarget<T> getDefaultFileSource(Path path) {
-		return new AvroFileSourceTarget<T>(path, this);
-	}
+  public AvroType(Class<T> typeClass, Schema schema, PType... ptypes) {
+    this(typeClass, schema, IdentityFn.getInstance(), IdentityFn.getInstance(), ptypes);
+  }
+
+  public AvroType(Class<T> typeClass, Schema schema, MapFn inputMapFn, MapFn outputMapFn,
+      PType... ptypes) {
+    this.typeClass = typeClass;
+    this.schema = Preconditions.checkNotNull(schema);
+    this.schemaString = schema.toString();
+    this.baseInputMapFn = inputMapFn;
+    this.baseOutputMapFn = outputMapFn;
+    this.subTypes = ImmutableList.<PType> builder().add(ptypes).build();
+  }
+
+  @Override
+  public Class<T> getTypeClass() {
+    return typeClass;
+  }
+
+  @Override
+  public PTypeFamily getFamily() {
+    return AvroTypeFamily.getInstance();
+  }
+
+  @Override
+  public List<PType> getSubTypes() {
+    return Lists.<PType> newArrayList(subTypes);
+  }
+
+  public Schema getSchema() {
+    if (schema == null) {
+      schema = new Schema.Parser().parse(schemaString);
+    }
+    return schema;
+  }
+
+  /**
+   * Determine if the wrapped type is a specific data avro type.
+   * 
+   * @return true if the wrapped type is a specific data type
+   */
+  public boolean isSpecific() {
+    return SpecificRecord.class.isAssignableFrom(typeClass);
+  }
+
+  /**
+   * Determine if the wrapped type is a generic data avro type.
+   * 
+   * @return true if the wrapped type is a generic type
+   */
+  public boolean isGeneric() {
+    return GenericData.Record.class.equals(typeClass);
+  }
+
+  /**
+   * Determine if the wrapped type is a reflection-based avro type.
+   * 
+   * @return true if the wrapped type is a reflection-based type
+   */
+  public boolean isReflect() {
+    if (Avros.isPrimitive(this)) {
+      return false;
+    }
+
+    if (!this.subTypes.isEmpty()) {
+
+      for (PType<?> subType : this.subTypes) {
+        if (((AvroType<?>) subType).isReflect()) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    return !(typeClass.equals(GenericData.Record.class) || SpecificRecord.class
+        .isAssignableFrom(typeClass));
+  }
+
+  public MapFn<Object, T> getInputMapFn() {
+    return baseInputMapFn;
+  }
+
+  public MapFn<T, Object> getOutputMapFn() {
+    return baseOutputMapFn;
+  }
+
+  @Override
+  public Converter getConverter() {
+    return AVRO_CONVERTER;
+  }
+
+  @Override
+  public SourceTarget<T> getDefaultFileSource(Path path) {
+    return new AvroFileSourceTarget<T>(path, this);
+  }
 
   private AvroDeepCopier<T> getDeepCopier() {
     if (deepCopier == null) {
@@ -152,21 +168,21 @@ public class AvroType<T> implements PType<T> {
     return value;
   }
 
-	@Override
-	public boolean equals(Object other) {
-		if (other == null || !(other instanceof AvroType)) {
-			return false;
-		}
-		AvroType at = (AvroType) other;
-		return (typeClass.equals(at.typeClass) && subTypes.equals(at.subTypes));
+  @Override
+  public boolean equals(Object other) {
+    if (other == null || !(other instanceof AvroType)) {
+      return false;
+    }
+    AvroType at = (AvroType) other;
+    return (typeClass.equals(at.typeClass) && subTypes.equals(at.subTypes));
 
-	}
+  }
 
-	@Override
-	public int hashCode() {
-		HashCodeBuilder hcb = new HashCodeBuilder();
-		hcb.append(typeClass).append(subTypes);
-		return hcb.toHashCode();
-	}
+  @Override
+  public int hashCode() {
+    HashCodeBuilder hcb = new HashCodeBuilder();
+    hcb.append(typeClass).append(subTypes);
+    return hcb.toHashCode();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f699409f/crunch/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
index b7d5598..f8645c3 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
@@ -23,7 +23,6 @@ import java.util.Map;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
-
 import org.apache.crunch.MapFn;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Tuple;
@@ -37,18 +36,17 @@ import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.PTypeUtils;
 
 public class AvroTypeFamily implements PTypeFamily {
-  
+
   private static final AvroTypeFamily INSTANCE = new AvroTypeFamily();
-  
+
   public static AvroTypeFamily getInstance() {
     return INSTANCE;
   }
-  
+
   // There can only be one instance.
   private AvroTypeFamily() {
   }
 
-  
   @Override
   public PType<Void> nulls() {
     return Avros.nulls();
@@ -95,13 +93,13 @@ public class AvroTypeFamily implements PTypeFamily {
   }
 
   public PType<GenericData.Record> generics(Schema schema) {
-	return Avros.generics(schema);
+    return Avros.generics(schema);
   }
-  
+
   public <T> PType<T> containers(Class<T> clazz) {
     return Avros.containers(clazz);
   }
-  
+
   @Override
   public <T> PType<Collection<T>> collections(PType<T> ptype) {
     return Avros.collections(ptype);
@@ -109,23 +107,22 @@ public class AvroTypeFamily implements PTypeFamily {
 
   @Override
   public <T> PType<Map<String, T>> maps(PType<T> ptype) {
-	return Avros.maps(ptype);
+    return Avros.maps(ptype);
   }
-  
+
   @Override
   public <V1, V2> PType<Pair<V1, V2>> pairs(PType<V1> p1, PType<V2> p2) {
     return Avros.pairs(p1, p2);
   }
 
   @Override
-  public <V1, V2, V3> PType<Tuple3<V1, V2, V3>> triples(PType<V1> p1,
-      PType<V2> p2, PType<V3> p3) {
+  public <V1, V2, V3> PType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2, PType<V3> p3) {
     return Avros.triples(p1, p2, p3);
   }
 
   @Override
-  public <V1, V2, V3, V4> PType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1,
-      PType<V2> p2, PType<V3> p3, PType<V4> p4) {
+  public <V1, V2, V3, V4> PType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, PType<V2> p2,
+      PType<V3> p3, PType<V4> p4) {
     return Avros.quads(p1, p2, p3, p4);
   }
 
@@ -162,8 +159,8 @@ public class AvroTypeFamily implements PTypeFamily {
   }
 
   @Override
-  public <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn,
-      MapFn<T, S> outputFn, PType<S> base) {
+  public <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn,
+      PType<S> base) {
     return Avros.derived(clazz, inputFn, outputFn, base);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f699409f/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
index fbc6bf3..a6d7169 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -32,13 +32,10 @@ import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.util.Utf8;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-
 import org.apache.crunch.MapFn;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Tuple;
@@ -51,6 +48,10 @@ import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.TupleFactory;
 import org.apache.crunch.util.PTypes;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -63,574 +64,561 @@ import com.google.common.collect.Maps;
  */
 public class Avros {
 
-	/**
-	 * The instance we use for generating reflected schemas. May be modified by
-	 * clients (e.g., Scrunch.)
-	 */
-	public static ReflectDataFactory REFLECT_DATA_FACTORY = new ReflectDataFactory();
-
-	/**
-	 * The name of the configuration parameter that tracks which reflection
-	 * factory to use.
-	 */
-	public static final String REFLECT_DATA_FACTORY_CLASS = "crunch.reflectdatafactory";
-
-	public static void configureReflectDataFactory(Configuration conf) {
-		conf.setClass(REFLECT_DATA_FACTORY_CLASS,
-				REFLECT_DATA_FACTORY.getClass(), ReflectDataFactory.class);
-	}
-
-	public static ReflectDataFactory getReflectDataFactory(Configuration conf) {
-		return (ReflectDataFactory) ReflectionUtils.newInstance(conf.getClass(
-				REFLECT_DATA_FACTORY_CLASS, ReflectDataFactory.class), conf);
-	}
-
-	public static MapFn<CharSequence, String> UTF8_TO_STRING = new MapFn<CharSequence, String>() {
-		@Override
-		public String map(CharSequence input) {
-			return input.toString();
-		}
-	};
-
-	public static MapFn<String, Utf8> STRING_TO_UTF8 = new MapFn<String, Utf8>() {
-		@Override
-		public Utf8 map(String input) {
-			return new Utf8(input);
-		}
-	};
-
-	public static MapFn<Object, ByteBuffer> BYTES_IN = new MapFn<Object, ByteBuffer>() {
-		@Override
-		public ByteBuffer map(Object input) {
-			if (input instanceof ByteBuffer) {
-				return (ByteBuffer) input;
-			}
-			return ByteBuffer.wrap((byte[]) input);
-		}
-	};
-
-	private static final AvroType<String> strings = new AvroType<String>(
-			String.class, Schema.create(Schema.Type.STRING), UTF8_TO_STRING,
-			STRING_TO_UTF8);
-	private static final AvroType<Void> nulls = create(Void.class,
-			Schema.Type.NULL);
-	private static final AvroType<Long> longs = create(Long.class,
-			Schema.Type.LONG);
-	private static final AvroType<Integer> ints = create(Integer.class,
-			Schema.Type.INT);
-	private static final AvroType<Float> floats = create(Float.class,
-			Schema.Type.FLOAT);
-	private static final AvroType<Double> doubles = create(Double.class,
-			Schema.Type.DOUBLE);
-	private static final AvroType<Boolean> booleans = create(Boolean.class,
-			Schema.Type.BOOLEAN);
-	private static final AvroType<ByteBuffer> bytes = new AvroType<ByteBuffer>(
-			ByteBuffer.class, Schema.create(Schema.Type.BYTES), BYTES_IN,
-			IdentityFn.getInstance());
-
-	private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap
-			.<Class<?>, PType<?>> builder().put(String.class, strings)
-			.put(Long.class, longs).put(Integer.class, ints)
-			.put(Float.class, floats).put(Double.class, doubles)
-			.put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build();
-
-	private static final Map<Class<?>, AvroType<?>> EXTENSIONS = Maps
-			.newHashMap();
-
-	public static <T> void register(Class<T> clazz, AvroType<T> ptype) {
-		EXTENSIONS.put(clazz, ptype);
-	}
-
-	public static <T> PType<T> getPrimitiveType(Class<T> clazz) {
-		return (PType<T>) PRIMITIVES.get(clazz);
-	}
+  /**
+   * The instance we use for generating reflected schemas. May be modified by
+   * clients (e.g., Scrunch.)
+   */
+  public static ReflectDataFactory REFLECT_DATA_FACTORY = new ReflectDataFactory();
+
+  /**
+   * The name of the configuration parameter that tracks which reflection
+   * factory to use.
+   */
+  public static final String REFLECT_DATA_FACTORY_CLASS = "crunch.reflectdatafactory";
+
+  public static void configureReflectDataFactory(Configuration conf) {
+    conf.setClass(REFLECT_DATA_FACTORY_CLASS, REFLECT_DATA_FACTORY.getClass(),
+        ReflectDataFactory.class);
+  }
+
+  public static ReflectDataFactory getReflectDataFactory(Configuration conf) {
+    return (ReflectDataFactory) ReflectionUtils.newInstance(
+        conf.getClass(REFLECT_DATA_FACTORY_CLASS, ReflectDataFactory.class), conf);
+  }
+
+  public static MapFn<CharSequence, String> UTF8_TO_STRING = new MapFn<CharSequence, String>() {
+    @Override
+    public String map(CharSequence input) {
+      return input.toString();
+    }
+  };
+
+  public static MapFn<String, Utf8> STRING_TO_UTF8 = new MapFn<String, Utf8>() {
+    @Override
+    public Utf8 map(String input) {
+      return new Utf8(input);
+    }
+  };
+
+  public static MapFn<Object, ByteBuffer> BYTES_IN = new MapFn<Object, ByteBuffer>() {
+    @Override
+    public ByteBuffer map(Object input) {
+      if (input instanceof ByteBuffer) {
+        return (ByteBuffer) input;
+      }
+      return ByteBuffer.wrap((byte[]) input);
+    }
+  };
+
+  private static final AvroType<String> strings = new AvroType<String>(String.class,
+      Schema.create(Schema.Type.STRING), UTF8_TO_STRING, STRING_TO_UTF8);
+  private static final AvroType<Void> nulls = create(Void.class, Schema.Type.NULL);
+  private static final AvroType<Long> longs = create(Long.class, Schema.Type.LONG);
+  private static final AvroType<Integer> ints = create(Integer.class, Schema.Type.INT);
+  private static final AvroType<Float> floats = create(Float.class, Schema.Type.FLOAT);
+  private static final AvroType<Double> doubles = create(Double.class, Schema.Type.DOUBLE);
+  private static final AvroType<Boolean> booleans = create(Boolean.class, Schema.Type.BOOLEAN);
+  private static final AvroType<ByteBuffer> bytes = new AvroType<ByteBuffer>(ByteBuffer.class,
+      Schema.create(Schema.Type.BYTES), BYTES_IN, IdentityFn.getInstance());
+
+  private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap
+      .<Class<?>, PType<?>> builder().put(String.class, strings).put(Long.class, longs)
+      .put(Integer.class, ints).put(Float.class, floats).put(Double.class, doubles)
+      .put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build();
+
+  private static final Map<Class<?>, AvroType<?>> EXTENSIONS = Maps.newHashMap();
+
+  public static <T> void register(Class<T> clazz, AvroType<T> ptype) {
+    EXTENSIONS.put(clazz, ptype);
+  }
+
+  public static <T> PType<T> getPrimitiveType(Class<T> clazz) {
+    return (PType<T>) PRIMITIVES.get(clazz);
+  }
 
   static <T> boolean isPrimitive(AvroType<T> avroType) {
     return PRIMITIVES.containsKey(avroType.getTypeClass());
   }
 
-	private static <T> AvroType<T> create(Class<T> clazz, Schema.Type schemaType) {
-		return new AvroType<T>(clazz, Schema.create(schemaType));
-	}
-
-	public static final AvroType<Void> nulls() {
-		return nulls;
-	}
-
-	public static final AvroType<String> strings() {
-		return strings;
-	}
-
-	public static final AvroType<Long> longs() {
-		return longs;
-	}
-
-	public static final AvroType<Integer> ints() {
-		return ints;
-	}
-
-	public static final AvroType<Float> floats() {
-		return floats;
-	}
-
-	public static final AvroType<Double> doubles() {
-		return doubles;
-	}
-
-	public static final AvroType<Boolean> booleans() {
-		return booleans;
-	}
-
-	public static final AvroType<ByteBuffer> bytes() {
-		return bytes;
-	}
-
-	public static final <T> AvroType<T> records(Class<T> clazz) {
-		if (EXTENSIONS.containsKey(clazz)) {
-			return (AvroType<T>) EXTENSIONS.get(clazz);
-		}
-		return containers(clazz);
-	}
-
-	public static final AvroType<GenericData.Record> generics(Schema schema) {
-		return new AvroType<GenericData.Record>(GenericData.Record.class,
-				schema);
-	}
-
-	public static final <T> AvroType<T> containers(Class<T> clazz) {
-		return reflects(clazz);
-	}
-
-	public static final <T> AvroType<T> reflects(Class<T> clazz) {
-		return new AvroType<T>(clazz, REFLECT_DATA_FACTORY.getReflectData()
-				.getSchema(clazz));
-	}
-
-	private static class BytesToWritableMapFn<T extends Writable> extends
-			MapFn<ByteBuffer, T> {
-		private static final Log LOG = LogFactory
-				.getLog(BytesToWritableMapFn.class);
-
-		private final Class<T> writableClazz;
-
-		public BytesToWritableMapFn(Class<T> writableClazz) {
-			this.writableClazz = writableClazz;
-		}
-
-		@Override
-		public T map(ByteBuffer input) {
-			T instance = ReflectionUtils.newInstance(writableClazz,
-					getConfiguration());
-			try {
-				instance.readFields(new DataInputStream(
-						new ByteArrayInputStream(input.array(), input
-								.arrayOffset(), input.limit())));
-			} catch (IOException e) {
-				LOG.error("Exception thrown reading instance of: "
-						+ writableClazz, e);
-			}
-			return instance;
-		}
-	}
-
-	private static class WritableToBytesMapFn<T extends Writable> extends
-			MapFn<T, ByteBuffer> {
-		private static final Log LOG = LogFactory
-				.getLog(WritableToBytesMapFn.class);
-
-		@Override
-		public ByteBuffer map(T input) {
-			ByteArrayOutputStream baos = new ByteArrayOutputStream();
-			DataOutputStream das = new DataOutputStream(baos);
-			try {
-				input.write(das);
-			} catch (IOException e) {
-				LOG.error("Exception thrown converting Writable to bytes", e);
-			}
-			return ByteBuffer.wrap(baos.toByteArray());
-		}
-	}
-
-	public static final <T extends Writable> AvroType<T> writables(
-			Class<T> clazz) {
-		return new AvroType<T>(clazz, Schema.create(Schema.Type.BYTES),
-				new BytesToWritableMapFn<T>(clazz),
-				new WritableToBytesMapFn<T>());
-	}
-
-	private static class GenericDataArrayToCollection<T> extends
-			MapFn<Object, Collection<T>> {
-
-		private final MapFn<Object, T> mapFn;
-
-		public GenericDataArrayToCollection(MapFn<Object, T> mapFn) {
-			this.mapFn = mapFn;
-		}
-
-		@Override
-		public void configure(Configuration conf) {
-			mapFn.configure(conf);
-		}
-
-		@Override
-		public void setConfigurationForTest(Configuration conf) {
-			mapFn.setConfigurationForTest(conf);
-		}
-
-		@Override
-		public void initialize() {
-			this.mapFn.setContext(getContext());
-		}
-
-		@Override
-		public Collection<T> map(Object input) {
-			Collection<T> ret = Lists.newArrayList();
-			if (input instanceof Collection) {
-				for (Object in : (Collection<Object>) input) {
-					ret.add(mapFn.map(in));
-				}
-			} else {
-				// Assume it is an array
-				Object[] arr = (Object[]) input;
-				for (Object in : arr) {
-					ret.add(mapFn.map(in));
-				}
-			}
-			return ret;
-		}
-	}
-
-	private static class CollectionToGenericDataArray extends
-			MapFn<Collection<?>, GenericData.Array<?>> {
-
-		private final MapFn mapFn;
-		private final String jsonSchema;
-		private transient Schema schema;
-
-		public CollectionToGenericDataArray(Schema schema, MapFn mapFn) {
-			this.mapFn = mapFn;
-			this.jsonSchema = schema.toString();
-		}
-
-		@Override
-		public void configure(Configuration conf) {
-			mapFn.configure(conf);
-		}
-
-		@Override
-		public void setConfigurationForTest(Configuration conf) {
-			mapFn.setConfigurationForTest(conf);
-		}
-
-		@Override
-		public void initialize() {
-			this.mapFn.setContext(getContext());
-		}
-
-		@Override
-		public GenericData.Array<?> map(Collection<?> input) {
-			if (schema == null) {
-				schema = new Schema.Parser().parse(jsonSchema);
-			}
-			GenericData.Array array = new GenericData.Array(input.size(),
-					schema);
-			for (Object in : input) {
-				array.add(mapFn.map(in));
-			}
-			return array;
-		}
-	}
-
-	public static final <T> AvroType<Collection<T>> collections(PType<T> ptype) {
-		AvroType<T> avroType = (AvroType<T>) ptype;
-		Schema collectionSchema = Schema.createArray(allowNulls(avroType
-				.getSchema()));
-		GenericDataArrayToCollection<T> input = new GenericDataArrayToCollection<T>(
-				avroType.getInputMapFn());
-		CollectionToGenericDataArray output = new CollectionToGenericDataArray(
-				collectionSchema, avroType.getOutputMapFn());
-		return new AvroType(Collection.class, collectionSchema, input, output,
-				ptype);
-	}
-
-	private static class AvroMapToMap<T> extends
-			MapFn<Map<CharSequence, Object>, Map<String, T>> {
-		private final MapFn<Object, T> mapFn;
-
-		public AvroMapToMap(MapFn<Object, T> mapFn) {
-			this.mapFn = mapFn;
-		}
-
-		@Override
-		public void configure(Configuration conf) {
-			mapFn.configure(conf);
-		}
-
-		@Override
-		public void setConfigurationForTest(Configuration conf) {
-			mapFn.setConfigurationForTest(conf);
-		}
-
-		@Override
-		public void initialize() {
-			this.mapFn.setContext(getContext());
-		}
-
-		@Override
-		public Map<String, T> map(Map<CharSequence, Object> input) {
-			Map<String, T> out = Maps.newHashMap();
-			for (Map.Entry<CharSequence, Object> e : input.entrySet()) {
-				out.put(e.getKey().toString(), mapFn.map(e.getValue()));
-			}
-			return out;
-		}
-	}
-
-	private static class MapToAvroMap<T> extends
-			MapFn<Map<String, T>, Map<Utf8, Object>> {
-		private final MapFn<T, Object> mapFn;
-
-		public MapToAvroMap(MapFn<T, Object> mapFn) {
-			this.mapFn = mapFn;
-		}
-
-		@Override
-		public void configure(Configuration conf) {
-			mapFn.configure(conf);
-		}
-
-		@Override
-		public void setConfigurationForTest(Configuration conf) {
-			mapFn.setConfigurationForTest(conf);
-		}
-
-		@Override
-		public void initialize() {
-			this.mapFn.setContext(getContext());
-		}
-
-		@Override
-		public Map<Utf8, Object> map(Map<String, T> input) {
-			Map<Utf8, Object> out = Maps.newHashMap();
-			for (Map.Entry<String, T> e : input.entrySet()) {
-				out.put(new Utf8(e.getKey()), mapFn.map(e.getValue()));
-			}
-			return out;
-		}
-	}
-
-	public static final <T> AvroType<Map<String, T>> maps(PType<T> ptype) {
-		AvroType<T> avroType = (AvroType<T>) ptype;
-		Schema mapSchema = Schema.createMap(allowNulls(avroType.getSchema()));
-		AvroMapToMap<T> inputFn = new AvroMapToMap<T>(avroType.getInputMapFn());
-		MapToAvroMap<T> outputFn = new MapToAvroMap<T>(
-				avroType.getOutputMapFn());
-		return new AvroType(Map.class, mapSchema, inputFn, outputFn, ptype);
-	}
-
-	private static class GenericRecordToTuple extends
-			MapFn<GenericRecord, Tuple> {
-		private final TupleFactory<?> tupleFactory;
-		private final List<MapFn> fns;
-
-		private transient Object[] values;
-
-		public GenericRecordToTuple(TupleFactory<?> tupleFactory,
-				PType<?>... ptypes) {
-			this.tupleFactory = tupleFactory;
-			this.fns = Lists.newArrayList();
-			for (PType<?> ptype : ptypes) {
-				AvroType atype = (AvroType) ptype;
-				fns.add(atype.getInputMapFn());
-			}
-		}
-
-		@Override
-		public void configure(Configuration conf) {
-			for (MapFn fn : fns) {
-				fn.configure(conf);
-			}
-		}
-
-		@Override
-		public void setConfigurationForTest(Configuration conf) {
-			for (MapFn fn : fns) {
-				fn.setConfigurationForTest(conf);
-			}
-		}
-
-		@Override
-		public void initialize() {
-			for (MapFn fn : fns) {
-				fn.setContext(getContext());
-			}
-			this.values = new Object[fns.size()];
-			tupleFactory.initialize();
-		}
-
-		@Override
-		public Tuple map(GenericRecord input) {
-			for (int i = 0; i < values.length; i++) {
-				Object v = input.get(i);
-				if (v == null) {
-					values[i] = null;
-				} else {
-					values[i] = fns.get(i).map(v);
-				}
-			}
-			return tupleFactory.makeTuple(values);
-		}
-	}
-
-	private static class TupleToGenericRecord extends
-			MapFn<Tuple, GenericRecord> {
-		private final List<MapFn> fns;
-		private final String jsonSchema;
-
-		private transient GenericRecord record;
-
-		public TupleToGenericRecord(Schema schema, PType<?>... ptypes) {
-			this.fns = Lists.newArrayList();
-			this.jsonSchema = schema.toString();
-			for (PType ptype : ptypes) {
-				AvroType atype = (AvroType) ptype;
-				fns.add(atype.getOutputMapFn());
-			}
-		}
-
-		@Override
-		public void configure(Configuration conf) {
-			for (MapFn fn : fns) {
-				fn.configure(conf);
-			}
-		}
-
-		@Override
-		public void setConfigurationForTest(Configuration conf) {
-			for (MapFn fn : fns) {
-				fn.setConfigurationForTest(conf);
-			}
-		}
-
-		@Override
-		public void initialize() {
-			this.record = new GenericData.Record(
-					new Schema.Parser().parse(jsonSchema));
-			for (MapFn fn : fns) {
-				fn.setContext(getContext());
-			}
-		}
-
-		@Override
-		public GenericRecord map(Tuple input) {
-			for (int i = 0; i < input.size(); i++) {
-				Object v = input.get(i);
-				if (v == null) {
-					record.put(i, null);
-				} else {
-					record.put(i, fns.get(i).map(v));
-				}
-			}
-			return record;
-		}
-	}
-
-	public static final <V1, V2> AvroType<Pair<V1, V2>> pairs(PType<V1> p1,
-			PType<V2> p2) {
-		Schema schema = createTupleSchema(p1, p2);
-		GenericRecordToTuple input = new GenericRecordToTuple(
-				TupleFactory.PAIR, p1, p2);
-		TupleToGenericRecord output = new TupleToGenericRecord(schema, p1, p2);
-		return new AvroType(Pair.class, schema, input, output, p1, p2);
-	}
-
-	public static final <V1, V2, V3> AvroType<Tuple3<V1, V2, V3>> triples(
-			PType<V1> p1, PType<V2> p2, PType<V3> p3) {
-		Schema schema = createTupleSchema(p1, p2, p3);
-		return new AvroType(Tuple3.class, schema, new GenericRecordToTuple(
-				TupleFactory.TUPLE3, p1, p2, p3), new TupleToGenericRecord(
-				schema, p1, p2, p3), p1, p2, p3);
-	}
-
-	public static final <V1, V2, V3, V4> AvroType<Tuple4<V1, V2, V3, V4>> quads(
-			PType<V1> p1, PType<V2> p2, PType<V3> p3, PType<V4> p4) {
-		Schema schema = createTupleSchema(p1, p2, p3, p4);
-		return new AvroType(Tuple4.class, schema, new GenericRecordToTuple(
-				TupleFactory.TUPLE4, p1, p2, p3, p4), new TupleToGenericRecord(
-				schema, p1, p2, p3, p4), p1, p2, p3, p4);
-	}
-
-	public static final AvroType<TupleN> tuples(PType... ptypes) {
-		Schema schema = createTupleSchema(ptypes);
-		return new AvroType(TupleN.class, schema, new GenericRecordToTuple(
-				TupleFactory.TUPLEN, ptypes), new TupleToGenericRecord(schema,
-				ptypes), ptypes);
-	}
-
-	public static <T extends Tuple> AvroType<T> tuples(Class<T> clazz,
-			PType... ptypes) {
-		Schema schema = createTupleSchema(ptypes);
-		Class[] typeArgs = new Class[ptypes.length];
-		for (int i = 0; i < typeArgs.length; i++) {
-			typeArgs[i] = ptypes[i].getTypeClass();
-		}
-		TupleFactory<T> factory = TupleFactory.create(clazz, typeArgs);
-		return new AvroType<T>(clazz, schema, new GenericRecordToTuple(factory,
-				ptypes), new TupleToGenericRecord(schema, ptypes), ptypes);
-	}
-
-	private static Schema createTupleSchema(PType<?>... ptypes) {
-		// Guarantee each tuple schema has a globally unique name
-		String tupleName = "tuple"
-				+ UUID.randomUUID().toString().replace('-', 'x');
-		Schema schema = Schema.createRecord(tupleName, "", "crunch", false);
-		List<Schema.Field> fields = Lists.newArrayList();
-		for (int i = 0; i < ptypes.length; i++) {
-			AvroType atype = (AvroType) ptypes[i];
-			Schema fieldSchema = allowNulls(atype.getSchema());
-			fields.add(new Schema.Field("v" + i, fieldSchema, "", null));
-		}
-		schema.setFields(fields);
-		return schema;
-	}
-
-	public static final <S, T> AvroType<T> derived(Class<T> clazz,
-			MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base) {
-		AvroType<S> abase = (AvroType<S>) base;
-		return new AvroType<T>(clazz, abase.getSchema(), new CompositeMapFn(
-				abase.getInputMapFn(), inputFn), new CompositeMapFn(outputFn,
-				abase.getOutputMapFn()), base.getSubTypes().toArray(
-				new PType[0]));
-	}
-
-	public static <T> PType<T> jsons(Class<T> clazz) {
-		return PTypes.jsonString(clazz, AvroTypeFamily.getInstance());
-	}
-
-	public static final <K, V> AvroTableType<K, V> tableOf(PType<K> key,
-			PType<V> value) {
-	  if (key instanceof PTableType) {
-	    PTableType ptt = (PTableType) key;
-	    key = Avros.pairs(ptt.getKeyType(), ptt.getValueType());
-	  }
-	  if (value instanceof PTableType) {
-	    PTableType ptt = (PTableType) value;
-	    value = Avros.pairs(ptt.getKeyType(), ptt.getValueType());
-	  }
-		AvroType<K> avroKey = (AvroType<K>) key;
-		AvroType<V> avroValue = (AvroType<V>) value;
-		return new AvroTableType(avroKey, avroValue, Pair.class);
-	}
-
-	private static final Schema NULL_SCHEMA = Schema.create(Type.NULL);
-
-	private static Schema allowNulls(Schema base) {
-		if (NULL_SCHEMA.equals(base)) {
-			return base;
-		}
-		return Schema.createUnion(ImmutableList.of(base, NULL_SCHEMA));
-	}
-
-	private Avros() {
-	}
+  private static <T> AvroType<T> create(Class<T> clazz, Schema.Type schemaType) {
+    return new AvroType<T>(clazz, Schema.create(schemaType));
+  }
+
+  public static final AvroType<Void> nulls() {
+    return nulls;
+  }
+
+  public static final AvroType<String> strings() {
+    return strings;
+  }
+
+  public static final AvroType<Long> longs() {
+    return longs;
+  }
+
+  public static final AvroType<Integer> ints() {
+    return ints;
+  }
+
+  public static final AvroType<Float> floats() {
+    return floats;
+  }
+
+  public static final AvroType<Double> doubles() {
+    return doubles;
+  }
+
+  public static final AvroType<Boolean> booleans() {
+    return booleans;
+  }
+
+  public static final AvroType<ByteBuffer> bytes() {
+    return bytes;
+  }
+
+  public static final <T> AvroType<T> records(Class<T> clazz) {
+    if (EXTENSIONS.containsKey(clazz)) {
+      return (AvroType<T>) EXTENSIONS.get(clazz);
+    }
+    return containers(clazz);
+  }
+
+  public static final AvroType<GenericData.Record> generics(Schema schema) {
+    return new AvroType<GenericData.Record>(GenericData.Record.class, schema);
+  }
+
+  public static final <T> AvroType<T> containers(Class<T> clazz) {
+    return reflects(clazz);
+  }
+
+  public static final <T> AvroType<T> reflects(Class<T> clazz) {
+    return new AvroType<T>(clazz, REFLECT_DATA_FACTORY.getReflectData().getSchema(clazz));
+  }
+
+  private static class BytesToWritableMapFn<T extends Writable> extends MapFn<ByteBuffer, T> {
+    private static final Log LOG = LogFactory.getLog(BytesToWritableMapFn.class);
+
+    private final Class<T> writableClazz;
+
+    public BytesToWritableMapFn(Class<T> writableClazz) {
+      this.writableClazz = writableClazz;
+    }
+
+    @Override
+    public T map(ByteBuffer input) {
+      T instance = ReflectionUtils.newInstance(writableClazz, getConfiguration());
+      try {
+        instance.readFields(new DataInputStream(new ByteArrayInputStream(input.array(), input
+            .arrayOffset(), input.limit())));
+      } catch (IOException e) {
+        LOG.error("Exception thrown reading instance of: " + writableClazz, e);
+      }
+      return instance;
+    }
+  }
+
+  private static class WritableToBytesMapFn<T extends Writable> extends MapFn<T, ByteBuffer> {
+    private static final Log LOG = LogFactory.getLog(WritableToBytesMapFn.class);
+
+    @Override
+    public ByteBuffer map(T input) {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      DataOutputStream das = new DataOutputStream(baos);
+      try {
+        input.write(das);
+      } catch (IOException e) {
+        LOG.error("Exception thrown converting Writable to bytes", e);
+      }
+      return ByteBuffer.wrap(baos.toByteArray());
+    }
+  }
+
+  public static final <T extends Writable> AvroType<T> writables(Class<T> clazz) {
+    return new AvroType<T>(clazz, Schema.create(Schema.Type.BYTES), new BytesToWritableMapFn<T>(
+        clazz), new WritableToBytesMapFn<T>());
+  }
+
+  private static class GenericDataArrayToCollection<T> extends MapFn<Object, Collection<T>> {
+
+    private final MapFn<Object, T> mapFn;
+
+    public GenericDataArrayToCollection(MapFn<Object, T> mapFn) {
+      this.mapFn = mapFn;
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      mapFn.configure(conf);
+    }
+
+    @Override
+    public void setConfigurationForTest(Configuration conf) {
+      mapFn.setConfigurationForTest(conf);
+    }
+
+    @Override
+    public void initialize() {
+      this.mapFn.setContext(getContext());
+    }
+
+    @Override
+    public Collection<T> map(Object input) {
+      Collection<T> ret = Lists.newArrayList();
+      if (input instanceof Collection) {
+        for (Object in : (Collection<Object>) input) {
+          ret.add(mapFn.map(in));
+        }
+      } else {
+        // Assume it is an array
+        Object[] arr = (Object[]) input;
+        for (Object in : arr) {
+          ret.add(mapFn.map(in));
+        }
+      }
+      return ret;
+    }
+  }
+
+  private static class CollectionToGenericDataArray extends
+      MapFn<Collection<?>, GenericData.Array<?>> {
+
+    private final MapFn mapFn;
+    private final String jsonSchema;
+    private transient Schema schema;
+
+    public CollectionToGenericDataArray(Schema schema, MapFn mapFn) {
+      this.mapFn = mapFn;
+      this.jsonSchema = schema.toString();
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      mapFn.configure(conf);
+    }
+
+    @Override
+    public void setConfigurationForTest(Configuration conf) {
+      mapFn.setConfigurationForTest(conf);
+    }
+
+    @Override
+    public void initialize() {
+      this.mapFn.setContext(getContext());
+    }
+
+    @Override
+    public GenericData.Array<?> map(Collection<?> input) {
+      if (schema == null) {
+        schema = new Schema.Parser().parse(jsonSchema);
+      }
+      GenericData.Array array = new GenericData.Array(input.size(), schema);
+      for (Object in : input) {
+        array.add(mapFn.map(in));
+      }
+      return array;
+    }
+  }
+
+  public static final <T> AvroType<Collection<T>> collections(PType<T> ptype) {
+    AvroType<T> avroType = (AvroType<T>) ptype;
+    Schema collectionSchema = Schema.createArray(allowNulls(avroType.getSchema()));
+    GenericDataArrayToCollection<T> input = new GenericDataArrayToCollection<T>(
+        avroType.getInputMapFn());
+    CollectionToGenericDataArray output = new CollectionToGenericDataArray(collectionSchema,
+        avroType.getOutputMapFn());
+    return new AvroType(Collection.class, collectionSchema, input, output, ptype);
+  }
+
+  private static class AvroMapToMap<T> extends MapFn<Map<CharSequence, Object>, Map<String, T>> {
+    private final MapFn<Object, T> mapFn;
+
+    public AvroMapToMap(MapFn<Object, T> mapFn) {
+      this.mapFn = mapFn;
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      mapFn.configure(conf);
+    }
+
+    @Override
+    public void setConfigurationForTest(Configuration conf) {
+      mapFn.setConfigurationForTest(conf);
+    }
+
+    @Override
+    public void initialize() {
+      this.mapFn.setContext(getContext());
+    }
+
+    @Override
+    public Map<String, T> map(Map<CharSequence, Object> input) {
+      Map<String, T> out = Maps.newHashMap();
+      for (Map.Entry<CharSequence, Object> e : input.entrySet()) {
+        out.put(e.getKey().toString(), mapFn.map(e.getValue()));
+      }
+      return out;
+    }
+  }
+
+  private static class MapToAvroMap<T> extends MapFn<Map<String, T>, Map<Utf8, Object>> {
+    private final MapFn<T, Object> mapFn;
+
+    public MapToAvroMap(MapFn<T, Object> mapFn) {
+      this.mapFn = mapFn;
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      mapFn.configure(conf);
+    }
+
+    @Override
+    public void setConfigurationForTest(Configuration conf) {
+      mapFn.setConfigurationForTest(conf);
+    }
+
+    @Override
+    public void initialize() {
+      this.mapFn.setContext(getContext());
+    }
+
+    @Override
+    public Map<Utf8, Object> map(Map<String, T> input) {
+      Map<Utf8, Object> out = Maps.newHashMap();
+      for (Map.Entry<String, T> e : input.entrySet()) {
+        out.put(new Utf8(e.getKey()), mapFn.map(e.getValue()));
+      }
+      return out;
+    }
+  }
+
+  public static final <T> AvroType<Map<String, T>> maps(PType<T> ptype) {
+    AvroType<T> avroType = (AvroType<T>) ptype;
+    Schema mapSchema = Schema.createMap(allowNulls(avroType.getSchema()));
+    AvroMapToMap<T> inputFn = new AvroMapToMap<T>(avroType.getInputMapFn());
+    MapToAvroMap<T> outputFn = new MapToAvroMap<T>(avroType.getOutputMapFn());
+    return new AvroType(Map.class, mapSchema, inputFn, outputFn, ptype);
+  }
+
+  private static class GenericRecordToTuple extends MapFn<GenericRecord, Tuple> {
+    private final TupleFactory<?> tupleFactory;
+    private final List<MapFn> fns;
+
+    private transient Object[] values;
+
+    public GenericRecordToTuple(TupleFactory<?> tupleFactory, PType<?>... ptypes) {
+      this.tupleFactory = tupleFactory;
+      this.fns = Lists.newArrayList();
+      for (PType<?> ptype : ptypes) {
+        AvroType atype = (AvroType) ptype;
+        fns.add(atype.getInputMapFn());
+      }
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      for (MapFn fn : fns) {
+        fn.configure(conf);
+      }
+    }
+
+    @Override
+    public void setConfigurationForTest(Configuration conf) {
+      for (MapFn fn : fns) {
+        fn.setConfigurationForTest(conf);
+      }
+    }
+
+    @Override
+    public void initialize() {
+      for (MapFn fn : fns) {
+        fn.setContext(getContext());
+      }
+      this.values = new Object[fns.size()];
+      tupleFactory.initialize();
+    }
+
+    @Override
+    public Tuple map(GenericRecord input) {
+      for (int i = 0; i < values.length; i++) {
+        Object v = input.get(i);
+        if (v == null) {
+          values[i] = null;
+        } else {
+          values[i] = fns.get(i).map(v);
+        }
+      }
+      return tupleFactory.makeTuple(values);
+    }
+  }
+
+  /**
+   * Converts a {@link Tuple} into an Avro {@link GenericRecord}, mapping each tuple
+   * element through the output {@link MapFn} of the corresponding {@link AvroType}.
+   * <p>
+   * The record schema is held as its JSON string and re-parsed in {@link #initialize()}
+   * (presumably so that this function can be serialized -- confirm against MapFn).
+   * <p>
+   * Every call to {@link #map(Tuple)} returns a newly allocated record. Handing out a
+   * single shared instance would let a later call silently overwrite the contents of
+   * records returned earlier whenever a caller retains them.
+   */
+  private static class TupleToGenericRecord extends MapFn<Tuple, GenericRecord> {
+    // Output functions, one per tuple position, in the order the ptypes were supplied.
+    private final List<MapFn> fns;
+    // The Avro types the output functions were taken from (same order as fns).
+    private final List<AvroType> avroTypes;
+    // JSON form of the record schema; parsed again in initialize().
+    private final String jsonSchema;
+    // True when at least one element type reports isReflect().
+    private final boolean isReflect;
+
+    // Parsed form of jsonSchema; populated by initialize().
+    private transient Schema recordSchema;
+
+    /**
+     * @param schema the record schema that produced records will carry
+     * @param ptypes the element types, which must all be {@link AvroType} instances
+     */
+    public TupleToGenericRecord(Schema schema, PType<?>... ptypes) {
+      this.fns = Lists.newArrayList();
+      this.avroTypes = Lists.newArrayList();
+      this.jsonSchema = schema.toString();
+      boolean reflectFound = false;
+      for (PType ptype : ptypes) {
+        AvroType atype = (AvroType) ptype;
+        fns.add(atype.getOutputMapFn());
+        avroTypes.add(atype);
+        if (atype.isReflect()) {
+          reflectFound = true;
+        }
+      }
+      this.isReflect = reflectFound;
+    }
+
+    /** Forwards the configuration to every per-element output function. */
+    @Override
+    public void configure(Configuration conf) {
+      for (MapFn fn : fns) {
+        fn.configure(conf);
+      }
+    }
+
+    /** Forwards the test configuration to every per-element output function. */
+    @Override
+    public void setConfigurationForTest(Configuration conf) {
+      for (MapFn fn : fns) {
+        fn.setConfigurationForTest(conf);
+      }
+    }
+
+    /** Parses the record schema and hands this function's context to each element function. */
+    @Override
+    public void initialize() {
+      this.recordSchema = new Schema.Parser().parse(jsonSchema);
+      for (MapFn fn : fns) {
+        fn.setContext(getContext());
+      }
+    }
+
+    /**
+     * Builds a new record from the given tuple.
+     *
+     * @param input the tuple to convert; null elements are stored as null fields
+     * @return a freshly allocated record holding the converted element values
+     */
+    @Override
+    public GenericRecord map(Tuple input) {
+      GenericRecord record = createRecord();
+      for (int i = 0; i < input.size(); i++) {
+        Object v = input.get(i);
+        if (v == null) {
+          record.put(i, null);
+        } else {
+          record.put(i, fns.get(i).map(v));
+        }
+      }
+      return record;
+    }
+
+    // Creates an empty record of the variant matching the element types: the
+    // reflect-aware subclass when any element is reflect-based, a plain record otherwise.
+    private GenericRecord createRecord() {
+      if (isReflect) {
+        return new ReflectGenericRecord(recordSchema);
+      }
+      return new GenericData.Record(recordSchema);
+    }
+  }
+
+  /**
+   * Creates the Avro type for a {@link Pair} whose elements have the given types.
+   * Both directions of the conversion share one freshly generated tuple schema.
+   */
+  public static final <V1, V2> AvroType<Pair<V1, V2>> pairs(PType<V1> p1, PType<V2> p2) {
+    Schema pairSchema = createTupleSchema(p1, p2);
+    return new AvroType(Pair.class, pairSchema,
+        new GenericRecordToTuple(TupleFactory.PAIR, p1, p2),
+        new TupleToGenericRecord(pairSchema, p1, p2),
+        p1, p2);
+  }
+
+  /**
+   * Creates the Avro type for a {@link Tuple3} whose elements have the given types.
+   */
+  public static final <V1, V2, V3> AvroType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2,
+      PType<V3> p3) {
+    Schema tripleSchema = createTupleSchema(p1, p2, p3);
+    GenericRecordToTuple fromRecord = new GenericRecordToTuple(TupleFactory.TUPLE3, p1, p2, p3);
+    TupleToGenericRecord toRecord = new TupleToGenericRecord(tripleSchema, p1, p2, p3);
+    return new AvroType(Tuple3.class, tripleSchema, fromRecord, toRecord, p1, p2, p3);
+  }
+
+  /**
+   * Creates the Avro type for a {@link Tuple4} whose elements have the given types.
+   */
+  public static final <V1, V2, V3, V4> AvroType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1,
+      PType<V2> p2, PType<V3> p3, PType<V4> p4) {
+    Schema quadSchema = createTupleSchema(p1, p2, p3, p4);
+    GenericRecordToTuple fromRecord = new GenericRecordToTuple(TupleFactory.TUPLE4, p1, p2, p3,
+        p4);
+    TupleToGenericRecord toRecord = new TupleToGenericRecord(quadSchema, p1, p2, p3, p4);
+    return new AvroType(Tuple4.class, quadSchema, fromRecord, toRecord, p1, p2, p3, p4);
+  }
+
+  /**
+   * Creates the Avro type for a {@link TupleN} with one element per given type.
+   */
+  public static final AvroType<TupleN> tuples(PType... ptypes) {
+    Schema tupleSchema = createTupleSchema(ptypes);
+    GenericRecordToTuple fromRecord = new GenericRecordToTuple(TupleFactory.TUPLEN, ptypes);
+    TupleToGenericRecord toRecord = new TupleToGenericRecord(tupleSchema, ptypes);
+    return new AvroType(TupleN.class, tupleSchema, fromRecord, toRecord, ptypes);
+  }
+
+  /**
+   * Creates the Avro type for a custom {@link Tuple} subclass. The tuple factory is
+   * created from {@code clazz} and the type class of each element type, in order.
+   */
+  public static <T extends Tuple> AvroType<T> tuples(Class<T> clazz, PType... ptypes) {
+    Schema tupleSchema = createTupleSchema(ptypes);
+    Class[] elementClasses = new Class[ptypes.length];
+    int position = 0;
+    for (PType ptype : ptypes) {
+      elementClasses[position++] = ptype.getTypeClass();
+    }
+    TupleFactory<T> tupleFactory = TupleFactory.create(clazz, elementClasses);
+    GenericRecordToTuple fromRecord = new GenericRecordToTuple(tupleFactory, ptypes);
+    TupleToGenericRecord toRecord = new TupleToGenericRecord(tupleSchema, ptypes);
+    return new AvroType<T>(clazz, tupleSchema, fromRecord, toRecord, ptypes);
+  }
+
+  /**
+   * Builds a record schema with one field per element type. Fields are named
+   * {@code v0}, {@code v1}, ... and each field schema is passed through
+   * {@code allowNulls}. The record name embeds a random UUID (dashes replaced by
+   * {@code 'x'}) so that every tuple schema gets a distinct name.
+   */
+  private static Schema createTupleSchema(PType<?>... ptypes) {
+    String uniqueSuffix = UUID.randomUUID().toString().replace('-', 'x');
+    Schema tupleSchema = Schema.createRecord("tuple" + uniqueSuffix, "", "crunch", false);
+    List<Schema.Field> tupleFields = Lists.newArrayList();
+    int position = 0;
+    for (PType<?> ptype : ptypes) {
+      AvroType elementType = (AvroType) ptype;
+      Schema nullableSchema = allowNulls(elementType.getSchema());
+      tupleFields.add(new Schema.Field("v" + position, nullableSchema, "", null));
+      position++;
+    }
+    tupleSchema.setFields(tupleFields);
+    return tupleSchema;
+  }
+
+  /**
+   * Creates an Avro type for {@code clazz} layered on an existing base type. Values are
+   * read by applying the base type's input function followed by {@code inputFn}, and
+   * written by applying {@code outputFn} followed by the base type's output function.
+   * The schema and sub-types are taken from the base type.
+   */
+  public static final <S, T> AvroType<T> derived(Class<T> clazz, MapFn<S, T> inputFn,
+      MapFn<T, S> outputFn, PType<S> base) {
+    AvroType<S> avroBase = (AvroType<S>) base;
+    Schema baseSchema = avroBase.getSchema();
+    CompositeMapFn readChain = new CompositeMapFn(avroBase.getInputMapFn(), inputFn);
+    CompositeMapFn writeChain = new CompositeMapFn(outputFn, avroBase.getOutputMapFn());
+    return new AvroType<T>(clazz, baseSchema, readChain, writeChain,
+        base.getSubTypes().toArray(new PType[0]));
+  }
+
+  public static <T> PType<T> jsons(Class<T> clazz) {
+    return PTypes.jsonString(clazz, AvroTypeFamily.getInstance());
+  }
+
+  /**
+   * Creates an Avro table type from a key type and a value type. A key or value that
+   * is itself a table type is first replaced by the pair type of its own key and
+   * value types.
+   */
+  public static final <K, V> AvroTableType<K, V> tableOf(PType<K> key, PType<V> value) {
+    PType<K> keyType = key;
+    if (keyType instanceof PTableType) {
+      PTableType tableKey = (PTableType) keyType;
+      keyType = Avros.pairs(tableKey.getKeyType(), tableKey.getValueType());
+    }
+    PType<V> valueType = value;
+    if (valueType instanceof PTableType) {
+      PTableType tableValue = (PTableType) valueType;
+      valueType = Avros.pairs(tableValue.getKeyType(), tableValue.getValueType());
+    }
+    return new AvroTableType((AvroType<K>) keyType, (AvroType<V>) valueType, Pair.class);
+  }
+
+  private static final Schema NULL_SCHEMA = Schema.create(Type.NULL);
+
+  /**
+   * Returns a schema that accepts every value {@code base} accepts, plus null.
+   * <ul>
+   * <li>The null schema is returned unchanged.</li>
+   * <li>A union that already has a null branch is returned unchanged.</li>
+   * <li>A union without a null branch yields a new union with null appended; wrapping
+   * it in another union is not possible because Avro rejects nested unions.</li>
+   * <li>Any other schema yields the two-branch union of {@code base} and null.</li>
+   * </ul>
+   *
+   * @param base the schema to make nullable
+   * @return a schema that admits null
+   */
+  private static Schema allowNulls(Schema base) {
+    if (NULL_SCHEMA.equals(base)) {
+      return base;
+    }
+    if (base.getType() == Type.UNION) {
+      List<Schema> branches = base.getTypes();
+      if (branches.contains(NULL_SCHEMA)) {
+        return base;
+      }
+      // Extend the existing union instead of nesting it inside a new one.
+      List<Schema> widened = Lists.newArrayList(branches);
+      widened.add(NULL_SCHEMA);
+      return Schema.createUnion(widened);
+    }
+    return Schema.createUnion(ImmutableList.of(base, NULL_SCHEMA));
+  }
+
+  private static class ReflectGenericRecord extends GenericData.Record {
+
+    public ReflectGenericRecord(Schema schema) {
+      super(schema);
+    }
+
+    @Override
+    public int hashCode() {
+      return ReflectData.get().hashCode(this, getSchema());
+    }
+
+  }
+
+  private Avros() { // Private and empty: the class cannot be instantiated from outside.
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f699409f/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java b/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java
index 1f04c67..9529d45 100644
--- a/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java
+++ b/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java
@@ -26,57 +26,66 @@ import java.io.IOException;
 
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.mapred.AvroJob;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.crunch.test.Person;
-import org.apache.crunch.types.avro.AvroType;
-import org.apache.crunch.types.avro.Avros;
-
 public class AvroFileSourceTest {
 
-	private Job job;
-	File tempFile;
+  private Job job;
+  File tempFile;
+
+  @Before
+  public void setUp() throws IOException {
+    job = new Job();
+    tempFile = File.createTempFile("test", ".avr");
+  }
+
+  @After
+  public void tearDown() {
+    tempFile.delete();
+  }
+
+  @Test
+  public void testConfigureJob_SpecificData() throws IOException {
+    AvroType<Person> avroSpecificType = Avros.records(Person.class);
+    AvroFileSource<Person> personFileSource = new AvroFileSource<Person>(new Path(
+        tempFile.getAbsolutePath()), avroSpecificType);
+
+    personFileSource.configureSource(job, -1);
 
-	@Before
-	public void setUp() throws IOException {
-		job = new Job();
-		tempFile = File.createTempFile("test", ".avr");
-	}
+    assertFalse(job.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, true));
+    assertEquals(Person.SCHEMA$.toString(), job.getConfiguration().get(AvroJob.INPUT_SCHEMA));
+  }
 
-	@After
-	public void tearDown() {
-		tempFile.delete();
-	}
+  @Test
+  public void testConfigureJob_GenericData() throws IOException {
+    AvroType<Record> avroGenericType = Avros.generics(Person.SCHEMA$);
+    AvroFileSource<Record> personFileSource = new AvroFileSource<Record>(new Path(
+        tempFile.getAbsolutePath()), avroGenericType);
 
-	@Test
-	public void testConfigureJob_SpecificData() throws IOException {
-		AvroType<Person> avroSpecificType = Avros.records(Person.class);
-		AvroFileSource<Person> personFileSource = new AvroFileSource<Person>(
-				new Path(tempFile.getAbsolutePath()), avroSpecificType);
+    personFileSource.configureSource(job, -1);
 
-		personFileSource.configureSource(job, -1);
+    assertFalse(job.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, true));
 
-		assertFalse(job.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT,
-				true));
-		assertEquals(Person.SCHEMA$.toString(),
-				job.getConfiguration().get(AvroJob.INPUT_SCHEMA));
-	}
+  }
 
-	@Test
-	public void testConfigureJob_GenericData() throws IOException {
-		AvroType<Record> avroGenericType = Avros.generics(Person.SCHEMA$);
-		AvroFileSource<Record> personFileSource = new AvroFileSource<Record>(
-				new Path(tempFile.getAbsolutePath()), avroGenericType);
+  @Test
+  public void testConfigureJob_ReflectData() throws IOException {
+    AvroType<StringWrapper> avroReflectType = Avros.reflects(StringWrapper.class);
+    AvroFileSource<StringWrapper> personFileSource = new AvroFileSource<StringWrapper>(new Path(
+        tempFile.getAbsolutePath()), avroReflectType);
 
-		personFileSource.configureSource(job, -1);
+    personFileSource.configureSource(job, -1);
 
-		assertTrue(job.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT,
-				false));
+    assertTrue(job.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, false));
 
-	}
+  }
 
 }


Mime
View raw message