incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [16/28] Rename com.cloudera.crunch -> org.apache.crunch in the Java core
Date Sat, 07 Jul 2012 21:49:07 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/writable/WritableTableType.java b/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
new file mode 100644
index 0000000..9c781f1
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import java.util.List;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.fn.PairMapFn;
+import org.apache.crunch.io.seq.SeqFileTableSourceTarget;
+import org.apache.crunch.lib.PTables;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PGroupedTableType;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * The {@link Writable}-based implementation of the {@link PTableType}
+ * interface, describing tables of (key, value) pairs whose keys and values
+ * are each backed by a {@link WritableType}.
+ */
+class WritableTableType<K, V> implements PTableType<K, V> {
+
+  private final WritableType<K, Writable> keyType;
+  private final WritableType<V, Writable> valueType;
+  // Convert Pair<Writable, Writable> <-> Pair<K, V> by delegating to the
+  // component types' own input/output map functions.
+  private final MapFn inputFn;
+  private final MapFn outputFn;
+  private final Converter converter;
+
+  public WritableTableType(WritableType<K, Writable> keyType,
+      WritableType<V, Writable> valueType) {
+    this.keyType = keyType;
+    this.valueType = valueType;
+    this.inputFn = new PairMapFn(keyType.getInputMapFn(),
+        valueType.getInputMapFn());
+    this.outputFn = new PairMapFn(keyType.getOutputMapFn(),
+        valueType.getOutputMapFn());
+    this.converter = new WritablePairConverter(keyType.getSerializationClass(),
+        valueType.getSerializationClass());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Class<Pair<K, V>> getTypeClass() {
+    // There is no class literal for the parameterized Pair<K, V>, so the
+    // runtime class of an empty Pair instance is used instead.
+    return (Class<Pair<K, V>>) Pair.of(null, null).getClass();
+  }
+
+  @Override
+  public List<PType> getSubTypes() {
+    return ImmutableList.<PType>of(keyType, valueType);
+  }
+
+  @Override
+  public MapFn getInputMapFn() {
+    return inputFn;
+  }
+
+  @Override
+  public MapFn getOutputMapFn() {
+    return outputFn;
+  }
+
+  @Override
+  public Converter getConverter() {
+    return converter;
+  }
+
+  @Override
+  public PTypeFamily getFamily() {
+    return WritableTypeFamily.getInstance();
+  }
+
+  public PType<K> getKeyType() {
+    return keyType;
+  }
+
+  public PType<V> getValueType() {
+    return valueType;
+  }
+
+  @Override
+  public PGroupedTableType<K, V> getGroupedTableType() {
+    return new WritableGroupedTableType<K, V>(this);
+  }
+
+  @Override
+  public SourceTarget<Pair<K, V>> getDefaultFileSource(Path path) {
+    return new SeqFileTableSourceTarget<K, V>(path, this);
+  }
+
+  @Override
+  public Pair<K, V> getDetachedValue(Pair<K, V> value) {
+    return PTables.getDetachedValue(this, value);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    // instanceof is false for null, so a separate null check is redundant.
+    if (!(obj instanceof WritableTableType)) {
+      return false;
+    }
+    WritableTableType that = (WritableTableType) obj;
+    return keyType.equals(that.keyType) && valueType.equals(that.valueType);
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(keyType).append(valueType).toHashCode();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/writable/WritableType.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/writable/WritableType.java b/src/main/java/org/apache/crunch/types/writable/WritableType.java
new file mode 100644
index 0000000..b99050d
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/writable/WritableType.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import java.util.List;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.io.seq.SeqFileSourceTarget;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * The {@link Writable}-based implementation of {@link PType}: pairs a Java
+ * type {@code T} with the {@link Writable} subclass {@code W} used to
+ * serialize it, together with the map functions that convert between them.
+ */
+public class WritableType<T, W extends Writable> implements PType<T> {
+
+  private final Class<T> typeClass;
+  private final Class<W> writableClass;
+  private final Converter converter;
+  // inputFn deserializes W -> T; outputFn serializes T -> W.
+  private final MapFn<W, T> inputFn;
+  private final MapFn<T, W> outputFn;
+  private final List<PType> subTypes;
+
+  WritableType(Class<T> typeClass, Class<W> writableClass,
+      MapFn<W, T> inputDoFn, MapFn<T, W> outputDoFn, PType...subTypes) {
+    this.typeClass = typeClass;
+    this.writableClass = writableClass;
+    this.inputFn = inputDoFn;
+    this.outputFn = outputDoFn;
+    this.converter = new WritableValueConverter(writableClass);
+    this.subTypes = ImmutableList.<PType>builder().add(subTypes).build();
+  }
+
+  @Override
+  public PTypeFamily getFamily() {
+    return WritableTypeFamily.getInstance();
+  }
+
+  @Override
+  public Class<T> getTypeClass() {
+    return typeClass;
+  }
+
+  @Override
+  public Converter getConverter() {
+    return converter;
+  }
+
+  @Override
+  public MapFn getInputMapFn() {
+    return inputFn;
+  }
+
+  @Override
+  public MapFn getOutputMapFn() {
+    return outputFn;
+  }
+
+  @Override
+  public List<PType> getSubTypes() {
+    return subTypes;
+  }
+
+  public Class<W> getSerializationClass() {
+    return writableClass;
+  }
+
+  @Override
+  public SourceTarget<T> getDefaultFileSource(Path path) {
+    return new SeqFileSourceTarget<T>(path, this);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    // instanceof is false for null, so a separate null check is redundant.
+    if (!(obj instanceof WritableType)) {
+      return false;
+    }
+    WritableType wt = (WritableType) obj;
+    return (typeClass.equals(wt.typeClass) && writableClass.equals(wt.writableClass) &&
+        subTypes.equals(wt.subTypes));
+  }
+
+  // Unchecked warnings are suppressed because we know that W and T are the same
+  // type (due to the IdentityFn being used)
+  @SuppressWarnings("unchecked")
+  @Override
+  public T getDetachedValue(T value) {
+    if (this.inputFn.getClass().equals(IdentityFn.class)) {
+      // T is itself a Writable here; deep-copy so the caller's value is not
+      // affected by the framework's object reuse.
+      W writableValue = (W) value;
+      return (T) Writables.deepCopy(writableValue, this.writableClass);
+    } else {
+      // A non-identity inputFn constructs a fresh Java object, so the value
+      // is presumably already detached — confirm for custom map functions.
+      return value;
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(typeClass).append(writableClass)
+        .append(subTypes).toHashCode();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java b/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java
new file mode 100644
index 0000000..6ff33e9
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.types.PGroupedTableType;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.PTypeUtils;
+
+/**
+ * The {@link Writable}-based implementation of the {@link org.apache.crunch.types.PTypeFamily}
+ * interface.
+ */
+public class WritableTypeFamily implements PTypeFamily {
+
+  private static final WritableTypeFamily INSTANCE = new WritableTypeFamily();
+
+  public static WritableTypeFamily getInstance() {
+    return INSTANCE;
+  }
+
+  // Disallow construction
+  private WritableTypeFamily() {
+  }
+
+  public PType<Void> nulls() {
+    return Writables.nulls();
+  }
+
+  public PType<String> strings() {
+    return Writables.strings();
+  }
+
+  public PType<Long> longs() {
+    return Writables.longs();
+  }
+
+  public PType<Integer> ints() {
+    return Writables.ints();
+  }
+
+  public PType<Float> floats() {
+    return Writables.floats();
+  }
+
+  public PType<Double> doubles() {
+    return Writables.doubles();
+  }
+
+  public PType<Boolean> booleans() {
+    return Writables.booleans();
+  }
+  
+  public PType<ByteBuffer> bytes() {
+    return Writables.bytes();
+  }
+  
+  public <T> PType<T> records(Class<T> clazz) {
+    return Writables.records(clazz);
+  }
+
+  public <W extends Writable> PType<W> writables(Class<W> clazz) {
+    return Writables.writables(clazz);
+  }
+
+  public <K, V> PTableType<K, V> tableOf(PType<K> key, PType<V> value) {
+    return Writables.tableOf(key, value);
+  }
+
+  public <V1, V2> PType<Pair<V1, V2>> pairs(PType<V1> p1, PType<V2> p2) {
+    return Writables.pairs(p1, p2);
+  }
+
+  public <V1, V2, V3> PType<Tuple3<V1, V2, V3>> triples(PType<V1> p1,
+      PType<V2> p2, PType<V3> p3) {
+    return Writables.triples(p1, p2, p3);
+  }
+
+  public <V1, V2, V3, V4> PType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1,
+      PType<V2> p2, PType<V3> p3, PType<V4> p4) {
+    return Writables.quads(p1, p2, p3, p4);
+  }
+
+  public PType<TupleN> tuples(PType<?>... ptypes) {
+    return Writables.tuples(ptypes);
+  }
+
+  public <T> PType<Collection<T>> collections(PType<T> ptype) {
+    return Writables.collections(ptype);
+  }
+
+  public <T> PType<Map<String, T>> maps(PType<T> ptype) {
+	return Writables.maps(ptype);
+  }
+  
+  @Override
+  public <T> PType<T> as(PType<T> ptype) {
+    if (ptype instanceof WritableType || ptype instanceof WritableTableType ||
+        ptype instanceof WritableGroupedTableType) {
+      return ptype;
+    }
+    if (ptype instanceof PGroupedTableType) {
+      PTableType ptt = ((PGroupedTableType) ptype).getTableType();
+      return new WritableGroupedTableType((WritableTableType) as(ptt));
+    }
+    PType<T> prim = Writables.getPrimitiveType(ptype.getTypeClass());
+    if (prim != null) {
+      return prim;
+    }
+    return PTypeUtils.convert(ptype, this);
+  }
+
+  @Override
+  public <T extends Tuple> PType<T> tuples(Class<T> clazz, PType<?>... ptypes) {
+    return Writables.tuples(clazz, ptypes);
+  }
+
+  @Override
+  public <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn,
+      MapFn<T, S> outputFn, PType<S> base) {
+    return Writables.derived(clazz, inputFn, outputFn, base);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/writable/WritableValueConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/writable/WritableValueConverter.java b/src/main/java/org/apache/crunch/types/writable/WritableValueConverter.java
new file mode 100644
index 0000000..85cadc8
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/writable/WritableValueConverter.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import org.apache.hadoop.io.NullWritable;
+
+import org.apache.crunch.types.Converter;
+
+class WritableValueConverter<W> implements Converter<Object, W, W, Iterable<W>> {
+  
+  private final Class<W> serializationClass;
+  
+  public WritableValueConverter(Class<W> serializationClass) {
+    this.serializationClass = serializationClass;
+  }
+  
+  @Override
+  public W convertInput(Object key, W value) {
+    return value;
+  }
+
+  @Override
+  public Object outputKey(W value) {
+    return NullWritable.get();
+  }
+
+  @Override
+  public W outputValue(W value) {
+    return value;
+  }
+
+  @Override
+  public Class<Object> getKeyClass() {
+    return (Class<Object>) (Class<?>) NullWritable.class;
+  }
+
+  @Override
+  public Class<W> getValueClass() {
+    return serializationClass;
+  }
+
+  @Override
+  public Iterable<W> convertIterableInput(Object key, Iterable<W> value) {
+    return value;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/writable/Writables.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/writable/Writables.java b/src/main/java/org/apache/crunch/types/writable/Writables.java
new file mode 100644
index 0000000..08b6c64
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/writable/Writables.java
@@ -0,0 +1,634 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.fn.CompositeMapFn;
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.TupleFactory;
+import org.apache.crunch.util.PTypes;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Defines static methods that are analogous to the methods defined in
+ * {@link WritableTypeFamily} for convenient static importing.
+ * 
+ */
+public class Writables {
+  // Conversion functions between common Java types and the Writable
+  // implementations that serialize them. Each X_TO_Y / Y_TO_X pair becomes
+  // the input/output MapFn of the matching WritableType instance below.
+  private static final MapFn<NullWritable, Void> NULL_WRITABLE_TO_VOID = new MapFn<NullWritable, Void>() {
+    @Override
+    public Void map(NullWritable input) {
+      return null;
+    }
+  };
+
+  private static final MapFn<Void, NullWritable> VOID_TO_NULL_WRITABLE = new MapFn<Void, NullWritable>() {
+    @Override
+    public NullWritable map(Void input) {
+      return NullWritable.get();
+    }
+  };
+  
+  private static final MapFn<Text, String> TEXT_TO_STRING = new MapFn<Text, String>() {
+    @Override
+    public String map(Text input) {
+      return input.toString();
+    }
+  };
+
+  private static final MapFn<String, Text> STRING_TO_TEXT = new MapFn<String, Text>() {
+    @Override
+    public Text map(String input) {
+      return new Text(input);
+    }
+  };
+
+  private static final MapFn<IntWritable, Integer> IW_TO_INT = new MapFn<IntWritable, Integer>() {
+    @Override
+    public Integer map(IntWritable input) {
+      return input.get();
+    }
+  };
+
+  private static final MapFn<Integer, IntWritable> INT_TO_IW = new MapFn<Integer, IntWritable>() {
+    @Override
+    public IntWritable map(Integer input) {
+      return new IntWritable(input);
+    }
+  };
+
+  private static final MapFn<LongWritable, Long> LW_TO_LONG = new MapFn<LongWritable, Long>() {
+    @Override
+    public Long map(LongWritable input) {
+      return input.get();
+    }
+  };
+
+  private static final MapFn<Long, LongWritable> LONG_TO_LW = new MapFn<Long, LongWritable>() {
+    @Override
+    public LongWritable map(Long input) {
+      return new LongWritable(input);
+    }
+  };
+
+  private static final MapFn<FloatWritable, Float> FW_TO_FLOAT = new MapFn<FloatWritable, Float>() {
+    @Override
+    public Float map(FloatWritable input) {
+      return input.get();
+    }
+  };
+
+  private static final MapFn<Float, FloatWritable> FLOAT_TO_FW = new MapFn<Float, FloatWritable>() {
+    @Override
+    public FloatWritable map(Float input) {
+      return new FloatWritable(input);
+    }
+  };
+
+  private static final MapFn<DoubleWritable, Double> DW_TO_DOUBLE = new MapFn<DoubleWritable, Double>() {
+    @Override
+    public Double map(DoubleWritable input) {
+      return input.get();
+    }
+  };
+
+  private static final MapFn<Double, DoubleWritable> DOUBLE_TO_DW = new MapFn<Double, DoubleWritable>() {
+    @Override
+    public DoubleWritable map(Double input) {
+      return new DoubleWritable(input);
+    }
+  };
+
+  private static final MapFn<BooleanWritable, Boolean> BW_TO_BOOLEAN = new MapFn<BooleanWritable, Boolean>() {
+    @Override
+    public Boolean map(BooleanWritable input) {
+      return input.get();
+    }
+  };
+
+  // NOTE(review): cached shared instances — assumes downstream code never
+  // mutates the BooleanWritables it receives; confirm against consumers.
+  private static final BooleanWritable TRUE = new BooleanWritable(true);
+  private static final BooleanWritable FALSE = new BooleanWritable(false);
+  private static final MapFn<Boolean, BooleanWritable> BOOLEAN_TO_BW = new MapFn<Boolean, BooleanWritable>() {
+    @Override
+    public BooleanWritable map(Boolean input) {
+      return input == Boolean.TRUE ? TRUE : FALSE;
+    }
+  };
+  
+  private static final MapFn<BytesWritable, ByteBuffer> BW_TO_BB = new MapFn<BytesWritable, ByteBuffer>() {
+    @Override
+    public ByteBuffer map(BytesWritable input) {
+      // Wraps (does not copy) the backing array, bounded by the valid length.
+      return ByteBuffer.wrap(input.getBytes(), 0, input.getLength());
+    }
+  };
+  
+  private static final MapFn<ByteBuffer, BytesWritable> BB_TO_BW = new MapFn<ByteBuffer, BytesWritable>() {
+    @Override
+    public BytesWritable map(ByteBuffer input) {
+      BytesWritable bw = new BytesWritable();
+      // NOTE(review): uses limit() rather than remaining() — assumes the
+      // buffer's content starts at position 0; confirm for sliced buffers.
+      bw.set(input.array(), input.arrayOffset(), input.limit());
+      return bw;
+    }
+  };
+
+  // Factory that ties the generic parameters together for the static
+  // WritableType instances defined below.
+  private static <S, W extends Writable> WritableType<S, W> create(Class<S> typeClass,
+      Class<W> writableClass, MapFn<W, S> inputDoFn, MapFn<S, W> outputDoFn) {
+    return new WritableType<S, W>(typeClass, writableClass, inputDoFn,
+        outputDoFn);
+  }
+
+  // Singleton WritableType instances for the supported primitive types.
+  private static final WritableType<Void, NullWritable> nulls = create(Void.class, NullWritable.class,
+      NULL_WRITABLE_TO_VOID, VOID_TO_NULL_WRITABLE);
+  private static final WritableType<String, Text> strings = create(String.class, Text.class,
+      TEXT_TO_STRING, STRING_TO_TEXT);
+  private static final WritableType<Long, LongWritable> longs = create(Long.class, LongWritable.class,
+      LW_TO_LONG, LONG_TO_LW);
+  private static final WritableType<Integer, IntWritable> ints = create(Integer.class, IntWritable.class,
+      IW_TO_INT, INT_TO_IW);
+  private static final WritableType<Float, FloatWritable> floats = create(Float.class, FloatWritable.class,
+      FW_TO_FLOAT, FLOAT_TO_FW);
+  private static final WritableType<Double, DoubleWritable> doubles = create(Double.class,
+      DoubleWritable.class, DW_TO_DOUBLE, DOUBLE_TO_DW);
+  private static final WritableType<Boolean, BooleanWritable> booleans = create(Boolean.class,
+      BooleanWritable.class, BW_TO_BOOLEAN, BOOLEAN_TO_BW);
+  private static final WritableType<ByteBuffer, BytesWritable> bytes = create(ByteBuffer.class,
+      BytesWritable.class, BW_TO_BB, BB_TO_BW);
+
+  // Lookup table from a Java class to its primitive PType; consulted by
+  // getPrimitiveType(Class).
+  private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap.<Class<?>, PType<?>>builder()
+      .put(String.class, strings)
+      .put(Long.class, longs)
+      .put(Integer.class, ints)
+      .put(Float.class, floats)
+      .put(Double.class, doubles)
+      .put(Boolean.class, booleans)
+      .put(ByteBuffer.class, bytes)
+      .build();
+  
+  // User-registered custom Writable types; populated via register(Class, WritableType).
+  private static final Map<Class<?>, WritableType<?, ?>> EXTENSIONS = Maps.newHashMap();
+  
+  // Returns the predefined primitive PType for clazz, or null if clazz is
+  // not one of the supported primitive classes.
+  public static <T> PType<T> getPrimitiveType(Class<T> clazz) {
+    return (PType<T>) PRIMITIVES.get(clazz);
+  }
+  
+  // Registers a custom WritableType for clazz; records(clazz) will return it
+  // in preference to treating clazz as a raw Writable.
+  public static <T> void register(Class<T> clazz, WritableType<T, ? extends Writable> ptype) {
+    EXTENSIONS.put(clazz, ptype);
+  }
+  
+  // Accessors for the predefined primitive type singletons.
+  public static final WritableType<Void, NullWritable> nulls() {
+    return nulls;
+  }
+
+  public static final WritableType<String, Text> strings() {
+    return strings;
+  }
+
+  public static final WritableType<Long, LongWritable> longs() {
+    return longs;
+  }
+
+  public static final WritableType<Integer, IntWritable> ints() {
+    return ints;
+  }
+
+  public static final WritableType<Float, FloatWritable> floats() {
+    return floats;
+  }
+
+  public static final WritableType<Double, DoubleWritable> doubles() {
+    return doubles;
+  }
+
+  public static final WritableType<Boolean, BooleanWritable> booleans() {
+    return booleans;
+  }
+  
+  public static final WritableType<ByteBuffer, BytesWritable> bytes() {
+    return bytes;
+  }
+  
+  // Returns the PType for a record class: a registered extension if one
+  // exists, otherwise the class is assumed to implement Writable itself.
+  public static final <T, W extends Writable> WritableType<T, W> records(Class<T> clazz) {
+    if (EXTENSIONS.containsKey(clazz)) {
+      return (WritableType<T, W>) EXTENSIONS.get(clazz);
+    }
+    return (WritableType<T, W>) writables(clazz.asSubclass(Writable.class));
+  }
+
+  // A PType for a class that is serialized directly as itself (W == T),
+  // using identity map functions in both directions.
+  public static <W extends Writable> WritableType<W, W> writables(Class<W> clazz) {
+    MapFn wIdentity = IdentityFn.getInstance();
+    return new WritableType<W, W>(clazz, clazz, wIdentity, wIdentity);
+  }
+
+  // Builds a table (key/value) type. A table used as a key or value is first
+  // flattened into a pair type, since tables cannot nest directly.
+  public static <K, V> WritableTableType<K, V> tableOf(
+      PType<K> key, PType<V> value) {
+    if (key instanceof WritableTableType) {
+      WritableTableType wtt = (WritableTableType) key;
+      key = pairs(wtt.getKeyType(), wtt.getValueType());
+    } else if (!(key instanceof WritableType)) {
+      throw new IllegalArgumentException("Key type must be of class WritableType");
+    }
+    if (value instanceof WritableTableType) {
+      WritableTableType wtt = (WritableTableType) value;
+      value = pairs(wtt.getKeyType(), wtt.getValueType());
+    } else if (!(value instanceof WritableType)) {
+      throw new IllegalArgumentException("Value type must be of class WritableType");
+    }
+    return new WritableTableType((WritableType) key, (WritableType) value);
+  }
+
+  /**
+   * For mapping from {@link TupleWritable} instances to {@link Tuple}s.
+   * 
+   */
+  private static class TWTupleMapFn extends MapFn<TupleWritable, Tuple> {
+    private final TupleFactory<?> tupleFactory;
+    // Per-field input map functions, one per tuple position.
+    private final List<MapFn> fns;
+
+    // Scratch array reused across map() calls; allocated in initialize().
+    private transient Object[] values;
+
+    public TWTupleMapFn(TupleFactory<?> tupleFactory, PType<?>... ptypes) {
+      this.tupleFactory = tupleFactory;
+      this.fns = Lists.newArrayList();
+      for (PType ptype : ptypes) {
+        fns.add(ptype.getInputMapFn());
+      }
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      for (MapFn fn : fns) {
+        fn.configure(conf);
+      }
+    }
+
+    @Override
+    public void setConfigurationForTest(Configuration conf) {
+      for (MapFn fn : fns) {
+        fn.setConfigurationForTest(conf);
+      }
+    }
+    
+    @Override
+    public void initialize() {
+      for (MapFn fn : fns) {
+        fn.setContext(getContext());
+      }
+      // The rest of the methods allocate new
+      // objects each time. However this one
+      // uses Tuple.tuplify which does a copy
+      this.values = new Object[fns.size()];
+      tupleFactory.initialize();
+    }
+
+    @Override
+    public Tuple map(TupleWritable in) {
+      // Fields absent from the TupleWritable become nulls in the tuple.
+      for (int i = 0; i < values.length; i++) {
+        if (in.has(i)) {
+          values[i] = fns.get(i).map(in.get(i));
+        } else {
+          values[i] = null;
+        }
+      }
+      return tupleFactory.makeTuple(values);
+    }
+  }
+
+  /**
+   * For mapping from {@code Tuple}s to {@code TupleWritable}s.
+   * 
+   */
+  private static class TupleTWMapFn extends MapFn<Tuple, TupleWritable> {
+    
+    // Both the output TupleWritable and its backing array are reused across
+    // map() calls; callers must consume or copy the result before the next call.
+    private transient TupleWritable writable;
+    private transient Writable[] values;
+
+    // Per-field output map functions, one per tuple position.
+    private final List<MapFn> fns;
+
+    public TupleTWMapFn(PType<?>... ptypes) {
+      this.fns = Lists.newArrayList();
+      for (PType<?> ptype : ptypes) {
+        fns.add(ptype.getOutputMapFn());
+      }
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      for (MapFn fn : fns) {
+        fn.configure(conf);
+      }
+    }
+
+    @Override
+    public void setConfigurationForTest(Configuration conf) {
+      for (MapFn fn : fns) {
+        fn.setConfigurationForTest(conf);
+      }
+    }
+
+    @Override
+    public void initialize() {
+      this.values = new Writable[fns.size()];
+      this.writable = new TupleWritable(values);
+      for (MapFn fn : fns) {
+        fn.setContext(getContext());
+      }
+    }
+
+    @Override
+    public TupleWritable map(Tuple input) {
+      // Only non-null fields are marked written; null fields may leave a
+      // stale value in the reused array but are flagged as absent.
+      writable.clearWritten();
+      for (int i = 0; i < input.size(); i++) {
+        Object value = input.get(i);
+        if (value != null) {
+          writable.setWritten(i);
+          values[i] = (Writable) fns.get(i).map(value);
+        }
+      }
+      return writable;
+    }
+  }
+
+  /** A pair type backed by {@link TupleWritable} serialization. */
+  public static <V1, V2> WritableType<Pair<V1, V2>, TupleWritable> pairs(PType<V1> p1, PType<V2> p2) {
+    TWTupleMapFn input = new TWTupleMapFn(TupleFactory.PAIR, p1, p2);
+    TupleTWMapFn output = new TupleTWMapFn(p1, p2);
+    return new WritableType(Pair.class, TupleWritable.class, input, output, p1, p2);
+  }
+
+  /** A three-element tuple type backed by {@link TupleWritable} serialization. */
+  public static <V1, V2, V3> WritableType<Tuple3<V1, V2, V3>, TupleWritable> triples(PType<V1> p1,
+      PType<V2> p2, PType<V3> p3) {
+    TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLE3, p1, p2, p3);
+    TupleTWMapFn output = new TupleTWMapFn(p1, p2, p3);
+    return new WritableType(Tuple3.class, TupleWritable.class,
+        input,
+        output,
+        p1, p2, p3);
+  }
+
+  /** A four-element tuple type backed by {@link TupleWritable} serialization. */
+  public static <V1, V2, V3, V4> WritableType<Tuple4<V1, V2, V3, V4>, TupleWritable> quads(PType<V1> p1,
+      PType<V2> p2, PType<V3> p3, PType<V4> p4) {
+    TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLE4, p1, p2, p3, p4);
+    TupleTWMapFn output = new TupleTWMapFn(p1, p2, p3, p4);
+    return new WritableType(Tuple4.class, TupleWritable.class,
+        input,
+        output,
+        p1, p2, p3, p4);
+  }
+
+  /** An arbitrary-arity tuple type backed by {@link TupleWritable} serialization. */
+  public static WritableType<TupleN, TupleWritable> tuples(PType... ptypes) {
+    TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLEN, ptypes);
+    TupleTWMapFn output = new TupleTWMapFn(ptypes);
+    return new WritableType(TupleN.class, TupleWritable.class, input, output, ptypes);
+  }
+
+  /**
+   * A tuple type for a custom Tuple subclass, constructed reflectively from
+   * the component types' classes.
+   */
+  public static <T extends Tuple> PType<T> tuples(Class<T> clazz, PType... ptypes) {
+    Class[] typeArgs = new Class[ptypes.length];
+    for (int i = 0; i < typeArgs.length; i++) {
+      typeArgs[i] = ptypes[i].getTypeClass();
+    }
+    TupleFactory<T> factory = TupleFactory.create(clazz, typeArgs);
+    TWTupleMapFn input = new TWTupleMapFn(factory, ptypes);
+    TupleTWMapFn output = new TupleTWMapFn(ptypes);
+    return new WritableType(clazz, TupleWritable.class, input, output, ptypes);  
+  }
+  
+  /**
+   * Derives a PType for clazz from an existing base type by composing the
+   * caller's conversion functions with the base type's map functions.
+   */
+  public static <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn,
+      PType<S> base) {
+    WritableType<S, ?> wt = (WritableType<S, ?>) base;
+    MapFn input = new CompositeMapFn(wt.getInputMapFn(), inputFn);
+    MapFn output = new CompositeMapFn(outputFn, wt.getOutputMapFn());
+    return new WritableType(clazz, wt.getSerializationClass(), input, output, base.getSubTypes().toArray(new PType[0]));
+  }
+  
+  /**
+   * Maps a {@code GenericArrayWritable} to a {@code Collection<T>} by applying the
+   * wrapped element map fn to each contained Writable.
+   */
+  private static class ArrayCollectionMapFn<T> extends
+      MapFn<GenericArrayWritable, Collection<T>> {
+    private final MapFn<Object, T> mapFn;
+
+    public ArrayCollectionMapFn(MapFn<Object, T> mapFn) {
+      this.mapFn = mapFn;
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      // Delegate all lifecycle calls to the wrapped element fn.
+      mapFn.configure(conf);
+    }
+
+    @Override
+    public void setConfigurationForTest(Configuration conf) {
+      mapFn.setConfigurationForTest(conf);
+    }
+    
+    @Override
+    public void initialize() {
+      mapFn.setContext(getContext());
+    }
+    
+    @Override
+    public Collection<T> map(GenericArrayWritable input) {
+      Collection<T> collection = Lists.newArrayList();
+      for (Writable writable : input.get()) {
+        collection.add(mapFn.map(writable));
+      }
+      return collection;
+    }
+  }
+
+  /**
+   * Maps a {@code Collection<T>} to a {@code GenericArrayWritable} holding instances of
+   * {@code clazz}, applying the wrapped element map fn to each member.
+   */
+  private static class CollectionArrayMapFn<T> extends
+      MapFn<Collection<T>, GenericArrayWritable> {
+    
+    private final Class<? extends Writable> clazz;
+    private final MapFn<T, Object> mapFn;
+
+    public CollectionArrayMapFn(Class<? extends Writable> clazz,
+        MapFn<T, Object> mapFn) {
+      this.clazz = clazz;
+      this.mapFn = mapFn;
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      // Delegate all lifecycle calls to the wrapped element fn.
+      mapFn.configure(conf);
+    }
+
+    @Override
+    public void setConfigurationForTest(Configuration conf) {
+      mapFn.setConfigurationForTest(conf);
+    }
+    
+    @Override
+    public void initialize() {
+      mapFn.setContext(getContext());
+    }
+    
+    @Override
+    public GenericArrayWritable map(Collection<T> input) {
+      GenericArrayWritable arrayWritable = new GenericArrayWritable(clazz);
+      Writable[] w = new Writable[input.size()];
+      int index = 0;
+      for (T in : input) {
+        w[index++] = ((Writable) mapFn.map(in));
+      }
+      arrayWritable.set(w);
+      return arrayWritable;
+    }
+  }
+
+  /** Returns a {@code WritableType} for {@code Collection<T>} values backed by a GenericArrayWritable. */
+  public static <T> WritableType<Collection<T>, GenericArrayWritable<T>> collections(PType<T> ptype) {
+    WritableType<T, ?> wt = (WritableType<T, ?>) ptype;
+    return new WritableType(Collection.class, GenericArrayWritable.class,
+        new ArrayCollectionMapFn(wt.getInputMapFn()), new CollectionArrayMapFn(
+            wt.getSerializationClass(), wt.getOutputMapFn()), ptype);
+  }
+
+  /**
+   * Converts a {@code TextMapWritable} into a {@code Map<String, T>}, applying the
+   * wrapped map fn to each value and stringifying each Text key.
+   */
+  private static class MapInputMapFn<T> extends MapFn<TextMapWritable<Writable>, Map<String, T>> {
+    private final MapFn<Writable, T> mapFn;
+
+    public MapInputMapFn(MapFn<Writable, T> mapFn) {
+      this.mapFn = mapFn;
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      // Delegate all lifecycle calls to the wrapped value fn.
+      mapFn.configure(conf);
+    }
+
+    @Override
+    public void setConfigurationForTest(Configuration conf) {
+      mapFn.setConfigurationForTest(conf);
+    }
+
+    @Override
+    public void initialize() {
+      mapFn.setContext(getContext());
+    }
+
+    @Override
+    public Map<String, T> map(TextMapWritable<Writable> input) {
+      Map<String, T> out = Maps.newHashMap();
+      for (Map.Entry<Text, Writable> e : input.entrySet()) {
+        out.put(e.getKey().toString(), mapFn.map(e.getValue()));
+      }
+      return out;
+    }
+  }
+  
+  /**
+   * Converts a {@code Map<String, T>} into a {@code TextMapWritable}, applying the
+   * wrapped map fn to each value and wrapping each String key in a Text.
+   */
+  private static class MapOutputMapFn<T> extends MapFn<Map<String, T>, TextMapWritable<Writable>> {
+
+    private final Class<Writable> clazz;
+    private final MapFn<T, Writable> mapFn;
+
+    public MapOutputMapFn(Class<Writable> clazz, MapFn<T, Writable> mapFn) {
+      this.clazz = clazz;
+      this.mapFn = mapFn;
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      // Delegate all lifecycle calls to the wrapped value fn.
+      mapFn.configure(conf);
+    }
+
+    @Override
+    public void setConfigurationForTest(Configuration conf) {
+      mapFn.setConfigurationForTest(conf);
+    }
+
+    @Override
+    public void initialize() {
+      mapFn.setContext(getContext());
+    }
+
+    @Override
+    public TextMapWritable<Writable> map(Map<String, T> input) {
+      TextMapWritable<Writable> tmw = new TextMapWritable<Writable>(clazz);
+      for (Map.Entry<String, T> e : input.entrySet()) {
+        tmw.put(new Text(e.getKey()), mapFn.map(e.getValue()));
+      }
+      return tmw;
+    }
+  }
+  
+  /**
+   * Returns a {@code WritableType} for {@code Map<String, T>} values.
+   *
+   * <p>NOTE(review): the declared Writable type parameter is {@code MapWritable}, but the
+   * serialization class registered below is {@code TextMapWritable} — confirm that
+   * TextMapWritable extends MapWritable, or align the signature with the implementation.
+   */
+  public static <T> WritableType<Map<String, T>, MapWritable> maps(PType<T> ptype) {
+    WritableType<T, ?> wt = (WritableType<T, ?>) ptype;
+    return new WritableType(Map.class, TextMapWritable.class,
+        new MapInputMapFn(wt.getInputMapFn()),
+        new MapOutputMapFn(wt.getSerializationClass(), wt.getOutputMapFn()), ptype);
+  }
+  
+  /** Returns a {@code PType} that stores {@code T} as a JSON string in the Writable family. */
+  public static <T> PType<T> jsons(Class<T> clazz) {
+    return PTypes.jsonString(clazz, WritableTypeFamily.getInstance());
+  }
+  
+  /**
+   * Perform a deep copy of a writable value.
+   * 
+   * @param value
+   *          The value to be copied
+   * @param writableClass
+   *          The Writable class of the value to be copied
+   * @return A fully detached deep copy of the input value
+   */
+  public static <T extends Writable> T deepCopy(T value, Class<T> writableClass) {
+    ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
+    DataOutputStream dataOut = new DataOutputStream(byteOutStream);
+    T copiedValue = null;
+    try {
+      // Round-trip through Writable serialization so the copy shares no state
+      // with any buffers the original may be backed by.
+      value.write(dataOut);
+      dataOut.flush();
+      ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
+      DataInput dataInput = new DataInputStream(byteInStream);
+      // Requires writableClass to expose a public no-arg constructor.
+      copiedValue = writableClass.newInstance();
+      copiedValue.readFields(dataInput);
+    } catch (Exception e) {
+      throw new CrunchRuntimeException("Error while deep copying " + value, e);
+    }
+    return copiedValue;
+  }
+
+  // Utility class; not instantiable.
+  private Writables() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/util/Collects.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/util/Collects.java b/src/main/java/org/apache/crunch/util/Collects.java
new file mode 100644
index 0000000..f5b07c4
--- /dev/null
+++ b/src/main/java/org/apache/crunch/util/Collects.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.util;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Utility functions for returning Collection objects backed by different types
+ * of implementations.
+ */
+public class Collects {
+  
+  /** Returns a new, empty ArrayList-backed {@code Collection}. */
+  public static <T> Collection<T> newArrayList() {
+    return Lists.newArrayList();
+  }
+
+  /** Returns a new ArrayList-backed {@code Collection} containing the given elements. */
+  public static <T> Collection<T> newArrayList(T...elements) {
+    return Lists.newArrayList(elements);
+  }
+
+  /** Returns a new ArrayList-backed {@code Collection} with the contents of {@code elements}. */
+  public static <T> Collection<T> newArrayList(Iterable<? extends T> elements) {
+    return Lists.newArrayList(elements);
+  }
+
+  /** Returns a new ArrayList-backed {@code Collection} drained from {@code elements}. */
+  public static <T> Collection<T> newArrayList(Iterator<? extends T> elements) {
+    return Lists.newArrayList(elements);
+  }
+
+  // Utility class; not instantiable.
+  private Collects() {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/util/DistCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/util/DistCache.java b/src/main/java/org/apache/crunch/util/DistCache.java
new file mode 100644
index 0000000..06ad22a
--- /dev/null
+++ b/src/main/java/org/apache/crunch/util/DistCache.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
+
+/**
+ * Provides functions for working with Hadoop's distributed cache. These include:
+ * <ul>
+ *   <li>
+ *     Functions for working with a job-specific distributed cache of objects, like the
+ *     serialized runtime nodes in a MapReduce.
+ *   </li>
+ *   <li>
+ *     Functions for adding library jars to the distributed cache, which will be added to the
+ *     classpath of MapReduce tasks.
+ *   </li>
+ * </ul>
+ */
+public class DistCache {
+
+  // Configuration key holding the paths of jars to export to the distributed cache.
+  private static final String TMPJARS_KEY = "tmpjars";
+
+  public static void write(Configuration conf, Path path, Object value) throws IOException {
+    ObjectOutputStream oos = new ObjectOutputStream(FileSystem.get(conf).create(path));
+    oos.writeObject(value);
+    oos.close();
+
+    DistributedCache.addCacheFile(path.toUri(), conf);
+  }
+
+  public static Object read(Configuration conf, Path path) throws IOException {
+    URI target = null;
+    for (URI uri : DistributedCache.getCacheFiles(conf)) {
+      if (uri.toString().equals(path.toString())) {
+        target = uri;
+        break;
+      }
+    }
+    Object value = null;
+    if (target != null) {
+      Path targetPath = new Path(target.toString());
+      ObjectInputStream ois = new ObjectInputStream(targetPath.getFileSystem(conf).open(targetPath));
+      try {
+        value = ois.readObject();
+      } catch (ClassNotFoundException e) {
+        throw new CrunchRuntimeException(e);
+      }
+      ois.close();
+    }
+    return value;
+  }
+
+  /**
+   * Adds the specified jar to the distributed cache of jobs using the provided configuration. The
+   * jar will be placed on the classpath of tasks run by the job.
+   *
+   * @param conf The configuration used to add the jar to the distributed cache.
+   * @param jarFile The jar file to add to the distributed cache.
+   * @throws IOException If the jar file does not exist or there is a problem accessing the file.
+   */
+  public static void addJarToDistributedCache(Configuration conf, File jarFile) throws IOException {
+    if (!jarFile.exists()) {
+      throw new IOException("Jar file: " + jarFile.getCanonicalPath() + " does not exist.");
+    }
+    if (!jarFile.getName().endsWith(".jar")) {
+      throw new IllegalArgumentException("File: " + jarFile.getCanonicalPath() + " is not a .jar "
+          + "file.");
+    }
+    // Get a qualified path for the jar.
+    FileSystem fileSystem = FileSystem.getLocal(conf);
+    Path jarPath = new Path(jarFile.getCanonicalPath());
+    String qualifiedPath = jarPath.makeQualified(fileSystem).toString();
+    // Add the jar to the configuration variable.
+    String jarConfiguration = conf.get(TMPJARS_KEY, "");
+    if (!jarConfiguration.isEmpty()) {
+      jarConfiguration += ",";
+    }
+    jarConfiguration += qualifiedPath;
+    conf.set(TMPJARS_KEY, jarConfiguration);
+  }
+
+  /**
+   * Adds the jar at the specified path to the distributed cache of jobs using the provided
+   * configuration. The jar will be placed on the classpath of tasks run by the job.
+   *
+   * @param conf The configuration used to add the jar to the distributed cache.
+   * @param jarFile The path to the jar file to add to the distributed cache.
+   * @throws IOException If the jar file does not exist or there is a problem accessing the file.
+   */
+  public static void addJarToDistributedCache(Configuration conf, String jarFile)
+      throws IOException {
+    addJarToDistributedCache(conf, new File(jarFile));
+  }
+
+  /**
+   * Adds all jars under the specified directory to the distributed cache of jobs using the
+   * provided configuration. The jars will be placed on the classpath of tasks run by the job.
+   * This method does not descend into subdirectories when adding jars.
+   *
+   * @param conf The configuration used to add jars to the distributed cache.
+   * @param jarDirectory A directory containing jar files to add to the distributed cache.
+   * @throws IOException If the directory does not exist or there is a problem accessing the
+   *     directory.
+   */
+  public static void addJarDirToDistributedCache(Configuration conf, File jarDirectory)
+      throws IOException {
+    if (!jarDirectory.exists() || !jarDirectory.isDirectory()) {
+      throw new IOException("Jar directory: " + jarDirectory.getCanonicalPath() + " does not "
+        + "exist or is not a directory.");
+    }
+    for (File file : jarDirectory.listFiles()) {
+      if (!file.isDirectory() && file.getName().endsWith(".jar")) {
+        addJarToDistributedCache(conf, file);
+      }
+    }
+  }
+
+  /**
+   * Adds all jars under the directory at the specified path to the distributed cache of jobs
+   * using the provided configuration.  The jars will be placed on the classpath of the tasks
+   * run by the job. This method does not descend into subdirectories when adding jars.
+   *
+   * @param conf The configuration used to add jars to the distributed cache.
+   * @param jarDirectory The path to a directory containing jar files to add to the distributed
+   *     cache.
+   * @throws IOException If the directory does not exist or there is a problem accessing the
+   *     directory.
+   */
+  public static void addJarDirToDistributedCache(Configuration conf, String jarDirectory)
+      throws IOException {
+    addJarDirToDistributedCache(conf, new File(jarDirectory));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/util/PTypes.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/util/PTypes.java b/src/main/java/org/apache/crunch/util/PTypes.java
new file mode 100644
index 0000000..863b40f
--- /dev/null
+++ b/src/main/java/org/apache/crunch/util/PTypes.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.util;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.smile.SmileFactory;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+
+/**
+ * Utility functions for creating common types of derived PTypes, e.g., for JSON data,
+ * protocol buffers, and Thrift records.
+ *
+ */
+public class PTypes {
+
+  /** Returns a PType that stores {@link BigInteger} values as their two's-complement bytes. */
+  public static PType<BigInteger> bigInt(PTypeFamily typeFamily) {
+    return typeFamily.derived(BigInteger.class, BYTE_TO_BIGINT, BIGINT_TO_BYTE, typeFamily.bytes());
+  }
+
+  /** Returns a PType that stores {@code T} as a JSON string via Jackson. */
+  public static <T> PType<T> jsonString(Class<T> clazz, PTypeFamily typeFamily) {
+    return typeFamily.derived(clazz, new JacksonInputMapFn<T>(clazz),
+        new JacksonOutputMapFn<T>(), typeFamily.strings());
+  }
+
+  /** Returns a PType that stores {@code T} in Jackson's binary Smile format. */
+  public static <T> PType<T> smile(Class<T> clazz, PTypeFamily typeFamily) {
+    // Normalized from tab to the file's two-space indentation.
+    return typeFamily.derived(clazz, new SmileInputMapFn<T>(clazz),
+        new SmileOutputMapFn<T>(), typeFamily.bytes());
+  }
+
+  /** Returns a PType that stores protocol buffer messages as their serialized bytes. */
+  public static <T extends Message> PType<T> protos(Class<T> clazz, PTypeFamily typeFamily) {
+    return typeFamily.derived(clazz, new ProtoInputMapFn<T>(clazz),
+        new ProtoOutputMapFn<T>(), typeFamily.bytes());
+  }
+
+  /** Returns a PType that stores Thrift records as their binary-protocol bytes. */
+  public static <T extends TBase> PType<T> thrifts(Class<T> clazz, PTypeFamily typeFamily) {
+    return typeFamily.derived(clazz, new ThriftInputMapFn<T>(clazz),
+        new ThriftOutputMapFn<T>(), typeFamily.bytes());
+  }
+
+  /** Converts the backing bytes of a ByteBuffer into a BigInteger; null-safe. */
+  public static MapFn<ByteBuffer, BigInteger> BYTE_TO_BIGINT = new MapFn<ByteBuffer, BigInteger>() {
+    public BigInteger map(ByteBuffer input) {
+      return input == null ? null : new BigInteger(input.array());
+    }
+  };
+
+  /** Converts a BigInteger into a ByteBuffer over its two's-complement bytes; null-safe. */
+  public static MapFn<BigInteger, ByteBuffer> BIGINT_TO_BYTE = new MapFn<BigInteger, ByteBuffer>() {
+    public ByteBuffer map(BigInteger input) {
+      return input == null ? null : ByteBuffer.wrap(input.toByteArray());
+    }
+  };
+
+  /** Deserializes Smile-encoded bytes into instances of {@code clazz}. */
+  public static class SmileInputMapFn<T> extends MapFn<ByteBuffer, T> {
+
+    private final Class<T> clazz;
+    // ObjectMapper is not Serializable; rebuilt per task in initialize().
+    private transient ObjectMapper mapper;
+
+    public SmileInputMapFn(Class<T> clazz) {
+      this.clazz = clazz;
+    }
+
+    @Override
+    public void initialize() {
+      this.mapper = new ObjectMapper(new SmileFactory());
+    }
+
+    @Override
+    public T map(ByteBuffer input) {
+      try {
+        // NOTE(review): readValue takes (data, offset, len); passing limit() as len
+        // assumes position() == 0 — confirm buffers always arrive unflipped.
+        return mapper.readValue(input.array(), input.position(), input.limit(), clazz);
+      } catch (Exception e) {
+        throw new CrunchRuntimeException(e);
+      }
+    }
+  }
+
+  /** Serializes values to Smile-encoded bytes. */
+  public static class SmileOutputMapFn<T> extends MapFn<T, ByteBuffer> {
+    private transient ObjectMapper mapper;
+
+    @Override
+    public void initialize() {
+      this.mapper = new ObjectMapper(new SmileFactory());
+    }
+
+    @Override
+    public ByteBuffer map(T input) {
+      try {
+        return ByteBuffer.wrap(mapper.writeValueAsBytes(input));
+      } catch (Exception e) {
+        throw new CrunchRuntimeException(e);
+      }
+    }
+  }
+
+  /** Deserializes JSON strings into instances of {@code clazz}. */
+  public static class JacksonInputMapFn<T> extends MapFn<String, T> {
+
+    private final Class<T> clazz;
+    private transient ObjectMapper mapper;
+
+    public JacksonInputMapFn(Class<T> clazz) {
+      this.clazz = clazz;
+    }
+
+    @Override
+    public void initialize() {
+      this.mapper = new ObjectMapper();
+    }
+
+    @Override
+    public T map(String input) {
+      try {
+        return mapper.readValue(input, clazz);
+      } catch (Exception e) {
+        throw new CrunchRuntimeException(e);
+      }
+    }
+  }
+
+  /** Serializes values to JSON strings. */
+  public static class JacksonOutputMapFn<T> extends MapFn<T, String> {
+
+    private transient ObjectMapper mapper;
+
+    @Override
+    public void initialize() {
+      this.mapper = new ObjectMapper();
+    }
+
+    @Override
+    public String map(T input) {
+      try {
+        return mapper.writeValueAsString(input);
+      } catch (Exception e) {
+        throw new CrunchRuntimeException(e);
+      }
+    }
+  }
+
+  /** Parses serialized protocol buffer bytes into messages of type {@code T}. */
+  public static class ProtoInputMapFn<T extends Message> extends MapFn<ByteBuffer, T> {
+
+    private final Class<T> clazz;
+    // Prototype instance used only to obtain builders; created per task.
+    private transient T instance;
+
+    public ProtoInputMapFn(Class<T> clazz) {
+      this.clazz = clazz;
+    }
+
+    @Override
+    public void initialize() {
+      this.instance = ReflectionUtils.newInstance(clazz, getConfiguration());
+    }
+
+    @Override
+    public T map(ByteBuffer bb) {
+      try {
+        // NOTE(review): mergeFrom takes (data, off, len); passing limit() as len
+        // assumes position() == 0 — confirm buffers always arrive unflipped.
+        return (T) instance.newBuilderForType().mergeFrom(
+            bb.array(), bb.position(), bb.limit()).build();
+      } catch (InvalidProtocolBufferException e) {
+        throw new CrunchRuntimeException(e);
+      }
+    }
+  }
+
+  /** Serializes protocol buffer messages into their byte representation. */
+  public static class ProtoOutputMapFn<T extends Message> extends MapFn<T, ByteBuffer> {
+
+    public ProtoOutputMapFn() {
+    }
+
+    @Override
+    public ByteBuffer map(T proto) {
+      return ByteBuffer.wrap(proto.toByteArray());
+    }
+  }
+
+  /** Deserializes Thrift binary-protocol bytes into records of type {@code T}. */
+  public static class ThriftInputMapFn<T extends TBase> extends MapFn<ByteBuffer, T> {
+
+    private final Class<T> clazz;
+    private transient T instance;
+    private transient TDeserializer deserializer;
+    // Scratch buffer reused across calls; regrown only when the input size changes.
+    private transient byte[] bytes;
+
+    public ThriftInputMapFn(Class<T> clazz) {
+      this.clazz = clazz;
+    }
+
+    @Override
+    public void initialize() {
+      this.instance = ReflectionUtils.newInstance(clazz, getConfiguration());
+      this.deserializer = new TDeserializer(new TBinaryProtocol.Factory());
+      this.bytes = new byte[0];
+    }
+
+    @Override
+    public T map(ByteBuffer bb) {
+      // deepCopy the prototype so each call returns a fresh record.
+      T next = (T) instance.deepCopy();
+      int len = bb.limit() - bb.position();
+      if (len != bytes.length) {
+        bytes = new byte[len];
+      }
+      System.arraycopy(bb.array(), bb.position(), bytes, 0, len);
+      try {
+        deserializer.deserialize(next, bytes);
+      } catch (TException e) {
+        throw new CrunchRuntimeException(e);
+      }
+      return next;
+    }
+  }
+
+  /** Serializes Thrift records into binary-protocol bytes. */
+  public static class ThriftOutputMapFn<T extends TBase> extends MapFn<T, ByteBuffer> {
+
+    private transient TSerializer serializer;
+
+    public ThriftOutputMapFn() {
+    }
+
+    @Override
+    public void initialize() {
+      this.serializer = new TSerializer(new TBinaryProtocol.Factory());
+    }
+
+    @Override
+    public ByteBuffer map(T t) {
+      try {
+        return ByteBuffer.wrap(serializer.serialize(t));
+      } catch (TException e) {
+        throw new CrunchRuntimeException(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/util/Protos.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/util/Protos.java b/src/main/java/org/apache/crunch/util/Protos.java
new file mode 100644
index 0000000..2cda492
--- /dev/null
+++ b/src/main/java/org/apache/crunch/util/Protos.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.util;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
+import com.google.common.base.Splitter;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
+
+/**
+ * Utility functions for working with protocol buffers in Crunch.
+ */
+public class Protos {
+
+  /** Returns a MapFn that extracts the value of {@code fieldName} from each message. */
+  public static <M extends Message, K> MapFn<M, K> extractKey(String fieldName) {
+    return new ExtractKeyFn<M, K>(fieldName);
+  }
+  
+  /** Returns a DoFn that parses {@code sep}-delimited text lines into {@code msgClass} messages. */
+  public static <M extends Message> DoFn<String, M> lineParser(String sep, Class<M> msgClass) {
+    return new TextToProtoFn<M>(sep, msgClass);  
+  }
+  
+  /** Extracts a single named field from a protobuf message; resolves the descriptor lazily. */
+  public static class ExtractKeyFn<M extends Message, K> extends MapFn<M, K> {
+    
+    private final String fieldName;
+    
+    // Field descriptors are not Serializable; looked up from the first input message.
+    private transient FieldDescriptor fd;
+    
+    public ExtractKeyFn(String fieldName) {
+      this.fieldName = fieldName;
+    }
+    
+    @Override
+    public K map(M input) {
+      if (input == null) {
+        throw new IllegalArgumentException("Null inputs not supported by Protos.ExtractKeyFn");
+      } else if (fd == null) {
+        fd = input.getDescriptorForType().findFieldByName(fieldName);
+        if (fd == null) {
+          throw new IllegalStateException(
+              "Could not find field: " + fieldName + " in message: " + input);
+        }
+      }
+      return (K) input.getField(fd);
+    }
+    
+  }
+  
+  /**
+   * Parses separator-delimited text into protobuf messages, assigning split tokens to the
+   * message's fields in declaration order. Lines with unparseable numeric tokens are
+   * dropped and counted rather than emitted.
+   */
+  public static class TextToProtoFn<M extends Message> extends DoFn<String, M> {
+    
+    private final String sep;
+    private final Class<M> msgClass;
+    
+    private transient M msgInstance;
+    private transient List<FieldDescriptor> fields;
+    private transient Splitter splitter;
+    
+    // Counters for dropped lines: TOTAL counts every parse failure, NUMBER_FORMAT the cause.
+    enum ParseErrors { TOTAL, NUMBER_FORMAT };
+    
+    public TextToProtoFn(String sep, Class<M> msgClass) {
+      this.sep = sep;
+      this.msgClass = msgClass;
+    }
+    
+    @Override
+    public void initialize() {
+      this.msgInstance = ReflectionUtils.newInstance(msgClass, getConfiguration());
+      this.fields = msgInstance.getDescriptorForType().getFields();
+      this.splitter = Splitter.on(sep);
+    }
+
+    @Override
+    public void process(String input, Emitter<M> emitter) {
+      if (input != null && !input.isEmpty()) {
+        Builder b = msgInstance.newBuilderForType();
+        Iterator<String> iter = splitter.split(input).iterator();
+        boolean parseError = false;
+        for (FieldDescriptor fd : fields) {
+          if (iter.hasNext()) {
+            String value = iter.next();
+            if (value != null && !value.isEmpty()) {
+              Object parsedValue = null;
+              try {
+                // NOTE(review): JavaType BYTE_STRING and MESSAGE are not handled, so
+                // parsedValue stays null and setField(fd, null) would NPE — confirm
+                // that input messages never declare such field types.
+                switch (fd.getJavaType()) {
+                case STRING:
+                  parsedValue = value;
+                  break;
+                case INT:
+                  parsedValue = Integer.valueOf(value);
+                  break;
+                case LONG:
+                  parsedValue = Long.valueOf(value);
+                  break;
+                case FLOAT:
+                  parsedValue = Float.valueOf(value);
+                  break;
+                case DOUBLE:
+                  parsedValue = Double.valueOf(value);
+                  break;
+                case BOOLEAN:
+                  parsedValue = Boolean.valueOf(value);
+                  break;
+                case ENUM:
+                  parsedValue = fd.getEnumType().findValueByName(value);
+                  break;
+                }
+                b.setField(fd, parsedValue);
+              } catch (NumberFormatException nfe) {
+                increment(ParseErrors.NUMBER_FORMAT);
+                parseError = true;
+                break;
+              }
+            }
+          }
+        }
+        
+        if (parseError) {
+          increment(ParseErrors.TOTAL);
+        } else {
+          emitter.emit((M) b.build());
+        }
+      }
+    }
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/util/Tuples.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/util/Tuples.java b/src/main/java/org/apache/crunch/util/Tuples.java
new file mode 100644
index 0000000..b8eb3b9
--- /dev/null
+++ b/src/main/java/org/apache/crunch/util/Tuples.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.util;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import com.google.common.collect.Lists;
+import com.google.common.collect.UnmodifiableIterator;
+
+/**
+ * Utilities for working with subclasses of the {@code Tuple} interface.
+ *
+ */
+public class Tuples {
+
+  /**
+   * Base iterator that advances several underlying iterators in lockstep; iteration
+   * stops as soon as any one of them is exhausted.
+   */
+  private static abstract class TuplifyIterator<T> extends UnmodifiableIterator<T> {
+    protected List<Iterator<?>> iterators;
+    
+    public TuplifyIterator(Iterator<?>...iterators) {
+      this.iterators = Lists.newArrayList(iterators);
+    }
+    
+    @Override
+    public boolean hasNext() {
+      // True only while every underlying iterator still has elements.
+      for (Iterator<?> iter : iterators) {
+        if (!iter.hasNext()) {
+          return false;
+        }
+      }
+      return true;
+    }
+    
+    // Returns the next element of the index-th underlying iterator.
+    protected Object next(int index) {
+      return iterators.get(index).next();
+    }
+  }
+  
+  /** Zips two Iterables into an Iterable of {@code Pair}s. */
+  public static class PairIterable<S, T> implements Iterable<Pair<S, T>> {
+    private final Iterable<S> first;
+    private final Iterable<T> second;
+    
+    public PairIterable(Iterable<S> first, Iterable<T> second) {
+      this.first = first;
+      this.second = second;
+    }
+    
+    @Override
+    public Iterator<Pair<S, T>> iterator() {
+      return new TuplifyIterator<Pair<S, T>>(first.iterator(), second.iterator()) {
+        @Override
+        public Pair<S, T> next() {
+          // Unchecked casts are safe: element i always comes from iterator i.
+          return Pair.of((S) next(0), (T) next(1));
+        }
+      };
+    }   
+  }
+  
+  /** Zips three Iterables into an Iterable of {@code Tuple3}s. */
+  public static class TripIterable<A, B, C> implements Iterable<Tuple3<A, B, C>> {
+    private final Iterable<A> first;
+    private final Iterable<B> second;
+    private final Iterable<C> third;
+    
+    public TripIterable(Iterable<A> first, Iterable<B> second, Iterable<C> third) {
+      this.first = first;
+      this.second = second;
+      this.third = third;
+    }
+    
+    @Override
+    public Iterator<Tuple3<A, B, C>> iterator() {
+      return new TuplifyIterator<Tuple3<A, B, C>>(first.iterator(), second.iterator(),
+          third.iterator()) {
+        @Override
+        public Tuple3<A, B, C> next() {
+          return new Tuple3<A, B, C>((A) next(0), (B) next(1), (C) next(2));
+        }
+      };
+    }   
+  }
+  
+  /** Zips four Iterables into an Iterable of {@code Tuple4}s. */
+  public static class QuadIterable<A, B, C, D> implements Iterable<Tuple4<A, B, C, D>> {
+    private final Iterable<A> first;
+    private final Iterable<B> second;
+    private final Iterable<C> third;
+    private final Iterable<D> fourth;
+    
+    public QuadIterable(Iterable<A> first, Iterable<B> second, Iterable<C> third,
+        Iterable<D> fourth) {
+      this.first = first;
+      this.second = second;
+      this.third = third;
+      this.fourth = fourth;
+    }
+    
+    @Override
+    public Iterator<Tuple4<A, B, C, D>> iterator() {
+      return new TuplifyIterator<Tuple4<A, B, C, D>>(first.iterator(), second.iterator(),
+          third.iterator(), fourth.iterator()) {
+        @Override
+        public Tuple4<A, B, C, D> next() {
+          return new Tuple4<A, B, C, D>((A) next(0), (B) next(1), (C) next(2), (D) next(3));
+        }
+      };
+    }   
+  }
+  
+  /**
+   * Zips any number of Iterables into an Iterable of {@code TupleN}s.
+   *
+   * <p>NOTE(review): the underlying iterators are created once in the constructor, so
+   * this Iterable is effectively single-use — confirm callers never iterate it twice.
+   */
+  public static class TupleNIterable implements Iterable<TupleN> {
+    private final Iterator<?>[] iters;
+    
+    public TupleNIterable(Iterable<?>... iterables) {
+      this.iters = new Iterator[iterables.length];
+      for (int i = 0; i < iters.length; i++) {
+        iters[i] = iterables[i].iterator();
+      }
+    }
+    
+    @Override
+    public Iterator<TupleN> iterator() {
+      return new TuplifyIterator<TupleN>(iters) {
+        @Override
+        public TupleN next() {
+          Object[] values = new Object[iters.length];
+          for (int i = 0; i < values.length; i++) {
+            values[i] = next(i);
+          }
+          return new TupleN(values);
+        }
+      };
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
index cd88329..6756dbb 100644
--- a/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
+++ b/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.mapreduce.lib.jobcontrol;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
index 5738c02..7fa61d3 100644
--- a/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
+++ b/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.mapreduce.lib.jobcontrol;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java b/src/main/java/org/apache/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java
index ae2b253..10d033f 100644
--- a/src/main/java/org/apache/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java
+++ b/src/main/java/org/apache/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.hadoop.util.ReflectionUtils;
 
-import com.cloudera.crunch.impl.mr.run.TaskAttemptContextFactory;
+import org.apache.crunch.impl.mr.run.TaskAttemptContextFactory;
 
 /**
  * The MultipleOutputs class simplifies writing output data 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties
index 1247993..dc08a07 100644
--- a/src/main/resources/log4j.properties
+++ b/src/main/resources/log4j.properties
@@ -1,5 +1,5 @@
 # ***** Set root logger level to INFO and its only appender to A.
-log4j.logger.com.cloudera.crunch=info, A
+log4j.logger.org.apache.crunch=info, A
 
 # ***** A is set to be a ConsoleAppender.
 log4j.appender.A=org.apache.log4j.ConsoleAppender

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/CollectionsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/CollectionsTest.java b/src/test/java/com/cloudera/crunch/CollectionsTest.java
deleted file mode 100644
index 1ba99b9..0000000
--- a/src/test/java/com/cloudera/crunch/CollectionsTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import org.junit.Test;
-
-import com.cloudera.crunch.impl.mem.MemPipeline;
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.test.FileHelper;
-import com.cloudera.crunch.types.PTypeFamily;
-import com.cloudera.crunch.types.avro.AvroTypeFamily;
-import com.cloudera.crunch.types.writable.WritableTypeFamily;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-@SuppressWarnings("serial")
-public class CollectionsTest {
-  
-  public static class AggregateStringListFn implements CombineFn.Aggregator<Collection<String>> {
-    private final Collection<String> rtn = Lists.newArrayList();
-    
-    @Override
-    public void reset() {
-      rtn.clear();
-    }
-    
-    @Override
-    public void update(Collection<String> values) {
-      rtn.addAll(values);
-    }      
-    
-    @Override
-    public Iterable<Collection<String>> results() {
-      return ImmutableList.of(rtn);
-    }
-  }
-  
-  public static PTable<String, Collection<String>> listOfCharcters(PCollection<String> lines, PTypeFamily typeFamily) {
-     
-    return lines.parallelDo(new DoFn<String, Pair<String, Collection<String>>>() {
-      @Override
-      public void process(String line, Emitter<Pair<String, Collection<String>>> emitter) {
-        for (String word : line.split("\\s+")) {
-          Collection<String> characters = Lists.newArrayList();
-          for(char c : word.toCharArray()) {
-            characters.add(String.valueOf(c));
-          }
-          emitter.emit(Pair.of(word, characters));
-        }
-      }
-    }, typeFamily.tableOf(typeFamily.strings(), typeFamily.collections(typeFamily.strings())))
-    .groupByKey()
-    .combineValues(CombineFn.<String, Collection<String>>aggregator(new AggregateStringListFn()));
-  }
-  
-  @Test
-  public void testWritables() throws IOException {
-    run(new MRPipeline(CollectionsTest.class), WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testAvro() throws IOException {
-    run(new MRPipeline(CollectionsTest.class), AvroTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testInMemoryWritables() throws IOException {
-    run(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testInMemoryAvro() throws IOException {
-    run(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
-  }
-  
-  public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
-	String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt");
-    
-    PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
-    Iterable<Pair<String, Collection<String>>> lines = listOfCharcters(shakespeare, typeFamily).materialize();
-    
-    boolean passed = false;
-    for (Pair<String, Collection<String>> line : lines) {
-      if(line.first().startsWith("yellow")) {
-        passed = true;
-        break;
-      }
-    }
-    pipeline.done();
-    assertTrue(passed);
-  }  
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/CombineFnTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/CombineFnTest.java b/src/test/java/com/cloudera/crunch/CombineFnTest.java
deleted file mode 100644
index 66d0635..0000000
--- a/src/test/java/com/cloudera/crunch/CombineFnTest.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch;
-
-import static com.cloudera.crunch.CombineFn.MAX_BIGINTS;
-import static com.cloudera.crunch.CombineFn.MAX_DOUBLES;
-import static com.cloudera.crunch.CombineFn.MAX_FLOATS;
-import static com.cloudera.crunch.CombineFn.MAX_INTS;
-import static com.cloudera.crunch.CombineFn.MAX_LONGS;
-import static com.cloudera.crunch.CombineFn.MIN_BIGINTS;
-import static com.cloudera.crunch.CombineFn.MIN_DOUBLES;
-import static com.cloudera.crunch.CombineFn.MIN_FLOATS;
-import static com.cloudera.crunch.CombineFn.MIN_INTS;
-import static com.cloudera.crunch.CombineFn.MIN_LONGS;
-import static com.cloudera.crunch.CombineFn.SUM_BIGINTS;
-import static com.cloudera.crunch.CombineFn.SUM_DOUBLES;
-import static com.cloudera.crunch.CombineFn.SUM_FLOATS;
-import static com.cloudera.crunch.CombineFn.SUM_INTS;
-import static com.cloudera.crunch.CombineFn.SUM_LONGS;
-import static org.junit.Assert.assertEquals;
-
-import java.math.BigInteger;
-import java.util.List;
-
-import org.junit.Test;
-
-import com.cloudera.crunch.CombineFn.Aggregator;
-import com.cloudera.crunch.CombineFn.AggregatorFactory;
-import com.cloudera.crunch.CombineFn.FirstNAggregator;
-import com.cloudera.crunch.CombineFn.LastNAggregator;
-import com.cloudera.crunch.CombineFn.MaxNAggregator;
-import com.cloudera.crunch.CombineFn.MinNAggregator;
-import com.cloudera.crunch.CombineFn.PairAggregator;
-import com.cloudera.crunch.CombineFn.QuadAggregator;
-import com.cloudera.crunch.CombineFn.TripAggregator;
-import com.cloudera.crunch.CombineFn.TupleNAggregator;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-
-public class CombineFnTest {
-
-  private <T> Iterable<T> applyAggregator(AggregatorFactory<T> a, Iterable<T> values) {
-    return applyAggregator(a.create(), values);
-  }
-  
-  private <T> Iterable<T> applyAggregator(Aggregator<T> a, Iterable<T> values) {
-    a.reset();
-    for (T value : values) {
-      a.update(value);
-    }
-    return a.results();
-  }
-  
-  @Test
-  public void testSums() {
-    assertEquals(ImmutableList.of(1775L),
-        applyAggregator(SUM_LONGS, ImmutableList.of(29L, 17L, 1729L)));
-
-    assertEquals(ImmutableList.of(1765L),
-        applyAggregator(SUM_LONGS, ImmutableList.of(29L, 7L, 1729L)));
-
-    assertEquals(ImmutableList.of(1775),
-        applyAggregator(SUM_INTS, ImmutableList.of(29, 17, 1729)));
-
-    assertEquals(ImmutableList.of(1775.0f),
-        applyAggregator(SUM_FLOATS, ImmutableList.of(29f, 17f, 1729f)));
-
-    assertEquals(ImmutableList.of(1775.0),
-        applyAggregator(SUM_DOUBLES, ImmutableList.of(29.0, 17.0, 1729.0)));
-    
-    assertEquals(ImmutableList.of(new BigInteger("1775")),
-        applyAggregator(SUM_BIGINTS,
-            ImmutableList.of(new BigInteger("29"), new BigInteger("17"), new BigInteger("1729"))));
-  }
-  
-  @Test
-  public void testMax() {
-    assertEquals(ImmutableList.of(1729L),
-        applyAggregator(MAX_LONGS, ImmutableList.of(29L, 17L, 1729L)));
-    
-    assertEquals(ImmutableList.of(1729),
-        applyAggregator(MAX_INTS, ImmutableList.of(29, 17, 1729)));
-
-    assertEquals(ImmutableList.of(1729.0f),
-        applyAggregator(MAX_FLOATS, ImmutableList.of(29f, 17f, 1729f)));
-
-    assertEquals(ImmutableList.of(1729.0),
-        applyAggregator(MAX_DOUBLES, ImmutableList.of(29.0, 17.0, 1729.0)));
-    
-    assertEquals(ImmutableList.of(1745.0f),
-        applyAggregator(MAX_FLOATS, ImmutableList.of(29f, 1745f, 17f, 1729f)));
-
-    assertEquals(ImmutableList.of(new BigInteger("1729")),
-        applyAggregator(MAX_BIGINTS,
-            ImmutableList.of(new BigInteger("29"), new BigInteger("17"), new BigInteger("1729"))));
-  }
-  
-  @Test
-  public void testMin() {
-    assertEquals(ImmutableList.of(17L),
-        applyAggregator(MIN_LONGS, ImmutableList.of(29L, 17L, 1729L)));
-    
-    assertEquals(ImmutableList.of(17),
-        applyAggregator(MIN_INTS, ImmutableList.of(29, 17, 1729)));
-
-    assertEquals(ImmutableList.of(17.0f),
-        applyAggregator(MIN_FLOATS, ImmutableList.of(29f, 17f, 1729f)));
-
-    assertEquals(ImmutableList.of(17.0),
-        applyAggregator(MIN_DOUBLES, ImmutableList.of(29.0, 17.0, 1729.0)));
-    
-    assertEquals(ImmutableList.of(29),
-        applyAggregator(MIN_INTS, ImmutableList.of(29, 170, 1729)));
-    
-    assertEquals(ImmutableList.of(new BigInteger("17")),
-        applyAggregator(MIN_BIGINTS,
-            ImmutableList.of(new BigInteger("29"), new BigInteger("17"), new BigInteger("1729"))));
-  }
-
-  @Test
-  public void testMaxN() {
-    assertEquals(ImmutableList.of(98, 1009), applyAggregator(new MaxNAggregator<Integer>(2),
-        ImmutableList.of(17, 34, 98, 29, 1009)));
-  }
-
-  @Test
-  public void testMinN() {
-    assertEquals(ImmutableList.of(17, 29), applyAggregator(new MinNAggregator<Integer>(2),
-        ImmutableList.of(17, 34, 98, 29, 1009)));
-  }
-
-  @Test
-  public void testFirstN() {
-    assertEquals(ImmutableList.of(17, 34), applyAggregator(new FirstNAggregator<Integer>(2),
-        ImmutableList.of(17, 34, 98, 29, 1009)));
-  }
-
-  @Test
-  public void testLastN() {
-    assertEquals(ImmutableList.of(29, 1009), applyAggregator(new LastNAggregator<Integer>(2),
-        ImmutableList.of(17, 34, 98, 29, 1009)));
-  }
-  
-  @Test
-  public void testPairs() {
-    List<Pair<Long, Double>> input = ImmutableList.of(Pair.of(1720L, 17.29), Pair.of(9L, -3.14));
-    Aggregator<Pair<Long, Double>> a = new PairAggregator<Long, Double>(
-        SUM_LONGS.create(), MIN_DOUBLES.create());
-    assertEquals(Pair.of(1729L, -3.14), Iterables.getOnlyElement(applyAggregator(a, input)));
-  }
-  
-  @Test
-  public void testPairsTwoLongs() {
-    List<Pair<Long, Long>> input = ImmutableList.of(Pair.of(1720L, 1L), Pair.of(9L, 19L));
-    Aggregator<Pair<Long, Long>> a = new PairAggregator<Long, Long>(
-        SUM_LONGS.create(), SUM_LONGS.create());
-    assertEquals(Pair.of(1729L, 20L), Iterables.getOnlyElement(applyAggregator(a, input)));
-  }
-  
-  @Test
-  public void testTrips() {
-    List<Tuple3<Float, Double, Double>> input = ImmutableList.of(
-        Tuple3.of(17.29f, 12.2, 0.1), Tuple3.of(3.0f, 1.2, 3.14), Tuple3.of(-1.0f, 14.5, -0.98));
-    Aggregator<Tuple3<Float, Double, Double>> a = new TripAggregator<Float, Double, Double>(
-        MAX_FLOATS.create(), MAX_DOUBLES.create(), MIN_DOUBLES.create());
-    assertEquals(Tuple3.of(17.29f, 14.5, -0.98),
-        Iterables.getOnlyElement(applyAggregator(a, input)));
-  }
-  
-  @Test
-  public void testQuads() {
-    List<Tuple4<Float, Double, Double, Integer>> input = ImmutableList.of(
-        Tuple4.of(17.29f, 12.2, 0.1, 1), Tuple4.of(3.0f, 1.2, 3.14, 2),
-        Tuple4.of(-1.0f, 14.5, -0.98, 3));
-    Aggregator<Tuple4<Float, Double, Double, Integer>> a =
-        new QuadAggregator<Float, Double, Double, Integer>(MAX_FLOATS.create(),
-            MAX_DOUBLES.create(), MIN_DOUBLES.create(), SUM_INTS.create());
-    assertEquals(Tuple4.of(17.29f, 14.5, -0.98, 6),
-        Iterables.getOnlyElement(applyAggregator(a, input)));
-  }
-
-  @Test
-  public void testTupleN() {
-    List<TupleN> input = ImmutableList.of(new TupleN(1, 3.0, 1, 2.0, 4L),
-        new TupleN(4, 17.0, 1, 9.7, 12L));
-    Aggregator<TupleN> a = new TupleNAggregator(MIN_INTS.create(), SUM_DOUBLES.create(),
-        MAX_INTS.create(), MIN_DOUBLES.create(), MAX_LONGS.create());
-    assertEquals(new TupleN(1, 20.0, 1, 2.0, 12L),
-        Iterables.getOnlyElement(applyAggregator(a, input)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/FilterFnTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/FilterFnTest.java b/src/test/java/com/cloudera/crunch/FilterFnTest.java
deleted file mode 100644
index f9a9479..0000000
--- a/src/test/java/com/cloudera/crunch/FilterFnTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class FilterFnTest {
-
-  private static final FilterFn<String> TRUE = new FilterFn<String>() {
-    @Override
-    public boolean accept(String input) {
-      return true;
-    }
-  };
-
-  private static final FilterFn<String> FALSE = new FilterFn<String>() {
-    @Override
-    public boolean accept(String input) {
-      return false;
-    }
-  };
-  
-  @Test
-  public void testAnd() {
-    assertTrue(FilterFn.and(TRUE).accept("foo"));
-    assertTrue(FilterFn.and(TRUE, TRUE).accept("foo"));
-    assertFalse(FilterFn.and(TRUE, FALSE).accept("foo"));
-    assertFalse(FilterFn.and(FALSE, FALSE, FALSE).accept("foo"));
-  }
-  
-  @Test
-  public void testOr() {
-    assertFalse(FilterFn.or(FALSE).accept("foo"));
-    assertTrue(FilterFn.or(FALSE, TRUE).accept("foo"));
-    assertTrue(FilterFn.or(TRUE, FALSE, TRUE).accept("foo"));
-    assertFalse(FilterFn.or(FALSE, FALSE, FALSE).accept("foo"));
-  }
-
-  @Test
-  public void testNot() {
-    assertFalse(FilterFn.not(TRUE).accept("foo"));
-    assertTrue(FilterFn.not(FALSE).accept("foo"));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/MapsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/MapsTest.java b/src/test/java/com/cloudera/crunch/MapsTest.java
deleted file mode 100644
index 724a469..0000000
--- a/src/test/java/com/cloudera/crunch/MapsTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package com.cloudera.crunch;
-
-import java.util.Map;
-
-import org.junit.Test;
-
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.test.FileHelper;
-import com.cloudera.crunch.types.PTypeFamily;
-import com.cloudera.crunch.types.avro.AvroTypeFamily;
-import com.cloudera.crunch.types.writable.WritableTypeFamily;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-
-public class MapsTest {
-
-  @Test
-  public void testWritables() throws Exception {
-	run(WritableTypeFamily.getInstance());
-  }
-  
-  @Test
-  public void testAvros() throws Exception {
-	run(AvroTypeFamily.getInstance());
-  }
-  
-  public static void run(PTypeFamily typeFamily) throws Exception {
-	Pipeline pipeline = new MRPipeline(MapsTest.class);
-    String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt");
-    PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
-    Iterable<Pair<String, Map<String, Long>>> output = shakespeare.parallelDo(
-      new DoFn<String, Pair<String, Map<String, Long>>>() {
-	    @Override
-	    public void process(String input,
-		    Emitter<Pair<String, Map<String, Long>>> emitter) {
-		  String last = null;
-		  for (String word : input.toLowerCase().split("\\W+")) {
-		    if (!word.isEmpty()) {
-			  String firstChar = word.substring(0, 1);
-		      if (last != null) {
-		    	Map<String, Long> cc = ImmutableMap.of(firstChar, 1L);
-			    emitter.emit(Pair.of(last, cc));
-		      }
-		      last = firstChar;
-		    }
-		  }
-	    }
-      }, typeFamily.tableOf(typeFamily.strings(), typeFamily.maps(typeFamily.longs())))
-      .groupByKey()
-      .combineValues(new CombineFn<String, Map<String, Long>>() {
-	    @Override
-	    public void process(Pair<String, Iterable<Map<String, Long>>> input,
-		    Emitter<Pair<String, Map<String, Long>>> emitter) {
-		  Map<String, Long> agg = Maps.newHashMap();
-		  for (Map<String, Long> in : input.second()) {
-		    for (Map.Entry<String, Long> e : in.entrySet()) {
-			  if (!agg.containsKey(e.getKey())) {
-			    agg.put(e.getKey(), e.getValue());
-			  } else {
-			    agg.put(e.getKey(), e.getValue() + agg.get(e.getKey()));
-			  }
-		    }
-		  }
-		  emitter.emit(Pair.of(input.first(), agg));
-	    }
-	  }).materialize();
-    boolean passed = false;
-    for (Pair<String, Map<String, Long>> v : output) {
-      if (v.first() == "k" && v.second().get("n") == 8L) {
-    	passed = true;
-    	break;
-      }
-    }
-    pipeline.done();
-  }
-}


Mime
View raw message