incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [24/33] CRUNCH-8: Moving the code into multiple Maven modules. Contributed by Matthias Friedrich
Date Wed, 11 Jul 2012 05:14:46 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
deleted file mode 100644
index be9f5e9..0000000
--- a/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.io.ByteArrayOutputStream;
-import java.io.Serializable;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericData.Record;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.avro.specific.SpecificDatumWriter;
-
-import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
-
-/**
- * Performs deep copies of Avro-serializable objects.
- * <p>
- * <b>Warning:</b> Methods in this class are not thread-safe. This shouldn't be
- * a problem when running in a map-reduce context where each mapper/reducer is
- * running in its own JVM, but it may well be a problem in any other kind of
- * multi-threaded context.
- */
-public abstract class AvroDeepCopier<T> implements Serializable {
-
-  private BinaryEncoder binaryEncoder;
-  private BinaryDecoder binaryDecoder;
-  protected DatumWriter<T> datumWriter;
-  protected DatumReader<T> datumReader;
-
-  protected AvroDeepCopier(DatumWriter<T> datumWriter, DatumReader<T> datumReader) {
-    this.datumWriter = datumWriter;
-    this.datumReader = datumReader;
-  }
-
-  protected abstract T createCopyTarget();
-
-  /**
-   * Deep copier for Avro specific data objects.
-   */
-  public static class AvroSpecificDeepCopier<T> extends AvroDeepCopier<T> {
-
-    private Class<T> valueClass;
-
-    public AvroSpecificDeepCopier(Class<T> valueClass, Schema schema) {
-      super(new SpecificDatumWriter<T>(schema), new SpecificDatumReader(schema));
-      this.valueClass = valueClass;
-    }
-
-    @Override
-    protected T createCopyTarget() {
-      return createNewInstance(valueClass);
-    }
-
-  }
-
-  /**
-   * Deep copier for Avro generic data objects.
-   */
-  public static class AvroGenericDeepCopier extends AvroDeepCopier<Record> {
-
-    private Schema schema;
-
-    public AvroGenericDeepCopier(Schema schema) {
-      super(new GenericDatumWriter<Record>(schema), new GenericDatumReader<Record>(schema));
-      this.schema = schema;
-    }
-
-    @Override
-    protected Record createCopyTarget() {
-      return new GenericData.Record(schema);
-    }
-  }
-
-  /**
-   * Deep copier for Avro reflect data objects.
-   */
-  public static class AvroReflectDeepCopier<T> extends AvroDeepCopier<T> {
-    private Class<T> valueClass;
-
-    public AvroReflectDeepCopier(Class<T> valueClass, Schema schema) {
-      super(new ReflectDatumWriter<T>(schema), new ReflectDatumReader<T>(schema));
-      this.valueClass = valueClass;
-    }
-
-    @Override
-    protected T createCopyTarget() {
-      return createNewInstance(valueClass);
-    }
-  }
-
-  /**
-   * Create a deep copy of an Avro value.
-   * 
-   * @param source
-   *          The value to be copied
-   * @return The deep copy of the value
-   */
-  public T deepCopy(T source) {
-    ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
-    binaryEncoder = EncoderFactory.get().binaryEncoder(byteOutStream, binaryEncoder);
-    T target = createCopyTarget();
-    try {
-      datumWriter.write(source, binaryEncoder);
-      binaryEncoder.flush();
-      binaryDecoder = DecoderFactory.get()
-          .binaryDecoder(byteOutStream.toByteArray(), binaryDecoder);
-      datumReader.read(target, binaryDecoder);
-    } catch (Exception e) {
-      throw new CrunchRuntimeException("Error while deep copying avro value " + source, e);
-    }
-
-    return target;
-  }
-
-  protected T createNewInstance(Class<T> targetClass) {
-    try {
-      return targetClass.newInstance();
-    } catch (InstantiationException e) {
-      throw new CrunchRuntimeException(e);
-    } catch (IllegalAccessException e) {
-      throw new CrunchRuntimeException(e);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java b/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
deleted file mode 100644
index 5ce970f..0000000
--- a/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.util.Collection;
-
-import org.apache.avro.mapred.AvroJob;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroKeyComparator;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-
-import org.apache.crunch.GroupingOptions;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.fn.PairMapFn;
-import org.apache.crunch.lib.PTables;
-import org.apache.crunch.types.Converter;
-import org.apache.crunch.types.PGroupedTableType;
-
-/**
- *
- *
- */
-public class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V> {
-
-  private static final AvroPairConverter CONVERTER = new AvroPairConverter();
-  private final MapFn inputFn;
-  private final MapFn outputFn;
-  
-  public AvroGroupedTableType(AvroTableType<K, V> tableType) {
-    super(tableType);
-    AvroType keyType = (AvroType) tableType.getKeyType();
-    AvroType valueType = (AvroType) tableType.getValueType();
-    this.inputFn =  new PairIterableMapFn(keyType.getInputMapFn(),
-        valueType.getInputMapFn());
-    this.outputFn = new PairMapFn(keyType.getOutputMapFn(),
-        valueType.getOutputMapFn());
-  }
-
-  @Override
-  public Class<Pair<K, Iterable<V>>> getTypeClass() {
-    return (Class<Pair<K, Iterable<V>>>) Pair.of(null, null).getClass();  
-  }
-
-  @Override
-  public Converter getGroupingConverter() {
-    return CONVERTER;
-  }
-
-  @Override
-  public MapFn getInputMapFn() {
-    return inputFn;
-  }
-  
-  @Override
-  public MapFn getOutputMapFn() {
-    return outputFn;
-  }
-  
-  @Override
-  public Pair<K, Iterable<V>> getDetachedValue(Pair<K, Iterable<V>> value) {
-    return PTables.getGroupedDetachedValue(this, value);
-  }
-
-  @Override
-  public void configureShuffle(Job job, GroupingOptions options) {
-    AvroTableType<K, V> att = (AvroTableType<K, V>) tableType;
-    String schemaJson = att.getSchema().toString();
-    Configuration conf = job.getConfiguration();
-    
-    if (!att.isSpecific()) {
-        conf.setBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, true);
-    }
-    conf.set(AvroJob.MAP_OUTPUT_SCHEMA, schemaJson);
-    job.setSortComparatorClass(AvroKeyComparator.class);
-    job.setMapOutputKeyClass(AvroKey.class);
-    job.setMapOutputValueClass(AvroValue.class);
-    if (options != null) {
-      options.configure(job);
-    }
-    
-    Avros.configureReflectDataFactory(conf);
-    
-    Collection<String> serializations =
-        job.getConfiguration().getStringCollection("io.serializations");
-    if (!serializations.contains(SafeAvroSerialization.class.getName())) {
-      serializations.add(SafeAvroSerialization.class.getName());
-      job.getConfiguration().setStrings("io.serializations",
-          serializations.toArray(new String[0]));
-    }
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java b/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java
deleted file mode 100644
index da5bbb2..0000000
--- a/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.avro.Schema;
-import org.apache.avro.mapred.AvroJob;
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-/** An {@link org.apache.hadoop.mapreduce.InputFormat} for Avro data files. */
-public class AvroInputFormat<T> extends FileInputFormat<AvroWrapper<T>, NullWritable> {
-
-	@Override
-	protected List<FileStatus> listStatus(JobContext job) throws IOException {
-	  List<FileStatus> result = new ArrayList<FileStatus>();
-      for (FileStatus file : super.listStatus(job)) {
-        if (file.getPath().getName().endsWith(org.apache.avro.mapred.AvroOutputFormat.EXT)) {
-          result.add(file);
-		}
-      }
-      return result;
-	}
-
-	@Override
-	public RecordReader<AvroWrapper<T>, NullWritable> createRecordReader(InputSplit split,
-		TaskAttemptContext context) throws IOException, InterruptedException {
-      context.setStatus(split.toString());
-      String jsonSchema = context.getConfiguration().get(AvroJob.INPUT_SCHEMA);
-      Schema schema = new Schema.Parser().parse(jsonSchema);
-      return new AvroRecordReader<T>(schema);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java b/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java
deleted file mode 100644
index 60aa69b..0000000
--- a/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.hadoop.io.NullWritable;
-
-import org.apache.crunch.types.Converter;
-
-public class AvroKeyConverter<K> implements Converter<AvroWrapper<K>, NullWritable, K, Iterable<K>> {
-  
-  private transient AvroWrapper<K> wrapper = null;
-  
-  @Override
-  public K convertInput(AvroWrapper<K> key, NullWritable value) {
-    return key.datum();
-  }
-
-  @Override
-  public AvroWrapper<K> outputKey(K value) {
-    getWrapper().datum(value);
-    return wrapper;
-  }
-
-  @Override
-  public NullWritable outputValue(K value) {
-    return NullWritable.get();
-  }
-
-  @Override
-  public Class<AvroWrapper<K>> getKeyClass() {
-    return (Class<AvroWrapper<K>>) getWrapper().getClass();
-  }
-
-  @Override
-  public Class<NullWritable> getValueClass() {
-    return NullWritable.class;
-  }
-
-  private AvroWrapper<K> getWrapper() {
-    if (wrapper == null) {
-      wrapper = new AvroWrapper<K>();
-    }
-    return wrapper;
-  }
-
-  @Override
-  public Iterable<K> convertIterableInput(AvroWrapper<K> key,
-      Iterable<NullWritable> value) {
-    throw new UnsupportedOperationException("Should not be possible");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java b/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
deleted file mode 100644
index 838c21a..0000000
--- a/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.mapred.AvroJob;
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-/** An {@link org.apache.hadoop.mapreduce.OutputFormat} for Avro data files. */
-public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
-
-  @Override
-  public RecordWriter<AvroWrapper<T>, NullWritable> getRecordWriter(
-      TaskAttemptContext context) throws IOException, InterruptedException {
-
-    Configuration conf = context.getConfiguration();
-    Schema schema = null;
-    String outputName = conf.get("crunch.namedoutput");
-    if (outputName != null && !outputName.isEmpty()) {
-      schema = (new Schema.Parser()).parse(conf.get("avro.output.schema." + outputName));
-    } else {
-      schema = AvroJob.getOutputSchema(context.getConfiguration());
-    }
-    
-    ReflectDataFactory factory = Avros.getReflectDataFactory(conf);
-    final DataFileWriter<T> WRITER = new DataFileWriter<T>(factory.<T>getWriter());
-
-    Path path = getDefaultWorkFile(context,
-        org.apache.avro.mapred.AvroOutputFormat.EXT);
-    WRITER.create(schema,
-        path.getFileSystem(context.getConfiguration()).create(path));
-
-    return new RecordWriter<AvroWrapper<T>, NullWritable>() {
-      @Override
-      public void write(AvroWrapper<T> wrapper, NullWritable ignore)
-        throws IOException {
-        WRITER.append(wrapper.datum());
-      }
-      @Override
-      public void close(TaskAttemptContext context) throws IOException,
-          InterruptedException {
-        WRITER.close();
-      }
-    };
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java b/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java
deleted file mode 100644
index 6ec3972..0000000
--- a/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.util.Iterator;
-
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.types.Converter;
-
-public class AvroPairConverter<K, V> implements Converter<AvroKey<K>, AvroValue<V>, Pair<K, V>, Pair<K, Iterable<V>>> {
-  
-  private transient AvroKey<K> keyWrapper = null;
-  private transient AvroValue<V> valueWrapper = null;
-  
-  @Override
-  public Pair<K, V> convertInput(AvroKey<K> key, AvroValue<V> value) {
-    return Pair.of(key.datum(), value.datum());
-  }
-
-  public Pair<K, Iterable<V>> convertIterableInput(AvroKey<K> key, Iterable<AvroValue<V>> iter) {
-    Iterable<V> it = new AvroWrappedIterable<V>(iter);
-    return Pair.of(key.datum(), it);  
-  }
-  
-  @Override
-  public AvroKey<K> outputKey(Pair<K, V> value) {
-    getKeyWrapper().datum(value.first());
-    return keyWrapper;
-  }
-
-  @Override
-  public AvroValue<V> outputValue(Pair<K, V> value) {
-    getValueWrapper().datum(value.second());
-    return valueWrapper;
-  }
-
-  @Override
-  public Class<AvroKey<K>> getKeyClass() {
-    return (Class<AvroKey<K>>) getKeyWrapper().getClass();
-  }
-
-  @Override
-  public Class<AvroValue<V>> getValueClass() {
-    return (Class<AvroValue<V>>) getValueWrapper().getClass();
-  }
-  
-  private AvroKey<K> getKeyWrapper() {
-    if (keyWrapper == null) {
-      keyWrapper = new AvroKey<K>();
-    }
-    return keyWrapper;
-  }
-  
-  private AvroValue<V> getValueWrapper() {
-    if (valueWrapper == null) {
-      valueWrapper = new AvroValue<V>();
-    }
-    return valueWrapper;
-  }
-  
-  private static class AvroWrappedIterable<V> implements Iterable<V> {
-
-    private final Iterable<AvroValue<V>> iters;
-    
-    public AvroWrappedIterable(Iterable<AvroValue<V>> iters) {
-      this.iters = iters;
-    }
-    
-    @Override
-    public Iterator<V> iterator() {
-      return new Iterator<V>() {
-        private final Iterator<AvroValue<V>> it = iters.iterator();
-
-        @Override
-        public boolean hasNext() {
-          return it.hasNext();
-        }
-
-        @Override
-        public V next() {
-          return it.next().datum();
-        }
-
-        @Override
-        public void remove() {
-          it.remove();
-        }  
-      };
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java b/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
deleted file mode 100644
index 3bcab5c..0000000
--- a/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.file.SeekableInput;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.mapred.AvroJob;
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.avro.mapred.FsInput;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
-/** An {@link RecordReader} for Avro data files. */
-public class AvroRecordReader<T> extends RecordReader<AvroWrapper<T>, NullWritable> {
-
-	private FileReader<T> reader;
-	private long start;
-	private long end;
-	private AvroWrapper<T> key;
-	private NullWritable value;
-	private Schema schema;
-
-	public AvroRecordReader(Schema schema) {
-		this.schema = schema;
-	}
-
-	@Override
-	public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException,
-			InterruptedException {
-		FileSplit split = (FileSplit) genericSplit;
-		Configuration conf = context.getConfiguration();
-		SeekableInput in = new FsInput(split.getPath(), conf);
-		DatumReader<T> datumReader = null;
-		if (context.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, true)) {
-		  ReflectDataFactory factory = Avros.getReflectDataFactory(conf);
-			datumReader = factory.getReader(schema);
-		} else {
-			datumReader = new SpecificDatumReader<T>(schema);
-		}
-		this.reader = DataFileReader.openReader(in, datumReader);
-		reader.sync(split.getStart()); // sync to start
-		this.start = reader.tell();
-		this.end = split.getStart() + split.getLength();
-	}
-
-	@Override
-	public boolean nextKeyValue() throws IOException, InterruptedException {
-		if (!reader.hasNext() || reader.pastSync(end)) {
-			key = null;
-			value = null;
-			return false;
-		}
-		if (key == null) {
-			key = new AvroWrapper<T>();
-		}
-		if (value == null) {
-			value = NullWritable.get();
-		}
-		key.datum(reader.next(key.datum()));
-		return true;
-	}
-
-	@Override
-	public AvroWrapper<T> getCurrentKey() throws IOException, InterruptedException {
-		return key;
-	}
-
-	@Override
-	public NullWritable getCurrentValue() throws IOException, InterruptedException {
-		return value;
-	}
-
-	@Override
-	public float getProgress() throws IOException {
-		if (end == start) {
-			return 0.0f;
-		} else {
-			return Math.min(1.0f, (getPos() - start) / (float) (end - start));
-		}
-	}
-
-	public long getPos() throws IOException {
-		return reader.tell();
-	}
-
-	@Override
-	public void close() throws IOException {
-		reader.close();
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/avro/AvroTableType.java b/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
deleted file mode 100644
index 6d21122..0000000
--- a/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
-
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.lib.PTables;
-import org.apache.crunch.types.PGroupedTableType;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-
-/**
- * The implementation of the PTableType interface for Avro-based serialization.
- * 
- */
-public class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements
-		PTableType<K, V> {
-
-	private static class PairToAvroPair extends
-			MapFn<Pair, org.apache.avro.mapred.Pair> {
-		private final MapFn keyMapFn;
-		private final MapFn valueMapFn;
-		private final String firstJson;
-		private final String secondJson;
-
-		private String pairSchemaJson;
-		private transient Schema pairSchema;
-
-		public PairToAvroPair(AvroType keyType, AvroType valueType) {
-			this.keyMapFn = keyType.getOutputMapFn();
-			this.firstJson = keyType.getSchema().toString();
-			this.valueMapFn = valueType.getOutputMapFn();
-			this.secondJson = valueType.getSchema().toString();
-		}
-
-		@Override
-		public void configure(Configuration conf) {
-			keyMapFn.configure(conf);
-			valueMapFn.configure(conf);
-		}
-
-		@Override
-		public void setConfigurationForTest(Configuration conf) {
-			keyMapFn.setConfigurationForTest(conf);
-			valueMapFn.setConfigurationForTest(conf);
-		}
-
-		@Override
-		public void initialize() {
-			keyMapFn.setContext(getContext());
-			valueMapFn.setContext(getContext());
-			pairSchemaJson = org.apache.avro.mapred.Pair.getPairSchema(
-					new Schema.Parser().parse(firstJson),
-					new Schema.Parser().parse(secondJson)).toString();
-		}
-
-		@Override
-		public org.apache.avro.mapred.Pair map(Pair input) {
-			if (pairSchema == null) {
-				pairSchema = new Schema.Parser().parse(pairSchemaJson);
-			}
-			org.apache.avro.mapred.Pair avroPair = new org.apache.avro.mapred.Pair(
-					pairSchema);
-			avroPair.key(keyMapFn.map(input.first()));
-			avroPair.value(valueMapFn.map(input.second()));
-			return avroPair;
-		}
-	}
-
-	private static class IndexedRecordToPair extends MapFn<IndexedRecord, Pair> {
-
-		private final MapFn firstMapFn;
-		private final MapFn secondMapFn;
-
-		public IndexedRecordToPair(MapFn firstMapFn, MapFn secondMapFn) {
-			this.firstMapFn = firstMapFn;
-			this.secondMapFn = secondMapFn;
-		}
-
-		@Override
-		public void configure(Configuration conf) {
-			firstMapFn.configure(conf);
-			secondMapFn.configure(conf);
-		}
-
-		@Override
-		public void setConfigurationForTest(Configuration conf) {
-			firstMapFn.setConfigurationForTest(conf);
-			secondMapFn.setConfigurationForTest(conf);
-		}
-
-		@Override
-		public void initialize() {
-			firstMapFn.setContext(getContext());
-			secondMapFn.setContext(getContext());
-		}
-
-		@Override
-		public Pair map(IndexedRecord input) {
-			return Pair.of(firstMapFn.map(input.get(0)),
-					secondMapFn.map(input.get(1)));
-		}
-	}
-
-	private final AvroType<K> keyType;
-	private final AvroType<V> valueType;
-
-	public AvroTableType(AvroType<K> keyType, AvroType<V> valueType,
-			Class<Pair<K, V>> pairClass) {
-		super(pairClass, org.apache.avro.mapred.Pair.getPairSchema(
-				keyType.getSchema(), valueType.getSchema()),
-				new IndexedRecordToPair(keyType.getInputMapFn(),
-						valueType.getInputMapFn()), new PairToAvroPair(keyType,
-						valueType), keyType, valueType);
-		this.keyType = keyType;
-		this.valueType = valueType;
-	}
-
-	@Override
-	public boolean isSpecific() {
-		return keyType.isSpecific() || valueType.isSpecific();
-	}
-
-	@Override
-	public boolean isGeneric() {
-		return keyType.isGeneric() || valueType.isGeneric();
-	}
-
-	@Override
-	public PType<K> getKeyType() {
-		return keyType;
-	}
-
-	@Override
-	public PType<V> getValueType() {
-		return valueType;
-	}
-
-	@Override
-	public PGroupedTableType<K, V> getGroupedTableType() {
-		return new AvroGroupedTableType<K, V>(this);
-	}
-
-  @Override
-  public Pair<K, V> getDetachedValue(Pair<K, V> value) {
-    return PTables.getDetachedValue(this, value);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/avro/AvroType.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/avro/AvroType.java b/src/main/java/org/apache/crunch/types/avro/AvroType.java
deleted file mode 100644
index d5d22a8..0000000
--- a/src/main/java/org/apache/crunch/types/avro/AvroType.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.util.List;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.fs.Path;
-
-import org.apache.crunch.MapFn;
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.fn.IdentityFn;
-import org.apache.crunch.io.avro.AvroFileSourceTarget;
-import org.apache.crunch.types.Converter;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
-/**
- * The implementation of the PType interface for Avro-based serialization.
- *
- * <p>The Avro {@link Schema} is kept both as a transient object and as its
- * JSON string, so that instances survive Java serialization. After
- * deserialization, {@link #getSchema()} re-parses the schema lazily.
- */
-public class AvroType<T> implements PType<T> {
-
-  private static final Converter AVRO_CONVERTER = new AvroKeyConverter();
-
-  private final Class<T> typeClass;
-  private final String schemaString;
-  private transient Schema schema;
-  private final MapFn baseInputMapFn;
-  private final MapFn baseOutputMapFn;
-  private final List<PType> subTypes;
-  /**
-   * Cache built on demand by {@link #getDeepCopier()}. It is transient, like
-   * {@code schema}, because it can always be rebuilt and should not travel
-   * through Java serialization. NOTE(review): it presumably holds Avro datum
-   * readers/writers, which are not Serializable; confirm against AvroDeepCopier.
-   */
-  private transient AvroDeepCopier<T> deepCopier;
-
-  /**
-   * Creates a type whose input and output map functions are identities.
-   *
-   * @param typeClass the Java class of the values
-   * @param schema the Avro schema of the values; must not be null
-   * @param ptypes component types, if this is a composite type
-   */
-  public AvroType(Class<T> typeClass, Schema schema, PType... ptypes) {
-    this(typeClass, schema, IdentityFn.getInstance(), IdentityFn.getInstance(), ptypes);
-  }
-
-  /**
-   * Creates a type that converts between Avro data and {@code T} using the
-   * given map functions.
-   *
-   * @param typeClass the Java class of the values
-   * @param schema the Avro schema of the serialized form; must not be null
-   * @param inputMapFn applied to deserialized Avro data to produce {@code T}
-   * @param outputMapFn applied to {@code T} before serialization
-   * @param ptypes component types, if this is a composite type
-   * @throws NullPointerException if {@code schema} is null
-   */
-  public AvroType(Class<T> typeClass, Schema schema, MapFn inputMapFn,
-      MapFn outputMapFn, PType... ptypes) {
-    this.typeClass = typeClass;
-    this.schema = Preconditions.checkNotNull(schema);
-    this.schemaString = schema.toString();
-    this.baseInputMapFn = inputMapFn;
-    this.baseOutputMapFn = outputMapFn;
-    this.subTypes = ImmutableList.<PType> builder().add(ptypes).build();
-  }
-
-  @Override
-  public Class<T> getTypeClass() {
-    return typeClass;
-  }
-
-  @Override
-  public PTypeFamily getFamily() {
-    return AvroTypeFamily.getInstance();
-  }
-
-  @Override
-  public List<PType> getSubTypes() {
-    return subTypes;
-  }
-
-  /**
-   * Returns the Avro schema. After Java deserialization the transient schema
-   * is null, so it is re-parsed from its JSON form on first use.
-   */
-  public Schema getSchema() {
-    if (schema == null) {
-      schema = new Schema.Parser().parse(schemaString);
-    }
-    return schema;
-  }
-
-  /**
-   * Determine if the wrapped type is a specific data avro type.
-   *
-   * @return true if the type class, or the type class of any direct sub-type,
-   *         implements {@link SpecificRecord}
-   */
-  public boolean isSpecific() {
-    if (SpecificRecord.class.isAssignableFrom(typeClass)) {
-      return true;
-    }
-    for (PType ptype : subTypes) {
-      if (SpecificRecord.class.isAssignableFrom(ptype.getTypeClass())) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
-   * Determine if the wrapped type is a generic data avro type.
-   *
-   * @return true only if the type class is exactly {@link GenericData.Record}
-   */
-  public boolean isGeneric() {
-    return GenericData.Record.class.equals(typeClass);
-  }
-
-  /** Returns the function that converts deserialized Avro data to {@code T}. */
-  public MapFn<Object, T> getInputMapFn() {
-    return baseInputMapFn;
-  }
-
-  /** Returns the function that converts {@code T} to its Avro representation. */
-  public MapFn<T, Object> getOutputMapFn() {
-    return baseOutputMapFn;
-  }
-
-  @Override
-  public Converter getConverter() {
-    return AVRO_CONVERTER;
-  }
-
-  @Override
-  public SourceTarget<T> getDefaultFileSource(Path path) {
-    return new AvroFileSourceTarget<T>(path, this);
-  }
-
-  /** Lazily picks the specific, generic or reflect deep copier for this type. */
-  private AvroDeepCopier<T> getDeepCopier() {
-    if (deepCopier == null) {
-      if (isSpecific()) {
-        deepCopier = new AvroDeepCopier.AvroSpecificDeepCopier<T>(typeClass, getSchema());
-      } else if (isGeneric()) {
-        deepCopier = (AvroDeepCopier<T>) new AvroDeepCopier.AvroGenericDeepCopier(getSchema());
-      } else {
-        deepCopier = new AvroDeepCopier.AvroReflectDeepCopier<T>(typeClass, getSchema());
-      }
-    }
-    return deepCopier;
-  }
-
-  /**
-   * Returns a deep copy of {@code value} when this type has an identity input
-   * map function and is not one of the primitive Avro types. Otherwise
-   * returns {@code value} unchanged.
-   */
-  public T getDetachedValue(T value) {
-    if (this.baseInputMapFn instanceof IdentityFn && !Avros.isPrimitive(this)) {
-      return getDeepCopier().deepCopy(value);
-    }
-    return value;
-  }
-
-  /**
-   * Two Avro types are equal when their type classes and sub-types are equal.
-   * NOTE(review): the schema is not compared, so two generic-record types
-   * with different schemas compare equal.
-   */
-  @Override
-  public boolean equals(Object other) {
-    // instanceof is false for null, so no separate null check is needed.
-    if (!(other instanceof AvroType)) {
-      return false;
-    }
-    AvroType at = (AvroType) other;
-    return typeClass.equals(at.typeClass) && subTypes.equals(at.subTypes);
-  }
-
-  /** Consistent with {@link #equals(Object)}: hashes type class and sub-types. */
-  @Override
-  public int hashCode() {
-    HashCodeBuilder hcb = new HashCodeBuilder();
-    hcb.append(typeClass).append(subTypes);
-    return hcb.toHashCode();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java b/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
deleted file mode 100644
index b7d5598..0000000
--- a/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Tuple;
-import org.apache.crunch.Tuple3;
-import org.apache.crunch.Tuple4;
-import org.apache.crunch.TupleN;
-import org.apache.crunch.types.PGroupedTableType;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.PTypeUtils;
-
-/**
- * The {@link PTypeFamily} backed by Avro serialization. The type factory
- * methods delegate to the same-named static methods of {@link Avros}.
- * {@link #as(PType)} converts types from other families into this one.
- */
-public class AvroTypeFamily implements PTypeFamily {
-
-  private static final AvroTypeFamily INSTANCE = new AvroTypeFamily();
-
-  /** Returns the single shared instance of this family. */
-  public static AvroTypeFamily getInstance() {
-    return INSTANCE;
-  }
-
-  /** Not instantiable from outside; use {@link #getInstance()}. */
-  private AvroTypeFamily() {
-  }
-
-  // ---- primitive types ----
-
-  @Override
-  public PType<Void> nulls() {
-    return Avros.nulls();
-  }
-
-  @Override
-  public PType<String> strings() {
-    return Avros.strings();
-  }
-
-  @Override
-  public PType<Long> longs() {
-    return Avros.longs();
-  }
-
-  @Override
-  public PType<Integer> ints() {
-    return Avros.ints();
-  }
-
-  @Override
-  public PType<Float> floats() {
-    return Avros.floats();
-  }
-
-  @Override
-  public PType<Double> doubles() {
-    return Avros.doubles();
-  }
-
-  @Override
-  public PType<Boolean> booleans() {
-    return Avros.booleans();
-  }
-
-  @Override
-  public PType<ByteBuffer> bytes() {
-    return Avros.bytes();
-  }
-
-  // ---- record types ----
-
-  @Override
-  public <T> PType<T> records(Class<T> clazz) {
-    return Avros.records(clazz);
-  }
-
-  /** Avro-only: a type for generic records conforming to {@code schema}. */
-  public PType<GenericData.Record> generics(Schema schema) {
-    return Avros.generics(schema);
-  }
-
-  /** Avro-only: a type for {@code clazz}, built via {@link Avros#containers}. */
-  public <T> PType<T> containers(Class<T> clazz) {
-    return Avros.containers(clazz);
-  }
-
-  // ---- composite types ----
-
-  @Override
-  public <T> PType<Collection<T>> collections(PType<T> ptype) {
-    return Avros.collections(ptype);
-  }
-
-  @Override
-  public <T> PType<Map<String, T>> maps(PType<T> ptype) {
-    return Avros.maps(ptype);
-  }
-
-  @Override
-  public <V1, V2> PType<Pair<V1, V2>> pairs(PType<V1> p1, PType<V2> p2) {
-    return Avros.pairs(p1, p2);
-  }
-
-  @Override
-  public <V1, V2, V3> PType<Tuple3<V1, V2, V3>> triples(PType<V1> p1,
-      PType<V2> p2, PType<V3> p3) {
-    return Avros.triples(p1, p2, p3);
-  }
-
-  @Override
-  public <V1, V2, V3, V4> PType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1,
-      PType<V2> p2, PType<V3> p3, PType<V4> p4) {
-    return Avros.quads(p1, p2, p3, p4);
-  }
-
-  @Override
-  public PType<TupleN> tuples(PType<?>... ptypes) {
-    return Avros.tuples(ptypes);
-  }
-
-  @Override
-  public <K, V> PTableType<K, V> tableOf(PType<K> key, PType<V> value) {
-    return Avros.tableOf(key, value);
-  }
-
-  /**
-   * Converts {@code ptype} into this family.
-   *
-   * <ul>
-   * <li>Avro types and Avro grouped-table types are returned as-is.</li>
-   * <li>Other grouped-table types are rebuilt around the converted table type.</li>
-   * <li>Classes with a built-in Avro primitive type use that type.</li>
-   * <li>Everything else goes through {@link PTypeUtils#convert}.</li>
-   * </ul>
-   */
-  @Override
-  public <T> PType<T> as(PType<T> ptype) {
-    boolean alreadyAvro = ptype instanceof AvroType || ptype instanceof AvroGroupedTableType;
-    if (alreadyAvro) {
-      return ptype;
-    }
-    if (ptype instanceof PGroupedTableType) {
-      PTableType tableType = ((PGroupedTableType) ptype).getTableType();
-      AvroTableType avroTableType = (AvroTableType) as(tableType);
-      return new AvroGroupedTableType(avroTableType);
-    }
-    PType<T> primitive = Avros.getPrimitiveType(ptype.getTypeClass());
-    if (primitive == null) {
-      return PTypeUtils.convert(ptype, this);
-    }
-    return primitive;
-  }
-
-  @Override
-  public <T extends Tuple> PType<T> tuples(Class<T> clazz, PType<?>... ptypes) {
-    return Avros.tuples(clazz, ptypes);
-  }
-
-  @Override
-  public <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn,
-      MapFn<T, S> outputFn, PType<S> base) {
-    return Avros.derived(clazz, inputFn, outputFn, base);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java b/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java
deleted file mode 100644
index 207fe8d..0000000
--- a/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.avro.util.Utf8;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
-
-/**
- * An {@link org.apache.hadoop.mapreduce.InputFormat} for text files.
- * Each line becomes a {@link Utf8} key wrapped in an {@link AvroWrapper};
- * values are always {@link NullWritable}.
- *
- * <p>Files matched by a compression codec known to Hadoop's
- * {@link CompressionCodecFactory} are not split, because a compressed stream
- * cannot be decoded from an arbitrary byte offset.
- */
-public class AvroUtf8InputFormat extends FileInputFormat<AvroWrapper<Utf8>, NullWritable> {
-
-  /** Adapts Hadoop's {@link LineRecordReader} to produce Avro {@link Utf8} keys. */
-  static class Utf8LineRecordReader extends RecordReader<AvroWrapper<Utf8>, NullWritable> {
-
-    private LineRecordReader lineRecordReader;
-
-    // The same wrapper instance is returned for every record; only its datum changes.
-    private AvroWrapper<Utf8> currentKey = new AvroWrapper<Utf8>();
-
-    public Utf8LineRecordReader() throws IOException {
-      this.lineRecordReader = new LineRecordReader();
-    }
-
-    @Override
-    public void close() throws IOException {
-      lineRecordReader.close();
-    }
-
-    @Override
-    public float getProgress() throws IOException {
-      return lineRecordReader.getProgress();
-    }
-
-    @Override
-    public AvroWrapper<Utf8> getCurrentKey() throws IOException,
-        InterruptedException {
-      Text txt = lineRecordReader.getCurrentValue();
-      currentKey.datum(new Utf8(txt.toString()));
-      return currentKey;
-    }
-
-    @Override
-    public NullWritable getCurrentValue() throws IOException,
-        InterruptedException {
-      return NullWritable.get();
-    }
-
-    @Override
-    public void initialize(InputSplit split, TaskAttemptContext context)
-        throws IOException, InterruptedException {
-      lineRecordReader.initialize(split, context);
-    }
-
-    @Override
-    public boolean nextKeyValue() throws IOException, InterruptedException {
-      return lineRecordReader.nextKeyValue();
-    }
-  }
-
-  /**
-   * Codec lookup used to decide splitability. It is set eagerly by
-   * {@link #configure(Configuration)}, or created lazily on first use.
-   */
-  private CompressionCodecFactory compressionCodecs = null;
-
-  /** Eagerly creates the codec lookup from {@code conf}. */
-  public void configure(Configuration conf) {
-    compressionCodecs = new CompressionCodecFactory(conf);
-  }
-
-  /**
-   * Old-API-style splitability check.
-   *
-   * <p>This signature does not override anything in the new-API
-   * {@link FileInputFormat}, so the framework does not call it. It is kept
-   * for existing callers. If {@link #configure} was not called, the codec
-   * lookup is built from the file system's configuration.
-   */
-  protected boolean isSplitable(FileSystem fs, Path file) {
-    return getCodecFactory(fs.getConf()).getCodec(file) == null;
-  }
-
-  /**
-   * Splitability check consulted by the new-API {@link FileInputFormat}.
-   * A file with a recognized compression codec is read as a single split.
-   */
-  @Override
-  protected boolean isSplitable(org.apache.hadoop.mapreduce.JobContext context, Path file) {
-    return getCodecFactory(context.getConfiguration()).getCodec(file) == null;
-  }
-
-  /** Returns the codec lookup, creating it from {@code conf} if not yet set. */
-  private CompressionCodecFactory getCodecFactory(Configuration conf) {
-    if (compressionCodecs == null) {
-      compressionCodecs = new CompressionCodecFactory(conf);
-    }
-    return compressionCodecs;
-  }
-
-  @Override
-  public RecordReader<AvroWrapper<Utf8>, NullWritable> createRecordReader(
-      InputSplit split, TaskAttemptContext context) throws IOException,
-      InterruptedException {
-    return new Utf8LineRecordReader();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/avro/Avros.java b/src/main/java/org/apache/crunch/types/avro/Avros.java
deleted file mode 100644
index fbc6bf3..0000000
--- a/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ /dev/null
@@ -1,636 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.util.Utf8;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Tuple;
-import org.apache.crunch.Tuple3;
-import org.apache.crunch.Tuple4;
-import org.apache.crunch.TupleN;
-import org.apache.crunch.fn.CompositeMapFn;
-import org.apache.crunch.fn.IdentityFn;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.TupleFactory;
-import org.apache.crunch.util.PTypes;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * Defines static methods that are analogous to the methods defined in
- * {@link AvroTypeFamily} for convenient static importing.
- * 
- */
-public class Avros {
-
-	/**
-	 * The instance we use for generating reflected schemas. May be modified by
-	 * clients (e.g., Scrunch.)
-	 */
-	public static ReflectDataFactory REFLECT_DATA_FACTORY = new ReflectDataFactory();
-
-	/**
-	 * The name of the configuration parameter that tracks which reflection
-	 * factory to use.
-	 */
-	public static final String REFLECT_DATA_FACTORY_CLASS = "crunch.reflectdatafactory";
-
-	public static void configureReflectDataFactory(Configuration conf) {
-		conf.setClass(REFLECT_DATA_FACTORY_CLASS,
-				REFLECT_DATA_FACTORY.getClass(), ReflectDataFactory.class);
-	}
-
-	public static ReflectDataFactory getReflectDataFactory(Configuration conf) {
-		return (ReflectDataFactory) ReflectionUtils.newInstance(conf.getClass(
-				REFLECT_DATA_FACTORY_CLASS, ReflectDataFactory.class), conf);
-	}
-
-	public static MapFn<CharSequence, String> UTF8_TO_STRING = new MapFn<CharSequence, String>() {
-		@Override
-		public String map(CharSequence input) {
-			return input.toString();
-		}
-	};
-
-	public static MapFn<String, Utf8> STRING_TO_UTF8 = new MapFn<String, Utf8>() {
-		@Override
-		public Utf8 map(String input) {
-			return new Utf8(input);
-		}
-	};
-
-	public static MapFn<Object, ByteBuffer> BYTES_IN = new MapFn<Object, ByteBuffer>() {
-		@Override
-		public ByteBuffer map(Object input) {
-			if (input instanceof ByteBuffer) {
-				return (ByteBuffer) input;
-			}
-			return ByteBuffer.wrap((byte[]) input);
-		}
-	};
-
-	private static final AvroType<String> strings = new AvroType<String>(
-			String.class, Schema.create(Schema.Type.STRING), UTF8_TO_STRING,
-			STRING_TO_UTF8);
-	private static final AvroType<Void> nulls = create(Void.class,
-			Schema.Type.NULL);
-	private static final AvroType<Long> longs = create(Long.class,
-			Schema.Type.LONG);
-	private static final AvroType<Integer> ints = create(Integer.class,
-			Schema.Type.INT);
-	private static final AvroType<Float> floats = create(Float.class,
-			Schema.Type.FLOAT);
-	private static final AvroType<Double> doubles = create(Double.class,
-			Schema.Type.DOUBLE);
-	private static final AvroType<Boolean> booleans = create(Boolean.class,
-			Schema.Type.BOOLEAN);
-	private static final AvroType<ByteBuffer> bytes = new AvroType<ByteBuffer>(
-			ByteBuffer.class, Schema.create(Schema.Type.BYTES), BYTES_IN,
-			IdentityFn.getInstance());
-
-	private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap
-			.<Class<?>, PType<?>> builder().put(String.class, strings)
-			.put(Long.class, longs).put(Integer.class, ints)
-			.put(Float.class, floats).put(Double.class, doubles)
-			.put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build();
-
-	private static final Map<Class<?>, AvroType<?>> EXTENSIONS = Maps
-			.newHashMap();
-
-	public static <T> void register(Class<T> clazz, AvroType<T> ptype) {
-		EXTENSIONS.put(clazz, ptype);
-	}
-
-	public static <T> PType<T> getPrimitiveType(Class<T> clazz) {
-		return (PType<T>) PRIMITIVES.get(clazz);
-	}
-
-  static <T> boolean isPrimitive(AvroType<T> avroType) {
-    return PRIMITIVES.containsKey(avroType.getTypeClass());
-  }
-
-	private static <T> AvroType<T> create(Class<T> clazz, Schema.Type schemaType) {
-		return new AvroType<T>(clazz, Schema.create(schemaType));
-	}
-
-	public static final AvroType<Void> nulls() {
-		return nulls;
-	}
-
-	public static final AvroType<String> strings() {
-		return strings;
-	}
-
-	public static final AvroType<Long> longs() {
-		return longs;
-	}
-
-	public static final AvroType<Integer> ints() {
-		return ints;
-	}
-
-	public static final AvroType<Float> floats() {
-		return floats;
-	}
-
-	public static final AvroType<Double> doubles() {
-		return doubles;
-	}
-
-	public static final AvroType<Boolean> booleans() {
-		return booleans;
-	}
-
-	public static final AvroType<ByteBuffer> bytes() {
-		return bytes;
-	}
-
-	public static final <T> AvroType<T> records(Class<T> clazz) {
-		if (EXTENSIONS.containsKey(clazz)) {
-			return (AvroType<T>) EXTENSIONS.get(clazz);
-		}
-		return containers(clazz);
-	}
-
-	public static final AvroType<GenericData.Record> generics(Schema schema) {
-		return new AvroType<GenericData.Record>(GenericData.Record.class,
-				schema);
-	}
-
-	public static final <T> AvroType<T> containers(Class<T> clazz) {
-		return reflects(clazz);
-	}
-
-	public static final <T> AvroType<T> reflects(Class<T> clazz) {
-		return new AvroType<T>(clazz, REFLECT_DATA_FACTORY.getReflectData()
-				.getSchema(clazz));
-	}
-
-	private static class BytesToWritableMapFn<T extends Writable> extends
-			MapFn<ByteBuffer, T> {
-		private static final Log LOG = LogFactory
-				.getLog(BytesToWritableMapFn.class);
-
-		private final Class<T> writableClazz;
-
-		public BytesToWritableMapFn(Class<T> writableClazz) {
-			this.writableClazz = writableClazz;
-		}
-
-		@Override
-		public T map(ByteBuffer input) {
-			T instance = ReflectionUtils.newInstance(writableClazz,
-					getConfiguration());
-			try {
-				instance.readFields(new DataInputStream(
-						new ByteArrayInputStream(input.array(), input
-								.arrayOffset(), input.limit())));
-			} catch (IOException e) {
-				LOG.error("Exception thrown reading instance of: "
-						+ writableClazz, e);
-			}
-			return instance;
-		}
-	}
-
-	private static class WritableToBytesMapFn<T extends Writable> extends
-			MapFn<T, ByteBuffer> {
-		private static final Log LOG = LogFactory
-				.getLog(WritableToBytesMapFn.class);
-
-		@Override
-		public ByteBuffer map(T input) {
-			ByteArrayOutputStream baos = new ByteArrayOutputStream();
-			DataOutputStream das = new DataOutputStream(baos);
-			try {
-				input.write(das);
-			} catch (IOException e) {
-				LOG.error("Exception thrown converting Writable to bytes", e);
-			}
-			return ByteBuffer.wrap(baos.toByteArray());
-		}
-	}
-
-	public static final <T extends Writable> AvroType<T> writables(
-			Class<T> clazz) {
-		return new AvroType<T>(clazz, Schema.create(Schema.Type.BYTES),
-				new BytesToWritableMapFn<T>(clazz),
-				new WritableToBytesMapFn<T>());
-	}
-
-	private static class GenericDataArrayToCollection<T> extends
-			MapFn<Object, Collection<T>> {
-
-		private final MapFn<Object, T> mapFn;
-
-		public GenericDataArrayToCollection(MapFn<Object, T> mapFn) {
-			this.mapFn = mapFn;
-		}
-
-		@Override
-		public void configure(Configuration conf) {
-			mapFn.configure(conf);
-		}
-
-		@Override
-		public void setConfigurationForTest(Configuration conf) {
-			mapFn.setConfigurationForTest(conf);
-		}
-
-		@Override
-		public void initialize() {
-			this.mapFn.setContext(getContext());
-		}
-
-		@Override
-		public Collection<T> map(Object input) {
-			Collection<T> ret = Lists.newArrayList();
-			if (input instanceof Collection) {
-				for (Object in : (Collection<Object>) input) {
-					ret.add(mapFn.map(in));
-				}
-			} else {
-				// Assume it is an array
-				Object[] arr = (Object[]) input;
-				for (Object in : arr) {
-					ret.add(mapFn.map(in));
-				}
-			}
-			return ret;
-		}
-	}
-
-	private static class CollectionToGenericDataArray extends
-			MapFn<Collection<?>, GenericData.Array<?>> {
-
-		private final MapFn mapFn;
-		private final String jsonSchema;
-		private transient Schema schema;
-
-		public CollectionToGenericDataArray(Schema schema, MapFn mapFn) {
-			this.mapFn = mapFn;
-			this.jsonSchema = schema.toString();
-		}
-
-		@Override
-		public void configure(Configuration conf) {
-			mapFn.configure(conf);
-		}
-
-		@Override
-		public void setConfigurationForTest(Configuration conf) {
-			mapFn.setConfigurationForTest(conf);
-		}
-
-		@Override
-		public void initialize() {
-			this.mapFn.setContext(getContext());
-		}
-
-		@Override
-		public GenericData.Array<?> map(Collection<?> input) {
-			if (schema == null) {
-				schema = new Schema.Parser().parse(jsonSchema);
-			}
-			GenericData.Array array = new GenericData.Array(input.size(),
-					schema);
-			for (Object in : input) {
-				array.add(mapFn.map(in));
-			}
-			return array;
-		}
-	}
-
-	public static final <T> AvroType<Collection<T>> collections(PType<T> ptype) {
-		AvroType<T> avroType = (AvroType<T>) ptype;
-		Schema collectionSchema = Schema.createArray(allowNulls(avroType
-				.getSchema()));
-		GenericDataArrayToCollection<T> input = new GenericDataArrayToCollection<T>(
-				avroType.getInputMapFn());
-		CollectionToGenericDataArray output = new CollectionToGenericDataArray(
-				collectionSchema, avroType.getOutputMapFn());
-		return new AvroType(Collection.class, collectionSchema, input, output,
-				ptype);
-	}
-
-	private static class AvroMapToMap<T> extends
-			MapFn<Map<CharSequence, Object>, Map<String, T>> {
-		private final MapFn<Object, T> mapFn;
-
-		public AvroMapToMap(MapFn<Object, T> mapFn) {
-			this.mapFn = mapFn;
-		}
-
-		@Override
-		public void configure(Configuration conf) {
-			mapFn.configure(conf);
-		}
-
-		@Override
-		public void setConfigurationForTest(Configuration conf) {
-			mapFn.setConfigurationForTest(conf);
-		}
-
-		@Override
-		public void initialize() {
-			this.mapFn.setContext(getContext());
-		}
-
-		@Override
-		public Map<String, T> map(Map<CharSequence, Object> input) {
-			Map<String, T> out = Maps.newHashMap();
-			for (Map.Entry<CharSequence, Object> e : input.entrySet()) {
-				out.put(e.getKey().toString(), mapFn.map(e.getValue()));
-			}
-			return out;
-		}
-	}
-
-	private static class MapToAvroMap<T> extends
-			MapFn<Map<String, T>, Map<Utf8, Object>> {
-		private final MapFn<T, Object> mapFn;
-
-		public MapToAvroMap(MapFn<T, Object> mapFn) {
-			this.mapFn = mapFn;
-		}
-
-		@Override
-		public void configure(Configuration conf) {
-			mapFn.configure(conf);
-		}
-
-		@Override
-		public void setConfigurationForTest(Configuration conf) {
-			mapFn.setConfigurationForTest(conf);
-		}
-
-		@Override
-		public void initialize() {
-			this.mapFn.setContext(getContext());
-		}
-
-		@Override
-		public Map<Utf8, Object> map(Map<String, T> input) {
-			Map<Utf8, Object> out = Maps.newHashMap();
-			for (Map.Entry<String, T> e : input.entrySet()) {
-				out.put(new Utf8(e.getKey()), mapFn.map(e.getValue()));
-			}
-			return out;
-		}
-	}
-
-	public static final <T> AvroType<Map<String, T>> maps(PType<T> ptype) {
-		AvroType<T> avroType = (AvroType<T>) ptype;
-		Schema mapSchema = Schema.createMap(allowNulls(avroType.getSchema()));
-		AvroMapToMap<T> inputFn = new AvroMapToMap<T>(avroType.getInputMapFn());
-		MapToAvroMap<T> outputFn = new MapToAvroMap<T>(
-				avroType.getOutputMapFn());
-		return new AvroType(Map.class, mapSchema, inputFn, outputFn, ptype);
-	}
-
-	private static class GenericRecordToTuple extends
-			MapFn<GenericRecord, Tuple> {
-		private final TupleFactory<?> tupleFactory;
-		private final List<MapFn> fns;
-
-		private transient Object[] values;
-
-		public GenericRecordToTuple(TupleFactory<?> tupleFactory,
-				PType<?>... ptypes) {
-			this.tupleFactory = tupleFactory;
-			this.fns = Lists.newArrayList();
-			for (PType<?> ptype : ptypes) {
-				AvroType atype = (AvroType) ptype;
-				fns.add(atype.getInputMapFn());
-			}
-		}
-
-		@Override
-		public void configure(Configuration conf) {
-			for (MapFn fn : fns) {
-				fn.configure(conf);
-			}
-		}
-
-		@Override
-		public void setConfigurationForTest(Configuration conf) {
-			for (MapFn fn : fns) {
-				fn.setConfigurationForTest(conf);
-			}
-		}
-
-		@Override
-		public void initialize() {
-			for (MapFn fn : fns) {
-				fn.setContext(getContext());
-			}
-			this.values = new Object[fns.size()];
-			tupleFactory.initialize();
-		}
-
-		@Override
-		public Tuple map(GenericRecord input) {
-			for (int i = 0; i < values.length; i++) {
-				Object v = input.get(i);
-				if (v == null) {
-					values[i] = null;
-				} else {
-					values[i] = fns.get(i).map(v);
-				}
-			}
-			return tupleFactory.makeTuple(values);
-		}
-	}
-
-	private static class TupleToGenericRecord extends
-			MapFn<Tuple, GenericRecord> {
-		private final List<MapFn> fns;
-		private final String jsonSchema;
-
-		private transient GenericRecord record;
-
-		public TupleToGenericRecord(Schema schema, PType<?>... ptypes) {
-			this.fns = Lists.newArrayList();
-			this.jsonSchema = schema.toString();
-			for (PType ptype : ptypes) {
-				AvroType atype = (AvroType) ptype;
-				fns.add(atype.getOutputMapFn());
-			}
-		}
-
-		@Override
-		public void configure(Configuration conf) {
-			for (MapFn fn : fns) {
-				fn.configure(conf);
-			}
-		}
-
-		@Override
-		public void setConfigurationForTest(Configuration conf) {
-			for (MapFn fn : fns) {
-				fn.setConfigurationForTest(conf);
-			}
-		}
-
-		@Override
-		public void initialize() {
-			this.record = new GenericData.Record(
-					new Schema.Parser().parse(jsonSchema));
-			for (MapFn fn : fns) {
-				fn.setContext(getContext());
-			}
-		}
-
-		@Override
-		public GenericRecord map(Tuple input) {
-			for (int i = 0; i < input.size(); i++) {
-				Object v = input.get(i);
-				if (v == null) {
-					record.put(i, null);
-				} else {
-					record.put(i, fns.get(i).map(v));
-				}
-			}
-			return record;
-		}
-	}
-
-	public static final <V1, V2> AvroType<Pair<V1, V2>> pairs(PType<V1> p1,
-			PType<V2> p2) {
-		Schema schema = createTupleSchema(p1, p2);
-		GenericRecordToTuple input = new GenericRecordToTuple(
-				TupleFactory.PAIR, p1, p2);
-		TupleToGenericRecord output = new TupleToGenericRecord(schema, p1, p2);
-		return new AvroType(Pair.class, schema, input, output, p1, p2);
-	}
-
-	public static final <V1, V2, V3> AvroType<Tuple3<V1, V2, V3>> triples(
-			PType<V1> p1, PType<V2> p2, PType<V3> p3) {
-		Schema schema = createTupleSchema(p1, p2, p3);
-		return new AvroType(Tuple3.class, schema, new GenericRecordToTuple(
-				TupleFactory.TUPLE3, p1, p2, p3), new TupleToGenericRecord(
-				schema, p1, p2, p3), p1, p2, p3);
-	}
-
-	public static final <V1, V2, V3, V4> AvroType<Tuple4<V1, V2, V3, V4>> quads(
-			PType<V1> p1, PType<V2> p2, PType<V3> p3, PType<V4> p4) {
-		Schema schema = createTupleSchema(p1, p2, p3, p4);
-		return new AvroType(Tuple4.class, schema, new GenericRecordToTuple(
-				TupleFactory.TUPLE4, p1, p2, p3, p4), new TupleToGenericRecord(
-				schema, p1, p2, p3, p4), p1, p2, p3, p4);
-	}
-
-	public static final AvroType<TupleN> tuples(PType... ptypes) {
-		Schema schema = createTupleSchema(ptypes);
-		return new AvroType(TupleN.class, schema, new GenericRecordToTuple(
-				TupleFactory.TUPLEN, ptypes), new TupleToGenericRecord(schema,
-				ptypes), ptypes);
-	}
-
-	public static <T extends Tuple> AvroType<T> tuples(Class<T> clazz,
-			PType... ptypes) {
-		Schema schema = createTupleSchema(ptypes);
-		Class[] typeArgs = new Class[ptypes.length];
-		for (int i = 0; i < typeArgs.length; i++) {
-			typeArgs[i] = ptypes[i].getTypeClass();
-		}
-		TupleFactory<T> factory = TupleFactory.create(clazz, typeArgs);
-		return new AvroType<T>(clazz, schema, new GenericRecordToTuple(factory,
-				ptypes), new TupleToGenericRecord(schema, ptypes), ptypes);
-	}
-
-	private static Schema createTupleSchema(PType<?>... ptypes) {
-		// Guarantee each tuple schema has a globally unique name
-		String tupleName = "tuple"
-				+ UUID.randomUUID().toString().replace('-', 'x');
-		Schema schema = Schema.createRecord(tupleName, "", "crunch", false);
-		List<Schema.Field> fields = Lists.newArrayList();
-		for (int i = 0; i < ptypes.length; i++) {
-			AvroType atype = (AvroType) ptypes[i];
-			Schema fieldSchema = allowNulls(atype.getSchema());
-			fields.add(new Schema.Field("v" + i, fieldSchema, "", null));
-		}
-		schema.setFields(fields);
-		return schema;
-	}
-
-	public static final <S, T> AvroType<T> derived(Class<T> clazz,
-			MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base) {
-		AvroType<S> abase = (AvroType<S>) base;
-		return new AvroType<T>(clazz, abase.getSchema(), new CompositeMapFn(
-				abase.getInputMapFn(), inputFn), new CompositeMapFn(outputFn,
-				abase.getOutputMapFn()), base.getSubTypes().toArray(
-				new PType[0]));
-	}
-
-	public static <T> PType<T> jsons(Class<T> clazz) {
-		return PTypes.jsonString(clazz, AvroTypeFamily.getInstance());
-	}
-
-	public static final <K, V> AvroTableType<K, V> tableOf(PType<K> key,
-			PType<V> value) {
-	  if (key instanceof PTableType) {
-	    PTableType ptt = (PTableType) key;
-	    key = Avros.pairs(ptt.getKeyType(), ptt.getValueType());
-	  }
-	  if (value instanceof PTableType) {
-	    PTableType ptt = (PTableType) value;
-	    value = Avros.pairs(ptt.getKeyType(), ptt.getValueType());
-	  }
-		AvroType<K> avroKey = (AvroType<K>) key;
-		AvroType<V> avroValue = (AvroType<V>) value;
-		return new AvroTableType(avroKey, avroValue, Pair.class);
-	}
-
-	private static final Schema NULL_SCHEMA = Schema.create(Type.NULL);
-
-	private static Schema allowNulls(Schema base) {
-		if (NULL_SCHEMA.equals(base)) {
-			return base;
-		}
-		return Schema.createUnion(ImmutableList.of(base, NULL_SCHEMA));
-	}
-
-	private Avros() {
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java b/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
deleted file mode 100644
index 7c952ec..0000000
--- a/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import org.apache.avro.Schema;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-
-/**
- * A Factory class for constructing Avro reflection-related objects.
- */
-public class ReflectDataFactory {
-
-  public ReflectData getReflectData() { return ReflectData.AllowNull.get(); }
-  
-  public <T> ReflectDatumReader<T> getReader(Schema schema) {
-    return new ReflectDatumReader<T>(schema);
-  }
-  
-  public <T> ReflectDatumWriter<T> getWriter() {
-    return new ReflectDatumWriter<T>();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java b/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
deleted file mode 100644
index b5f27fe..0000000
--- a/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.avro;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.avro.Schema;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.mapred.AvroJob;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.avro.mapred.Pair;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.Serialization;
-import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.util.ReflectionUtils;
-
-/** The {@link Serialization} used by jobs configured with {@link AvroJob}. */
-public class SafeAvroSerialization<T> extends Configured 
-  implements Serialization<AvroWrapper<T>> {
-
-  public boolean accept(Class<?> c) {
-    return AvroWrapper.class.isAssignableFrom(c);
-  }
-  
-  /** Returns the specified map output deserializer.  Defaults to the final
-   * output deserializer if no map output schema was specified. */
-  public Deserializer<AvroWrapper<T>> getDeserializer(Class<AvroWrapper<T>> c) {
-    boolean isKey = AvroKey.class.isAssignableFrom(c);
-    Configuration conf = getConf();
-    Schema schema = isKey 
-        ? Pair.getKeySchema(AvroJob.getMapOutputSchema(conf))
-            : Pair.getValueSchema(AvroJob.getMapOutputSchema(conf));
-
-    DatumReader<T> datumReader = null;
-    if (conf.getBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, false)) {        
-        ReflectDataFactory factory = (ReflectDataFactory) ReflectionUtils.newInstance(
-            conf.getClass("crunch.reflectdatafactory", ReflectDataFactory.class), conf);
-        datumReader = factory.getReader(schema);
-    } else {
-        datumReader = new SpecificDatumReader<T>(schema);
-    }
-    return new AvroWrapperDeserializer(datumReader, isKey);
-  }
-  
-  private static final DecoderFactory FACTORY = DecoderFactory.get();
-
-  private class AvroWrapperDeserializer
-    implements Deserializer<AvroWrapper<T>> {
-
-    private DatumReader<T> reader;
-    private BinaryDecoder decoder;
-    private boolean isKey;
-    
-    public AvroWrapperDeserializer(DatumReader<T> reader, boolean isKey) {
-      this.reader = reader;
-      this.isKey = isKey;
-    }
-    
-    public void open(InputStream in) {
-      this.decoder = FACTORY.directBinaryDecoder(in, decoder);
-    }
-    
-    public AvroWrapper<T> deserialize(AvroWrapper<T> wrapper)
-      throws IOException {
-      T datum = reader.read(wrapper == null ? null : wrapper.datum(), decoder);
-      if (wrapper == null) {
-        wrapper = isKey? new AvroKey<T>(datum) : new AvroValue<T>(datum);
-      } else {
-        wrapper.datum(datum);
-      }
-      return wrapper;
-    }
-
-    public void close() throws IOException {
-      decoder.inputStream().close();
-    }
-  }
-  
-  /** Returns the specified output serializer. */
-  public Serializer<AvroWrapper<T>> getSerializer(Class<AvroWrapper<T>> c) {
-    // AvroWrapper used for final output, AvroKey or AvroValue for map output
-    boolean isFinalOutput = c.equals(AvroWrapper.class);
-    Configuration conf = getConf();
-    Schema schema = isFinalOutput ? AvroJob.getOutputSchema(conf)
-        : (AvroKey.class.isAssignableFrom(c)
-            ? Pair.getKeySchema(AvroJob.getMapOutputSchema(conf))
-                : Pair.getValueSchema(AvroJob.getMapOutputSchema(conf)));
-
-    ReflectDataFactory factory = Avros.getReflectDataFactory(conf);
-    ReflectDatumWriter<T> writer = factory.getWriter();
-    writer.setSchema(schema);
-    return new AvroWrapperSerializer(writer);
-  }
-
-  private class AvroWrapperSerializer implements Serializer<AvroWrapper<T>> {
-    private DatumWriter<T> writer;
-    private OutputStream out;
-    private BinaryEncoder encoder;
-    
-    public AvroWrapperSerializer(DatumWriter<T> writer) {
-      this.writer = writer;
-    }
-
-    public void open(OutputStream out) {
-      this.out = out;
-      this.encoder = new EncoderFactory().configureBlockSize(512)
-          .binaryEncoder(out, null);
-    }
-
-    public void serialize(AvroWrapper<T> wrapper) throws IOException {
-      writer.write(wrapper.datum(), encoder);
-      // would be a lot faster if the Serializer interface had a flush()
-      // method and the Hadoop framework called it when needed rather
-      // than for every record.
-      encoder.flush();
-    }
-
-    public void close() throws IOException {
-      out.close();
-    }
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java b/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
deleted file mode 100644
index 5c4d83f..0000000
--- a/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Arrays;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.io.WritableUtils;
-
-import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
-
-/**
- * A {@link Writable} array whose non-null elements are all instances of a
- * single Writable class. Null elements are allowed.
- *
- * <p>Wire format: {@code <length:vint><nullCount:vint>}; when at least one
- * element is non-null this is followed by {@code <className:Text>} and the
- * element data. Only when some (but not all) elements are null is each
- * element preceded by a {@code boolean} presence flag, so arrays with no
- * nulls, or with only nulls, keep their original encoding.
- */
-public class GenericArrayWritable<T> implements Writable {
-  private Writable[] values;
-  private Class<? extends Writable> valueClass;
-
-  /** Creates an array whose elements are expected to be {@code valueClass}. */
-  public GenericArrayWritable(Class<? extends Writable> valueClass) {
-    this.valueClass = valueClass;
-  }
-
-  /** Creates an array whose value class will be taken from the stream. */
-  public GenericArrayWritable() {
-    // for deserialization
-  }
-
-  public void set(Writable[] values) {
-    this.values = values;
-  }
-
-  public Writable[] get() {
-    return values;
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    int length = WritableUtils.readVInt(in);
-    values = new Writable[length];
-    // write() always emits the null count, even for an empty array, so it
-    // must always be consumed here to keep the stream aligned.
-    int nulls = WritableUtils.readVInt(in);
-    if (nulls == length) {
-      return; // empty or all-null: no class name or element data follows
-    }
-    setValueType(Text.readString(in));
-    boolean hasPresenceFlags = nulls > 0;
-    for (int i = 0; i < length; i++) {
-      if (hasPresenceFlags && !in.readBoolean()) {
-        continue; // element i was null when written
-      }
-      Writable value = WritableFactories.newInstance(valueClass);
-      value.readFields(in);
-      values[i] = value;
-    }
-  }
-
-  /**
-   * Records the element class named in the stream, or checks it against the
-   * class already set.
-   *
-   * @throws CrunchRuntimeException if the named class cannot be loaded
-   * @throws IllegalStateException if it differs from the configured class
-   */
-  protected void setValueType(String valueType) {
-    if (valueClass == null) {
-      try {
-        valueClass = Class.forName(valueType).asSubclass(Writable.class);
-      } catch (ClassNotFoundException e) {
-        throw new CrunchRuntimeException(e);
-      }
-    } else if (!valueType.equals(valueClass.getName())) {
-      throw new IllegalStateException("Incoming " + valueType + " is not " + valueClass);
-    }
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    WritableUtils.writeVInt(out, values.length);
-    int nulls = 0;
-    for (Writable value : values) {
-      if (value == null) {
-        nulls++;
-      }
-    }
-    WritableUtils.writeVInt(out, nulls);
-    if (nulls == values.length) {
-      return; // empty or all-null: nothing more to write
-    }
-    if (valueClass == null) {
-      throw new IllegalStateException("Value class not set by constructor or read");
-    }
-    Text.writeString(out, valueClass.getName());
-    // Positions of nulls must be recoverable, so flag each element whenever
-    // the array mixes null and non-null values.
-    boolean writePresenceFlags = nulls > 0;
-    for (Writable value : values) {
-      if (writePresenceFlags) {
-        out.writeBoolean(value != null);
-      }
-      if (value != null) {
-        value.write(out);
-      }
-    }
-  }
-
-  @Override
-  public int hashCode() {
-    HashCodeBuilder hcb = new HashCodeBuilder();
-    return hcb.append(values).toHashCode();
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (obj == null || getClass() != obj.getClass()) {
-      return false;
-    }
-    GenericArrayWritable<?> other = (GenericArrayWritable<?>) obj;
-    return Arrays.equals(values, other.values);
-  }
-
-  @Override
-  public String toString() {
-    return Arrays.toString(values);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java b/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java
deleted file mode 100644
index 303fc41..0000000
--- a/src/main/java/org/apache/crunch/types/writable/TextMapWritable.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
-import com.google.common.collect.Maps;
-
-/**
- * A {@link Writable} map from {@link Text} keys to values of one Writable
- * class {@code T}.
- *
- * <p>Wire format: {@code <valueClassName:Text><entryCount:vint>} followed by
- * each key and value. On read, every value is instantiated as the class
- * named in the header.
- */
-public class TextMapWritable<T extends Writable> implements Writable {
-
-  private Class<T> valueClazz;
-  private final Map<Text, T> instance;
-
-  /** Creates an empty map whose value class will be taken from the stream. */
-  public TextMapWritable() {
-    this.instance = Maps.newHashMap();
-  }
-
-  /** Creates an empty map whose values are of {@code valueClazz}. */
-  public TextMapWritable(Class<T> valueClazz) {
-    this.valueClazz = valueClazz;
-    this.instance = Maps.newHashMap();
-  }
-
-  public void put(Text txt, T value) {
-    instance.put(txt, value);
-  }
-
-  /** Returns the entry set of the backing map (a live view). */
-  public Set<Map.Entry<Text, T>> entrySet() {
-    return instance.entrySet();
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    instance.clear();
-    String className = Text.readString(in);
-    Class<? extends Writable> writableClass;
-    try {
-      // asSubclass rejects a non-Writable class up front instead of failing
-      // later with a ClassCastException on the first value.
-      writableClass = Class.forName(className).asSubclass(Writable.class);
-    } catch (ClassNotFoundException e) {
-      throw new IOException("Failed map init", e);
-    } catch (ClassCastException e) {
-      throw new IOException("Map value class " + className + " is not a Writable", e);
-    }
-    @SuppressWarnings("unchecked") // the stream is trusted to name a T
-    Class<T> clazz = (Class<T>) writableClass;
-    this.valueClazz = clazz;
-
-    int entries = WritableUtils.readVInt(in);
-    try {
-      for (int i = 0; i < entries; i++) {
-        Text txt = new Text();
-        txt.readFields(in);
-        T value = valueClazz.newInstance();
-        value.readFields(in);
-        instance.put(txt, value);
-      }
-    } catch (IllegalAccessException e) {
-      throw new IOException("Failed map init", e);
-    } catch (InstantiationException e) {
-      throw new IOException("Failed map init", e);
-    }
-  }
-
-  /**
-   * @throws IllegalStateException if no value class was given to the
-   *     constructor or read from a stream
-   */
-  @Override
-  public void write(DataOutput out) throws IOException {
-    if (valueClazz == null) {
-      throw new IllegalStateException("Value class not set by constructor or read");
-    }
-    Text.writeString(out, valueClazz.getName());
-    WritableUtils.writeVInt(out, instance.size());
-    for (Map.Entry<Text, T> e : instance.entrySet()) {
-      e.getKey().write(out);
-      e.getValue().write(out);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/writable/TupleWritable.java b/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
deleted file mode 100644
index ee4e80b..0000000
--- a/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableUtils;
-
-/**
- * A straight copy of the TupleWritable implementation in the join package,
- * added here because of its package visibility restrictions.
- *
- * <p>Which positions hold a value is tracked in a {@code long} bit mask, so
- * at most 64 positions can be tracked. Positions whose bit is clear are not
- * serialized and are ignored by {@link #equals}, {@link #hashCode} and
- * {@link #compareTo}.
- */
-public class TupleWritable implements WritableComparable<TupleWritable> {
-
-  private long written;
-  private Writable[] values;
-
-  /**
-   * Create an empty tuple with no allocated storage for writables.
-   */
-  public TupleWritable() {
-  }
-
-  /**
-   * Initialize tuple with storage; unknown whether any of them contain
-   * &quot;written&quot; values.
-   */
-  public TupleWritable(Writable[] vals) {
-    written = 0L;
-    values = vals;
-  }
-
-  /**
-   * Return true if tuple has an element at the position provided.
-   */
-  public boolean has(int i) {
-    // 1L, not 1: an int shift wraps at 32 and sign-extends at 31, which
-    // would make positions alias each other in the long mask.
-    return 0 != ((1L << i) & written);
-  }
-
-  /**
-   * Get ith Writable from Tuple.
-   */
-  public Writable get(int i) {
-    return values[i];
-  }
-
-  /**
-   * The number of children in this Tuple.
-   */
-  public int size() {
-    return values.length;
-  }
-
-  /**
-   * Two tuples are equal when they have the same size, the same written
-   * positions, and equal elements at every written position.
-   */
-  @Override
-  public boolean equals(Object other) {
-    if (!(other instanceof TupleWritable)) {
-      return false;
-    }
-    TupleWritable that = (TupleWritable) other;
-    if (this.size() != that.size() || this.written != that.written) {
-      return false;
-    }
-    for (int i = 0; i < values.length; ++i) {
-      if (!has(i)) {
-        continue;
-      }
-      Writable mine = values[i];
-      Writable theirs = that.get(i);
-      if (mine == null ? theirs != null : !mine.equals(theirs)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Hashes the written mask and the written elements only, so that tuples
-   * considered equal by {@link #equals} always hash alike.
-   */
-  @Override
-  public int hashCode() {
-    HashCodeBuilder builder = new HashCodeBuilder();
-    builder.append(written);
-    for (int i = 0; i < values.length; ++i) {
-      if (has(i)) {
-        builder.append(values[i]);
-      }
-    }
-    return builder.toHashCode();
-  }
-
-  /**
-   * Convert Tuple to String as in the following.
-   * <tt>[<child1>,<child2>,...,<childn>]</tt>
-   */
-  @Override
-  public String toString() {
-    StringBuilder buf = new StringBuilder("[");
-    for (int i = 0; i < values.length; ++i) {
-      buf.append(has(i) ? String.valueOf(values[i]) : "");
-      buf.append(",");
-    }
-    if (values.length != 0) {
-      buf.setCharAt(buf.length() - 1, ']');
-    } else {
-      buf.append(']');
-    }
-    return buf.toString();
-  }
-
-  /**
-   * Writes each written Writable to <code>out</code>. TupleWritable format:
-   * {@code
-   *  <count><writtenMask><type_i...><obj_i...>
-   * }
-   * where only positions whose bit is set in the mask contribute a type name
-   * and an object.
-   */
-  public void write(DataOutput out) throws IOException {
-    WritableUtils.writeVInt(out, values.length);
-    WritableUtils.writeVLong(out, written);
-    for (int i = 0; i < values.length; ++i) {
-      if (has(i)) {
-        Text.writeString(out, values[i].getClass().getName());
-      }
-    }
-    for (int i = 0; i < values.length; ++i) {
-      if (has(i)) {
-        values[i].write(out);
-      }
-    }
-  }
-
-  /**
-   * Reads the format produced by {@link #write}, instantiating each written
-   * element from its recorded class name.
-   */
-  @SuppressWarnings("unchecked")
-  // No static typeinfo on Tuples
-  public void readFields(DataInput in) throws IOException {
-    int card = WritableUtils.readVInt(in);
-    values = new Writable[card];
-    written = WritableUtils.readVLong(in);
-    Class<? extends Writable>[] cls = new Class[card];
-    try {
-      for (int i = 0; i < card; ++i) {
-        if (has(i)) {
-          cls[i] = Class.forName(Text.readString(in))
-              .asSubclass(Writable.class);
-        }
-      }
-      for (int i = 0; i < card; ++i) {
-        if (has(i)) {
-          values[i] = cls[i].newInstance();
-          values[i].readFields(in);
-        }
-      }
-    } catch (ClassNotFoundException e) {
-      throw (IOException) new IOException("Failed tuple init").initCause(e);
-    } catch (IllegalAccessException e) {
-      throw (IOException) new IOException("Failed tuple init").initCause(e);
-    } catch (InstantiationException e) {
-      throw (IOException) new IOException("Failed tuple init").initCause(e);
-    }
-  }
-
-  /**
-   * Record that the tuple contains an element at the position provided.
-   */
-  public void setWritten(int i) {
-    written |= 1L << i;
-  }
-
-  /**
-   * Record that the tuple does not contain an element at the position provided.
-   */
-  public void clearWritten(int i) {
-    written &= ~(1L << i);
-  }
-
-  /**
-   * Clear any record of which writables have been written to, without releasing
-   * storage.
-   */
-  public void clearWritten() {
-    written = 0L;
-  }
-
-  /**
-   * Compares position by position over the common length: a written element
-   * sorts after an unwritten one, unwritten-on-both positions are skipped,
-   * and written elements are ordered by {@link #compareElements}. Ties are
-   * broken by tuple length.
-   */
-  @Override
-  public int compareTo(TupleWritable o) {
-    int common = Math.min(values.length, o.values.length);
-    for (int i = 0; i < common; ++i) {
-      boolean mine = has(i);
-      boolean theirs = o.has(i);
-      if (mine != theirs) {
-        return mine ? 1 : -1;
-      }
-      if (!mine) {
-        continue; // absent on both sides; equals() ignores this slot too
-      }
-      int cmp = compareElements(values[i], o.values[i]);
-      if (cmp != 0) {
-        return cmp;
-      }
-    }
-    return values.length - o.values.length;
-  }
-
-  /**
-   * Orders two elements: equal elements compare as 0, nulls sort first, then
-   * natural order for WritableComparables, otherwise hash-code order.
-   */
-  @SuppressWarnings("unchecked")
-  private static int compareElements(Writable v1, Writable v2) {
-    if (v1 == v2 || (v1 != null && v1.equals(v2))) {
-      return 0;
-    }
-    if (v1 == null) {
-      return -1;
-    }
-    if (v2 == null) {
-      return 1;
-    }
-    if (v1 instanceof WritableComparable && v2 instanceof WritableComparable) {
-      return ((WritableComparable) v1).compareTo((WritableComparable) v2);
-    }
-    // No natural order available. Compare hash codes without subtraction,
-    // which can overflow and invert the sign.
-    int h1 = v1.hashCode();
-    int h2 = v2.hashCode();
-    return h1 < h2 ? -1 : (h1 == h2 ? 0 : 1);
-  }
-}
\ No newline at end of file


Mime
View raw message