crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [25/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues
Date Tue, 23 Apr 2013 20:41:27 GMT
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
new file mode 100644
index 0000000..78cf3ae
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
@@ -0,0 +1,588 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.writable;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.fn.CompositeMapFn;
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypes;
+import org.apache.crunch.types.TupleFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Defines static methods that are analogous to the methods defined in
+ * {@link WritableTypeFamily} for convenient static importing.
+ * 
+ */
+public class Writables {
+  private static final MapFn<NullWritable, Void> NULL_WRITABLE_TO_VOID = new MapFn<NullWritable, Void>() {
+    @Override
+    public Void map(NullWritable input) {
+      return null;
+    }
+  };
+
+  private static final MapFn<Void, NullWritable> VOID_TO_NULL_WRITABLE = new MapFn<Void, NullWritable>() {
+    @Override
+    public NullWritable map(Void input) {
+      return NullWritable.get();
+    }
+  };
+
+  private static final MapFn<Text, String> TEXT_TO_STRING = new MapFn<Text, String>() {
+    @Override
+    public String map(Text input) {
+      return input.toString();
+    }
+  };
+
+  private static final MapFn<String, Text> STRING_TO_TEXT = new MapFn<String, Text>() {
+    @Override
+    public Text map(String input) {
+      return new Text(input);
+    }
+  };
+
+  private static final MapFn<IntWritable, Integer> IW_TO_INT = new MapFn<IntWritable, Integer>() {
+    @Override
+    public Integer map(IntWritable input) {
+      return input.get();
+    }
+  };
+
+  private static final MapFn<Integer, IntWritable> INT_TO_IW = new MapFn<Integer, IntWritable>() {
+    @Override
+    public IntWritable map(Integer input) {
+      return new IntWritable(input);
+    }
+  };
+
+  private static final MapFn<LongWritable, Long> LW_TO_LONG = new MapFn<LongWritable, Long>() {
+    @Override
+    public Long map(LongWritable input) {
+      return input.get();
+    }
+  };
+
+  private static final MapFn<Long, LongWritable> LONG_TO_LW = new MapFn<Long, LongWritable>() {
+    @Override
+    public LongWritable map(Long input) {
+      return new LongWritable(input);
+    }
+  };
+
+  private static final MapFn<FloatWritable, Float> FW_TO_FLOAT = new MapFn<FloatWritable, Float>() {
+    @Override
+    public Float map(FloatWritable input) {
+      return input.get();
+    }
+  };
+
+  private static final MapFn<Float, FloatWritable> FLOAT_TO_FW = new MapFn<Float, FloatWritable>() {
+    @Override
+    public FloatWritable map(Float input) {
+      return new FloatWritable(input);
+    }
+  };
+
+  private static final MapFn<DoubleWritable, Double> DW_TO_DOUBLE = new MapFn<DoubleWritable, Double>() {
+    @Override
+    public Double map(DoubleWritable input) {
+      return input.get();
+    }
+  };
+
+  private static final MapFn<Double, DoubleWritable> DOUBLE_TO_DW = new MapFn<Double, DoubleWritable>() {
+    @Override
+    public DoubleWritable map(Double input) {
+      return new DoubleWritable(input);
+    }
+  };
+
+  private static final MapFn<BooleanWritable, Boolean> BW_TO_BOOLEAN = new MapFn<BooleanWritable, Boolean>() {
+    @Override
+    public Boolean map(BooleanWritable input) {
+      return input.get();
+    }
+  };
+
+  // Shared singleton writables; the output map fn below reuses them instead of
+  // allocating a new BooleanWritable per record.
+  private static final BooleanWritable TRUE = new BooleanWritable(true);
+  private static final BooleanWritable FALSE = new BooleanWritable(false);
+  private static final MapFn<Boolean, BooleanWritable> BOOLEAN_TO_BW = new MapFn<Boolean, BooleanWritable>() {
+    @Override
+    public BooleanWritable map(Boolean input) {
+      // Use equals() rather than reference identity so that non-canonical
+      // Boolean instances (e.g. new Boolean(true)) map correctly; a null
+      // input still maps to FALSE, as before.
+      return Boolean.TRUE.equals(input) ? TRUE : FALSE;
+    }
+  };
+
+  private static final MapFn<BytesWritable, ByteBuffer> BW_TO_BB = new MapFn<BytesWritable, ByteBuffer>() {
+    @Override
+    public ByteBuffer map(BytesWritable input) {
+      // getBytes() may be over-allocated, so bound the view by getLength().
+      return ByteBuffer.wrap(input.getBytes(), 0, input.getLength());
+    }
+  };
+
+  private static final MapFn<ByteBuffer, BytesWritable> BB_TO_BW = new MapFn<ByteBuffer, BytesWritable>() {
+    @Override
+    public BytesWritable map(ByteBuffer input) {
+      BytesWritable bw = new BytesWritable();
+      // NOTE(review): assumes an array-backed buffer with position() == 0; a
+      // sliced buffer with a non-zero position would be copied from the wrong
+      // offset. Confirm callers only pass freshly-wrapped buffers.
+      bw.set(input.array(), input.arrayOffset(), input.limit());
+      return bw;
+    }
+  };
+
+  // Convenience wrapper around the WritableType constructor for primitives.
+  private static <S, W extends Writable> WritableType<S, W> create(Class<S> typeClass, Class<W> writableClass,
+      MapFn<W, S> inputDoFn, MapFn<S, W> outputDoFn) {
+    return new WritableType<S, W>(typeClass, writableClass, inputDoFn, outputDoFn);
+  }
+
+  private static final WritableType<Void, NullWritable> nulls = create(Void.class, NullWritable.class,
+      NULL_WRITABLE_TO_VOID, VOID_TO_NULL_WRITABLE);
+  private static final WritableType<String, Text> strings = create(String.class, Text.class, TEXT_TO_STRING,
+      STRING_TO_TEXT);
+  private static final WritableType<Long, LongWritable> longs = create(Long.class, LongWritable.class, LW_TO_LONG,
+      LONG_TO_LW);
+  private static final WritableType<Integer, IntWritable> ints = create(Integer.class, IntWritable.class, IW_TO_INT,
+      INT_TO_IW);
+  private static final WritableType<Float, FloatWritable> floats = create(Float.class, FloatWritable.class,
+      FW_TO_FLOAT, FLOAT_TO_FW);
+  private static final WritableType<Double, DoubleWritable> doubles = create(Double.class, DoubleWritable.class,
+      DW_TO_DOUBLE, DOUBLE_TO_DW);
+  private static final WritableType<Boolean, BooleanWritable> booleans = create(Boolean.class, BooleanWritable.class,
+      BW_TO_BOOLEAN, BOOLEAN_TO_BW);
+  private static final WritableType<ByteBuffer, BytesWritable> bytes = create(ByteBuffer.class, BytesWritable.class,
+      BW_TO_BB, BB_TO_BW);
+
+  /** Built-in mapping from Java classes to the writable PTypes that serialize them. */
+  private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap.<Class<?>, PType<?>> builder()
+      .put(String.class, strings).put(Long.class, longs).put(Integer.class, ints).put(Float.class, floats)
+      .put(Double.class, doubles).put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build();
+
+  /** User-registered WritableTypes, keyed by the Java class they serialize. */
+  private static final Map<Class<?>, WritableType<?, ?>> EXTENSIONS = Maps.newHashMap();
+
+  /**
+   * Returns the built-in {@code PType} for the given class (String, Long,
+   * Integer, Float, Double, Boolean, or ByteBuffer), or {@code null} if the
+   * class has no built-in mapping.
+   */
+  public static <T> PType<T> getPrimitiveType(Class<T> clazz) {
+    return (PType<T>) PRIMITIVES.get(clazz);
+  }
+
+  /** Registers a custom {@code WritableType} for {@code clazz}, used by {@link #records(Class)}. */
+  public static <T> void register(Class<T> clazz, WritableType<T, ? extends Writable> ptype) {
+    EXTENSIONS.put(clazz, ptype);
+  }
+
+  /** Returns the {@code WritableType} for null/Void values. */
+  public static final WritableType<Void, NullWritable> nulls() {
+    return nulls;
+  }
+
+  /** Returns the {@code WritableType} for Strings, backed by {@link Text}. */
+  public static final WritableType<String, Text> strings() {
+    return strings;
+  }
+
+  /** Returns the {@code WritableType} for Longs, backed by {@link LongWritable}. */
+  public static final WritableType<Long, LongWritable> longs() {
+    return longs;
+  }
+
+  /** Returns the {@code WritableType} for Integers, backed by {@link IntWritable}. */
+  public static final WritableType<Integer, IntWritable> ints() {
+    return ints;
+  }
+
+  /** Returns the {@code WritableType} for Floats, backed by {@link FloatWritable}. */
+  public static final WritableType<Float, FloatWritable> floats() {
+    return floats;
+  }
+
+  /** Returns the {@code WritableType} for Doubles, backed by {@link DoubleWritable}. */
+  public static final WritableType<Double, DoubleWritable> doubles() {
+    return doubles;
+  }
+
+  /** Returns the {@code WritableType} for Booleans, backed by {@link BooleanWritable}. */
+  public static final WritableType<Boolean, BooleanWritable> booleans() {
+    return booleans;
+  }
+
+  /** Returns the {@code WritableType} for ByteBuffers, backed by {@link BytesWritable}. */
+  public static final WritableType<ByteBuffer, BytesWritable> bytes() {
+    return bytes;
+  }
+
+  /**
+   * Returns a {@code WritableType} for the given class: a previously
+   * {@link #register registered} extension type if one exists, otherwise a
+   * pass-through type for classes that already implement {@link Writable}.
+   *
+   * @throws IllegalArgumentException if the class is neither registered nor a Writable
+   */
+  public static final <T, W extends Writable> WritableType<T, W> records(Class<T> clazz) {
+    if (EXTENSIONS.containsKey(clazz)) {
+      return (WritableType<T, W>) EXTENSIONS.get(clazz);
+    }
+    if (Writable.class.isAssignableFrom(clazz)) {
+      return (WritableType<T, W>) writables(clazz.asSubclass(Writable.class));
+    } else {
+      throw new IllegalArgumentException(
+          "Cannot create Writable records from non-Writable class " + clazz.getCanonicalName());
+    }
+  }
+
+  /** Returns a pass-through {@code WritableType} for a class that is itself a {@link Writable}. */
+  public static <W extends Writable> WritableType<W, W> writables(Class<W> clazz) {
+    MapFn wIdentity = IdentityFn.getInstance();
+    return new WritableType<W, W>(clazz, clazz, wIdentity, wIdentity);
+  }
+
+  /**
+   * Returns a table type with the given key and value types. Both arguments
+   * must be WritableTypes; a WritableTableType argument is first flattened
+   * into a {@link #pairs} type.
+   *
+   * @throws IllegalArgumentException if either argument is not a WritableType
+   */
+  public static <K, V> WritableTableType<K, V> tableOf(PType<K> key, PType<V> value) {
+    if (key instanceof WritableTableType) {
+      WritableTableType wtt = (WritableTableType) key;
+      key = pairs(wtt.getKeyType(), wtt.getValueType());
+    } else if (!(key instanceof WritableType)) {
+      throw new IllegalArgumentException("Key type must be of class WritableType");
+    }
+    if (value instanceof WritableTableType) {
+      WritableTableType wtt = (WritableTableType) value;
+      value = pairs(wtt.getKeyType(), wtt.getValueType());
+    } else if (!(value instanceof WritableType)) {
+      throw new IllegalArgumentException("Value type must be of class WritableType");
+    }
+    return new WritableTableType((WritableType) key, (WritableType) value);
+  }
+
+  /**
+   * For mapping from {@link TupleWritable} instances to {@link Tuple}s.
+   * 
+   */
+  private static class TWTupleMapFn extends MapFn<TupleWritable, Tuple> {
+    private final TupleFactory<?> tupleFactory;
+    // One input MapFn per tuple position, in declaration order.
+    private final List<MapFn> fns;
+
+    // Scratch array reused across map() calls; see initialize().
+    private transient Object[] values;
+
+    public TWTupleMapFn(TupleFactory<?> tupleFactory, PType<?>... ptypes) {
+      this.tupleFactory = tupleFactory;
+      this.fns = Lists.newArrayList();
+      for (PType ptype : ptypes) {
+        fns.add(ptype.getInputMapFn());
+      }
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      // Propagate lifecycle calls to each per-field MapFn.
+      for (MapFn fn : fns) {
+        fn.configure(conf);
+      }
+    }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      for (MapFn fn : fns) {
+        fn.setContext(context);
+      }
+    }
+
+    @Override
+    public void initialize() {
+      for (MapFn fn : fns) {
+        fn.initialize();
+      }
+      // The rest of the methods allocate new
+      // objects each time. However this one
+      // uses Tuple.tuplify which does a copy
+      this.values = new Object[fns.size()];
+      tupleFactory.initialize();
+    }
+
+    @Override
+    public Tuple map(TupleWritable in) {
+      for (int i = 0; i < values.length; i++) {
+        // Positions not written in the TupleWritable become nulls.
+        if (in.has(i)) {
+          values[i] = fns.get(i).map(in.get(i));
+        } else {
+          values[i] = null;
+        }
+      }
+      return tupleFactory.makeTuple(values);
+    }
+  }
+
+  /**
+   * For mapping from {@code Tuple}s to {@code TupleWritable}s.
+   * 
+   */
+  private static class TupleTWMapFn extends MapFn<Tuple, TupleWritable> {
+
+    // Reused output writable and its backing array; see initialize().
+    private transient TupleWritable writable;
+    private transient Writable[] values;
+
+    private final List<MapFn> fns;
+
+    public TupleTWMapFn(PType<?>... ptypes) {
+      this.fns = Lists.newArrayList();
+      for (PType<?> ptype : ptypes) {
+        fns.add(ptype.getOutputMapFn());
+      }
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      for (MapFn fn : fns) {
+        fn.configure(conf);
+      }
+    }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      for (MapFn fn : fns) {
+        fn.setContext(context);
+      }
+    }
+
+    @Override
+    public void initialize() {
+      this.values = new Writable[fns.size()];
+      this.writable = new TupleWritable(values);
+      for (MapFn fn : fns) {
+        fn.initialize();
+      }
+    }
+
+    @Override
+    public TupleWritable map(Tuple input) {
+      // Reset the "written" bits so positions from the previous record do not
+      // leak into this one; null fields are simply left unwritten.
+      writable.clearWritten();
+      for (int i = 0; i < input.size(); i++) {
+        Object value = input.get(i);
+        if (value != null) {
+          writable.setWritten(i);
+          values[i] = (Writable) fns.get(i).map(value);
+        }
+      }
+      return writable;
+    }
+  }
+
+  /** Returns a {@code WritableType} for {@link Pair}s of the given component types. */
+  public static <V1, V2> WritableType<Pair<V1, V2>, TupleWritable> pairs(PType<V1> p1, PType<V2> p2) {
+    TWTupleMapFn input = new TWTupleMapFn(TupleFactory.PAIR, p1, p2);
+    TupleTWMapFn output = new TupleTWMapFn(p1, p2);
+    return new WritableType(Pair.class, TupleWritable.class, input, output, p1, p2);
+  }
+
+  /** Returns a {@code WritableType} for {@link Tuple3}s of the given component types. */
+  public static <V1, V2, V3> WritableType<Tuple3<V1, V2, V3>, TupleWritable> triples(PType<V1> p1, PType<V2> p2,
+      PType<V3> p3) {
+    TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLE3, p1, p2, p3);
+    TupleTWMapFn output = new TupleTWMapFn(p1, p2, p3);
+    return new WritableType(Tuple3.class, TupleWritable.class, input, output, p1, p2, p3);
+  }
+
+  /** Returns a {@code WritableType} for {@link Tuple4}s of the given component types. */
+  public static <V1, V2, V3, V4> WritableType<Tuple4<V1, V2, V3, V4>, TupleWritable> quads(PType<V1> p1, PType<V2> p2,
+      PType<V3> p3, PType<V4> p4) {
+    TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLE4, p1, p2, p3, p4);
+    TupleTWMapFn output = new TupleTWMapFn(p1, p2, p3, p4);
+    return new WritableType(Tuple4.class, TupleWritable.class, input, output, p1, p2, p3, p4);
+  }
+
+  /** Returns a {@code WritableType} for arbitrary-arity {@link TupleN}s. */
+  public static WritableType<TupleN, TupleWritable> tuples(PType... ptypes) {
+    TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLEN, ptypes);
+    TupleTWMapFn output = new TupleTWMapFn(ptypes);
+    return new WritableType(TupleN.class, TupleWritable.class, input, output, ptypes);
+  }
+
+  /**
+   * Returns a {@code PType} for a custom {@link Tuple} subclass whose
+   * constructor accepts the type classes of the given component PTypes.
+   */
+  public static <T extends Tuple> PType<T> tuples(Class<T> clazz, PType... ptypes) {
+    Class[] typeArgs = new Class[ptypes.length];
+    for (int i = 0; i < typeArgs.length; i++) {
+      typeArgs[i] = ptypes[i].getTypeClass();
+    }
+    TupleFactory<T> factory = TupleFactory.create(clazz, typeArgs);
+    TWTupleMapFn input = new TWTupleMapFn(factory, ptypes);
+    TupleTWMapFn output = new TupleTWMapFn(ptypes);
+    return new WritableType(clazz, TupleWritable.class, input, output, ptypes);
+  }
+
+  /**
+   * Returns a {@code PType} for {@code clazz} that serializes via the given
+   * base type, composing the supplied conversion functions with the base
+   * type's own input/output MapFns.
+   */
+  public static <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base) {
+    WritableType<S, ?> wt = (WritableType<S, ?>) base;
+    MapFn input = new CompositeMapFn(wt.getInputMapFn(), inputFn);
+    MapFn output = new CompositeMapFn(outputFn, wt.getOutputMapFn());
+    return new WritableType(clazz, wt.getSerializationClass(), input, output, base.getSubTypes().toArray(new PType[0]));
+  }
+
+  /** Converts a {@link GenericArrayWritable} into a Collection by mapping each element. */
+  private static class ArrayCollectionMapFn<T> extends MapFn<GenericArrayWritable, Collection<T>> {
+    private final MapFn<Object, T> mapFn;
+
+    public ArrayCollectionMapFn(MapFn<Object, T> mapFn) {
+      this.mapFn = mapFn;
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      mapFn.configure(conf);
+    }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      mapFn.setContext(context);
+    }
+
+    @Override
+    public void initialize() {
+      mapFn.initialize();
+    }
+
+    @Override
+    public Collection<T> map(GenericArrayWritable input) {
+      Collection<T> collection = Lists.newArrayList();
+      for (Writable writable : input.get()) {
+        collection.add(mapFn.map(writable));
+      }
+      return collection;
+    }
+  }
+
+  /** Converts a Collection into a {@link GenericArrayWritable} by mapping each element. */
+  private static class CollectionArrayMapFn<T> extends MapFn<Collection<T>, GenericArrayWritable> {
+
+    private final Class<? extends Writable> clazz;
+    private final MapFn<T, Object> mapFn;
+
+    public CollectionArrayMapFn(Class<? extends Writable> clazz, MapFn<T, Object> mapFn) {
+      this.clazz = clazz;
+      this.mapFn = mapFn;
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      mapFn.configure(conf);
+    }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      mapFn.setContext(context);
+    }
+
+    @Override
+    public void initialize() {
+      mapFn.initialize();
+    }
+
+    @Override
+    public GenericArrayWritable map(Collection<T> input) {
+      GenericArrayWritable arrayWritable = new GenericArrayWritable(clazz);
+      Writable[] w = new Writable[input.size()];
+      int index = 0;
+      for (T in : input) {
+        w[index++] = ((Writable) mapFn.map(in));
+      }
+      arrayWritable.set(w);
+      return arrayWritable;
+    }
+  }
+
+  /** Returns a {@code WritableType} for Collections of the given element type. */
+  public static <T> WritableType<Collection<T>, GenericArrayWritable<T>> collections(PType<T> ptype) {
+    WritableType<T, ?> wt = (WritableType<T, ?>) ptype;
+    return new WritableType(Collection.class, GenericArrayWritable.class, new ArrayCollectionMapFn(wt.getInputMapFn()),
+        new CollectionArrayMapFn(wt.getSerializationClass(), wt.getOutputMapFn()), ptype);
+  }
+
+  /** Converts a {@link TextMapWritable} into a String-keyed Map by mapping each value. */
+  private static class MapInputMapFn<T> extends MapFn<TextMapWritable<Writable>, Map<String, T>> {
+    private final MapFn<Writable, T> mapFn;
+
+    public MapInputMapFn(MapFn<Writable, T> mapFn) {
+      this.mapFn = mapFn;
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      mapFn.configure(conf);
+    }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      mapFn.setContext(context);
+    }
+
+    @Override
+    public void initialize() {
+      mapFn.initialize();
+    }
+
+    @Override
+    public Map<String, T> map(TextMapWritable<Writable> input) {
+      Map<String, T> out = Maps.newHashMap();
+      for (Map.Entry<Text, Writable> e : input.entrySet()) {
+        out.put(e.getKey().toString(), mapFn.map(e.getValue()));
+      }
+      return out;
+    }
+  }
+
+  /** Converts a String-keyed Map into a {@link TextMapWritable} by mapping each value. */
+  private static class MapOutputMapFn<T> extends MapFn<Map<String, T>, TextMapWritable<Writable>> {
+
+    private final Class<Writable> clazz;
+    private final MapFn<T, Writable> mapFn;
+
+    public MapOutputMapFn(Class<Writable> clazz, MapFn<T, Writable> mapFn) {
+      this.clazz = clazz;
+      this.mapFn = mapFn;
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      mapFn.configure(conf);
+    }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      mapFn.setContext(context);
+    }
+
+    @Override
+    public void initialize() {
+      mapFn.initialize();
+    }
+
+    @Override
+    public TextMapWritable<Writable> map(Map<String, T> input) {
+      TextMapWritable<Writable> tmw = new TextMapWritable<Writable>(clazz);
+      for (Map.Entry<String, T> e : input.entrySet()) {
+        tmw.put(new Text(e.getKey()), mapFn.map(e.getValue()));
+      }
+      return tmw;
+    }
+  }
+
+  /** Returns a {@code WritableType} for String-keyed Maps with the given value type. */
+  public static <T> WritableType<Map<String, T>, MapWritable> maps(PType<T> ptype) {
+    WritableType<T, ?> wt = (WritableType<T, ?>) ptype;
+    return new WritableType(Map.class, TextMapWritable.class, new MapInputMapFn(wt.getInputMapFn()),
+        new MapOutputMapFn(wt.getSerializationClass(), wt.getOutputMapFn()), ptype);
+  }
+
+  /** Returns a {@code PType} that serializes instances of {@code clazz} as JSON strings. */
+  public static <T> PType<T> jsons(Class<T> clazz) {
+    return PTypes.jsonString(clazz, WritableTypeFamily.getInstance());
+  }
+
+  // Not instantiable
+  private Writables() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/types/writable/package-info.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/package-info.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/package-info.java
new file mode 100644
index 0000000..7d54743
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Business object serialization using Hadoop's Writables framework.
+ */
+package org.apache.crunch.types.writable;

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/util/CrunchTool.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/util/CrunchTool.java b/crunch-core/src/main/java/org/apache/crunch/util/CrunchTool.java
new file mode 100644
index 0000000..ea66291
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/util/CrunchTool.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.util;
+
+import java.io.Serializable;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineExecution;
+import org.apache.crunch.PipelineResult;
+import org.apache.crunch.Source;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.io.From;
+import org.apache.crunch.io.To;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.Tool;
+
+/**
+ * An extension of the {@code Tool} interface that creates a {@code Pipeline}
+ * instance and provides methods for working with the Pipeline from inside of
+ * the Tool's run method.
+ * 
+ */
+public abstract class CrunchTool extends Configured implements Tool, Serializable {
+
+  // Shorthand source/target factories available to subclasses.
+  protected static final From from = new From();
+  protected static final To to = new To();
+  protected static final At at = new At();
+
+  // Pipeline object itself isn't necessarily serializable.
+  private transient Pipeline pipeline;
+
+  /** Creates a CrunchTool backed by an {@code MRPipeline}. */
+  public CrunchTool() {
+    this(false);
+  }
+
+  /**
+   * Creates a CrunchTool.
+   *
+   * @param inMemory if true, back this tool with the in-memory pipeline;
+   *          otherwise use a MapReduce pipeline
+   */
+  public CrunchTool(boolean inMemory) {
+    if (inMemory) {
+      this.pipeline = MemPipeline.getInstance();
+    } else {
+      this.pipeline = new MRPipeline(getClass());
+    }
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    // Keep the pipeline's configuration in sync. This method can be invoked
+    // before the pipeline field has been assigned, so guard against null.
+    if (conf != null) {
+      if (pipeline != null) {
+        pipeline.setConfiguration(conf);
+      }
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return pipeline.getConfiguration();
+  }
+
+  /** Turns on extra debugging output for the underlying pipeline. */
+  public void enableDebug() {
+    pipeline.enableDebug();
+  }
+
+  /** Reads the given source into a {@code PCollection}. */
+  public <T> PCollection<T> read(Source<T> source) {
+    return pipeline.read(source);
+  }
+
+  /** Reads the given table source into a {@code PTable}. */
+  public <K, V> PTable<K, V> read(TableSource<K, V> tableSource) {
+    return pipeline.read(tableSource);
+  }
+
+  /** Reads a text file as a collection of its lines. */
+  public PCollection<String> readTextFile(String pathName) {
+    return pipeline.readTextFile(pathName);
+  }
+
+  /** Writes the collection to the given target. */
+  public void write(PCollection<?> pcollection, Target target) {
+    pipeline.write(pcollection, target);
+  }
+
+  /** Writes the collection to a text file at the given path. */
+  public void writeTextFile(PCollection<?> pcollection, String pathName) {
+    pipeline.writeTextFile(pcollection, pathName);
+  }
+
+  /** Makes the collection's contents available for local iteration. */
+  public <T> Iterable<T> materialize(PCollection<T> pcollection) {
+    return pipeline.materialize(pcollection);
+  }
+
+  /** Runs the pipeline and blocks until it finishes. */
+  public PipelineResult run() {
+    return pipeline.run();
+  }
+
+  /** Starts the pipeline without blocking. */
+  public PipelineExecution runAsync() {
+    return pipeline.runAsync();
+  }
+
+  /** Runs any outstanding work and releases pipeline resources. */
+  public PipelineResult done() {
+    return pipeline.done();
+  }
+
+  /** Exposes the underlying pipeline to subclasses. */
+  protected Pipeline getPipeline() {
+    return pipeline;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/util/DistCache.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/util/DistCache.java b/crunch-core/src/main/java/org/apache/crunch/util/DistCache.java
new file mode 100644
index 0000000..3e49930
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/util/DistCache.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.Enumeration;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Provides functions for working with Hadoop's distributed cache. These
+ * include:
+ * <ul>
+ * <li>
+ * Functions for working with a job-specific distributed cache of objects, like
+ * the serialized runtime nodes in a MapReduce.</li>
+ * <li>
+ * Functions for adding library jars to the distributed cache, which will be
+ * added to the classpath of MapReduce tasks.</li>
+ * </ul>
+ */
+public class DistCache {
+
+  // Configuration key holding the paths of jars to export to the distributed
+  // cache.
+  private static final String TMPJARS_KEY = "tmpjars";
+
+  /**
+   * Serializes {@code value} to {@code path} and registers that file in the
+   * distributed cache so tasks can read it back via {@link #read}.
+   *
+   * @param conf the configuration used to resolve the file system and record
+   *          the cache entry
+   * @param path the destination path for the serialized object
+   * @param value the object to serialize; must be Serializable
+   * @throws IOException if the object cannot be written
+   */
+  public static void write(Configuration conf, Path path, Object value) throws IOException {
+    ObjectOutputStream oos = new ObjectOutputStream(path.getFileSystem(conf).create(path));
+    try {
+      oos.writeObject(value);
+    } finally {
+      // Close even when serialization fails so the stream is not leaked.
+      oos.close();
+    }
+
+    DistributedCache.addCacheFile(path.toUri(), conf);
+  }
+
+  public static Object read(Configuration conf, Path path) throws IOException {
+    URI target = null;
+    for (URI uri : DistributedCache.getCacheFiles(conf)) {
+      if (uri.toString().equals(path.toString())) {
+        target = uri;
+        break;
+      }
+    }
+    Object value = null;
+    if (target != null) {
+      Path targetPath = new Path(target.toString());
+      ObjectInputStream ois = new ObjectInputStream(targetPath.getFileSystem(conf).open(targetPath));
+      try {
+        value = ois.readObject();
+      } catch (ClassNotFoundException e) {
+        throw new CrunchRuntimeException(e);
+      }
+      ois.close();
+    }
+    return value;
+  }
+
+  /**
+   * Registers {@code path} as a cache file in the given configuration so it
+   * is shipped to tasks via the distributed cache.
+   *
+   * @param path the file to distribute
+   * @param conf the configuration to record the cache entry in
+   */
+  public static void addCacheFile(Path path, Configuration conf) {
+    DistributedCache.addCacheFile(path.toUri(), conf);
+  }
+  
+  /**
+   * Returns the task-local path of a distributed cache file whose name
+   * matches the given path, qualified against the local file system, or
+   * {@code null} if no local cache file matches.
+   *
+   * <p>NOTE(review): matching is by file-name suffix only, so two cache files
+   * sharing a name could collide — presumably callers use unique file names;
+   * confirm.
+   *
+   * @param path the original (pre-distribution) path of the cache file
+   * @param conf the job configuration
+   */
+  public static Path getPathToCacheFile(Path path, Configuration conf) {
+    try {
+      for (Path localPath : DistributedCache.getLocalCacheFiles(conf)) {
+        if (localPath.toString().endsWith(path.getName())) {
+          return localPath.makeQualified(FileSystem.getLocal(conf));
+        }
+      }
+    } catch (IOException e) {
+      throw new CrunchRuntimeException(e);
+    }
+    return null;
+  }
+  
+  /**
+   * Adds the specified jar to the distributed cache of jobs using the provided
+   * configuration. The jar will be placed on the classpath of tasks run by the
+   * job.
+   * 
+   * @param conf
+   *          The configuration used to add the jar to the distributed cache.
+   * @param jarFile
+   *          The jar file to add to the distributed cache.
+   * @throws IOException
+   *           If the jar file does not exist or there is a problem accessing
+   *           the file.
+   */
+  public static void addJarToDistributedCache(Configuration conf, File jarFile) throws IOException {
+    // Validate the jar before touching the configuration.
+    if (!jarFile.exists()) {
+      throw new IOException("Jar file: " + jarFile.getCanonicalPath() + " does not exist.");
+    }
+    if (!jarFile.getName().endsWith(".jar")) {
+      throw new IllegalArgumentException("File: " + jarFile.getCanonicalPath() + " is not a .jar file.");
+    }
+    // Qualify the jar's path against the local file system.
+    FileSystem localFs = FileSystem.getLocal(conf);
+    String qualified = new Path(jarFile.getCanonicalPath()).makeQualified(localFs).toString();
+    // Append to the comma-separated "tmpjars" list, preserving existing entries.
+    String existing = conf.get(TMPJARS_KEY, "");
+    conf.set(TMPJARS_KEY, existing.isEmpty() ? qualified : existing + "," + qualified);
+  }
+
+  /**
+   * Adds the jar at the specified path to the distributed cache of jobs using
+   * the provided configuration. The jar will be placed on the classpath of
+   * tasks run by the job.
+   * 
+   * @param conf
+   *          The configuration used to add the jar to the distributed cache.
+   * @param jarFile
+   *          The path to the jar file to add to the distributed cache.
+   * @throws IOException
+   *           If the jar file does not exist or there is a problem accessing
+   *           the file.
+   */
+  public static void addJarToDistributedCache(Configuration conf, String jarFile) throws IOException {
+    // Convenience overload: delegate to the File-based variant.
+    addJarToDistributedCache(conf, new File(jarFile));
+  }
+
+  /**
+   * Finds the path to a jar that contains the class provided, if any. There is
+   * no guarantee that the jar returned will be the first on the classpath to
+   * contain the file. This method is basically lifted out of Hadoop's
+   * {@link org.apache.hadoop.mapred.JobConf} class.
+   * 
+   * @param jarClass
+   *          The class the jar file should contain.
+   * @return The path to a jar file that contains the class, or
+   *         <code>null</code> if no such jar exists.
+   * @throws IOException
+   *           If there is a problem searching for the jar file.
+   */
+  public static String findContainingJar(Class<?> jarClass) throws IOException {
+    ClassLoader loader = jarClass.getClassLoader();
+    // Resource path of the class file, e.g. "org/apache/Foo.class".
+    String classFile = jarClass.getName().replaceAll("\\.", "/") + ".class";
+    Enumeration<URL> resources = loader.getResources(classFile);
+    while (resources.hasMoreElements()) {
+      URL url = resources.nextElement();
+      if (!"jar".equals(url.getProtocol())) {
+        continue;
+      }
+      String path = url.getPath();
+      if (path.startsWith("file:")) {
+        path = path.substring("file:".length());
+      }
+      // URLDecoder actually decodes x-www-form-urlencoded data rather than
+      // true URL encoding, so it would turn '+' into ' '. Spaces in file
+      // paths are unencoded or "%20", never '+', so protect literal '+'
+      // characters before decoding.
+      path = path.replaceAll("\\+", "%2B");
+      path = URLDecoder.decode(path, "UTF-8");
+      // Strip the "!<entry>" suffix, leaving only the jar file's path.
+      return path.replaceAll("!.*$", "");
+    }
+    return null;
+  }
+
+  /**
+   * Adds all jars under the specified directory to the distributed cache of
+   * jobs using the provided configuration. The jars will be placed on the
+   * classpath of tasks run by the job. This method does not descend into
+   * subdirectories when adding jars.
+   * 
+   * @param conf
+   *          The configuration used to add jars to the distributed cache.
+   * @param jarDirectory
+   *          A directory containing jar files to add to the distributed cache.
+   * @throws IOException
+   *           If the directory does not exist or there is a problem accessing
+   *           the directory.
+   */
+  public static void addJarDirToDistributedCache(Configuration conf, File jarDirectory) throws IOException {
+    if (!jarDirectory.exists() || !jarDirectory.isDirectory()) {
+      throw new IOException("Jar directory: " + jarDirectory.getCanonicalPath() + " does not "
+          + "exist or is not a directory.");
+    }
+    for (File file : jarDirectory.listFiles()) {
+      if (!file.isDirectory() && file.getName().endsWith(".jar")) {
+        addJarToDistributedCache(conf, file);
+      }
+    }
+  }
+
+  /**
+   * Adds all jars under the directory at the specified path to the distributed
+   * cache of jobs using the provided configuration. The jars will be placed on
+   * the classpath of the tasks run by the job. This method does not descend
+   * into subdirectories when adding jars.
+   * 
+   * @param conf
+   *          The configuration used to add jars to the distributed cache.
+   * @param jarDirectory
+   *          The path to a directory containing jar files to add to the
+   *          distributed cache.
+   * @throws IOException
+   *           If the directory does not exist or there is a problem accessing
+   *           the directory.
+   */
+  public static void addJarDirToDistributedCache(Configuration conf, String jarDirectory) throws IOException {
+    // Convenience overload: delegate to the File-based variant.
+    addJarDirToDistributedCache(conf, new File(jarDirectory));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java b/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java
new file mode 100644
index 0000000..da8db6b
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.util;
+
+import org.apache.crunch.PCollection;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Helpers for choosing the number of partitions (reduce tasks) to use for a
+ * {@code PCollection} based on its size.
+ */
+public class PartitionUtils {
+  // Configuration key overriding how many bytes each reduce task should handle.
+  public static final String BYTES_PER_REDUCE_TASK = "crunch.bytes.per.reduce.task";
+  // Default target of roughly 1 GB (10^9 bytes) of data per reduce task.
+  public static final long DEFAULT_BYTES_PER_REDUCE_TASK = 1000L * 1000L * 1000L;
+  
+  /**
+   * Returns a recommended partition count for the given collection: one plus
+   * the collection's size divided by the configured bytes-per-task target,
+   * so the result is always at least one.
+   */
+  public static <T> int getRecommendedPartitions(PCollection<T> pcollection, Configuration conf) {
+    long bytesPerTask = conf.getLong(BYTES_PER_REDUCE_TASK, DEFAULT_BYTES_PER_REDUCE_TASK);
+    return 1 + (int) (pcollection.getSize() / bytesPerTask);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/util/Tuples.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/util/Tuples.java b/crunch-core/src/main/java/org/apache/crunch/util/Tuples.java
new file mode 100644
index 0000000..9c8d7bd
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/util/Tuples.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.util;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.UnmodifiableIterator;
+
+/**
+ * Utilities for working with subclasses of the {@code Tuple} interface.
+ * Provides {@code Iterable} adapters that zip several input iterables
+ * together into iterables of tuples.
+ */
+public class Tuples {
+
+  /**
+   * Base iterator that advances several underlying iterators in lockstep;
+   * subclasses assemble each round of values into a concrete tuple type.
+   */
+  private static abstract class TuplifyIterator<T> extends UnmodifiableIterator<T> {
+    protected List<Iterator<?>> iterators;
+
+    public TuplifyIterator(Iterator<?>... iterators) {
+      this.iterators = Lists.newArrayList(iterators);
+    }
+
+    @Override
+    public boolean hasNext() {
+      // Iteration ends as soon as any underlying iterator is exhausted, so
+      // the zipped sequence is only as long as the shortest input.
+      for (Iterator<?> iter : iterators) {
+        if (!iter.hasNext()) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    // Returns the next value from the underlying iterator at the given index.
+    protected Object next(int index) {
+      return iterators.get(index).next();
+    }
+  }
+
+  /** Zips two iterables into an iterable of {@code Pair}s. */
+  public static class PairIterable<S, T> implements Iterable<Pair<S, T>> {
+    private final Iterable<S> first;
+    private final Iterable<T> second;
+
+    public PairIterable(Iterable<S> first, Iterable<T> second) {
+      this.first = first;
+      this.second = second;
+    }
+
+    @Override
+    public Iterator<Pair<S, T>> iterator() {
+      return new TuplifyIterator<Pair<S, T>>(first.iterator(), second.iterator()) {
+        @Override
+        public Pair<S, T> next() {
+          return Pair.of((S) next(0), (T) next(1));
+        }
+      };
+    }
+  }
+
+  /** Zips three iterables into an iterable of {@code Tuple3}s. */
+  public static class TripIterable<A, B, C> implements Iterable<Tuple3<A, B, C>> {
+    private final Iterable<A> first;
+    private final Iterable<B> second;
+    private final Iterable<C> third;
+
+    public TripIterable(Iterable<A> first, Iterable<B> second, Iterable<C> third) {
+      this.first = first;
+      this.second = second;
+      this.third = third;
+    }
+
+    @Override
+    public Iterator<Tuple3<A, B, C>> iterator() {
+      return new TuplifyIterator<Tuple3<A, B, C>>(first.iterator(), second.iterator(), third.iterator()) {
+        @Override
+        public Tuple3<A, B, C> next() {
+          return new Tuple3<A, B, C>((A) next(0), (B) next(1), (C) next(2));
+        }
+      };
+    }
+  }
+
+  /** Zips four iterables into an iterable of {@code Tuple4}s. */
+  public static class QuadIterable<A, B, C, D> implements Iterable<Tuple4<A, B, C, D>> {
+    private final Iterable<A> first;
+    private final Iterable<B> second;
+    private final Iterable<C> third;
+    private final Iterable<D> fourth;
+
+    public QuadIterable(Iterable<A> first, Iterable<B> second, Iterable<C> third, Iterable<D> fourth) {
+      this.first = first;
+      this.second = second;
+      this.third = third;
+      this.fourth = fourth;
+    }
+
+    @Override
+    public Iterator<Tuple4<A, B, C, D>> iterator() {
+      return new TuplifyIterator<Tuple4<A, B, C, D>>(first.iterator(), second.iterator(), third.iterator(),
+          fourth.iterator()) {
+        @Override
+        public Tuple4<A, B, C, D> next() {
+          return new Tuple4<A, B, C, D>((A) next(0), (B) next(1), (C) next(2), (D) next(3));
+        }
+      };
+    }
+  }
+
+  /** Zips an arbitrary number of iterables into an iterable of {@code TupleN}s. */
+  public static class TupleNIterable implements Iterable<TupleN> {
+    private final Iterator<?>[] iters;
+
+    // NOTE(review): iterators are drawn once in the constructor, so a
+    // TupleNIterable appears to support only a single iteration pass —
+    // confirm against callers before reusing one.
+    public TupleNIterable(Iterable<?>... iterables) {
+      this.iters = new Iterator[iterables.length];
+      for (int i = 0; i < iters.length; i++) {
+        iters[i] = iterables[i].iterator();
+      }
+    }
+
+    @Override
+    public Iterator<TupleN> iterator() {
+      return new TuplifyIterator<TupleN>(iters) {
+        @Override
+        public TupleN next() {
+          // Pull one value from each input, in order, to build the tuple.
+          Object[] values = new Object[iters.length];
+          for (int i = 0; i < values.length; i++) {
+            values[i] = next(i);
+          }
+          return new TupleN(values);
+        }
+      };
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/util/package-info.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/util/package-info.java b/crunch-core/src/main/java/org/apache/crunch/util/package-info.java
new file mode 100644
index 0000000..94d79a1
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/util/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * An assorted set of utilities.
+ */
+package org.apache.crunch.util;

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/resources/log4j.properties b/crunch-core/src/main/resources/log4j.properties
new file mode 100644
index 0000000..506b527
--- /dev/null
+++ b/crunch-core/src/main/resources/log4j.properties
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# ***** Set the org.apache.crunch logger level to INFO and its only appender to A.
+log4j.logger.org.apache.crunch=info, A
+
+# ***** A is set to be a ConsoleAppender.
+log4j.appender.A=org.apache.log4j.ConsoleAppender
+# ***** A uses PatternLayout.
+log4j.appender.A.layout=org.apache.log4j.PatternLayout
+log4j.appender.A.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/site/site.xml
----------------------------------------------------------------------
diff --git a/crunch-core/src/site/site.xml b/crunch-core/src/site/site.xml
new file mode 100644
index 0000000..73fbd17
--- /dev/null
+++ b/crunch-core/src/site/site.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project name="${project.name}"
+  xmlns="http://maven.apache.org/DECORATION/1.3.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/DECORATION/1.3.0
+                      http://maven.apache.org/xsd/decoration-1.3.0.xsd">
+
+  <body>
+    <!-- Note: Breadcrumbs for Doxia's Markdown parser are currently broken,
+               see https://jira.codehaus.org/browse/DOXIA-472 -->
+    <breadcrumbs>
+      <item name="Apache" href="http://www.apache.org/index.html" />
+      <item name="Crunch" href="../index.html"/>
+    </breadcrumbs>
+
+  </body>
+
+</project>

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/avro/employee.avsc
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/avro/employee.avsc b/crunch-core/src/test/avro/employee.avsc
new file mode 100644
index 0000000..35726e1
--- /dev/null
+++ b/crunch-core/src/test/avro/employee.avsc
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+"namespace": "org.apache.crunch.test",
+"name": "Employee",
+"type": "record",
+"fields": [
+  {"name": "name", "type": ["string", "null"] },
+  {"name": "salary", "type": "int"},
+  {"name": "department", "type": ["string", "null"] } ]
+} 

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/avro/person.avsc
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/avro/person.avsc b/crunch-core/src/test/avro/person.avsc
new file mode 100644
index 0000000..babd808
--- /dev/null
+++ b/crunch-core/src/test/avro/person.avsc
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+"namespace": "org.apache.crunch.test",
+"name": "Person",
+"type": "record",
+"fields": [
+  {"name": "name", "type": ["string", "null"] },
+  {"name": "age", "type": "int"},
+  {"name": "siblingnames", "type": {"type": "array", "items": "string"}} ]
+} 

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/AndFnTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/AndFnTest.java b/crunch-core/src/test/java/org/apache/crunch/AndFnTest.java
new file mode 100644
index 0000000..4b00874
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/AndFnTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.crunch.FilterFn.AndFn;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@code FilterFn.AndFn}, the logical conjunction of two
+ * filter functions.
+ */
+public class AndFnTest {
+
+  private FilterFn<Integer> fnA;
+  private FilterFn<Integer> fnB;
+  private AndFn<Integer> andFn;
+
+  @Before
+  public void setUp() {
+    fnA = mock(FilterFn.class);
+    fnB = mock(FilterFn.class);
+    andFn = new AndFn(fnA, fnB);
+  }
+
+  @Test
+  public void testSetContext() {
+    // The task context must be propagated to both wrapped filter functions.
+    TaskInputOutputContext<?, ?, ?, ?> context = mock(TaskInputOutputContext.class);
+    andFn.setContext(context);
+
+    verify(fnA).setContext(context);
+    verify(fnB).setContext(context);
+  }
+
+  @Test
+  public void testAccept_False() {
+    // AND semantics: a single false operand makes the conjunction false.
+    when(fnA.accept(1)).thenReturn(true);
+    when(fnB.accept(1)).thenReturn(false);
+
+    assertFalse(andFn.accept(1));
+  }
+
+  @Test
+  public void testAccept_True() {
+    // Both operands true: the conjunction accepts the value.
+    when(fnA.accept(1)).thenReturn(true);
+    when(fnB.accept(1)).thenReturn(true);
+
+    assertTrue(andFn.accept(1));
+  }
+
+  @Test
+  public void testCleanup() {
+    // Cleanup must be forwarded to both wrapped filter functions.
+    andFn.cleanup(mock(Emitter.class));
+
+    verify(fnA).cleanup();
+    verify(fnB).cleanup();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/CombineFnTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/CombineFnTest.java b/crunch-core/src/test/java/org/apache/crunch/CombineFnTest.java
new file mode 100644
index 0000000..39548e2
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/CombineFnTest.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.apache.crunch.CombineFn.MAX_BIGINTS;
+import static org.apache.crunch.CombineFn.MAX_DOUBLES;
+import static org.apache.crunch.CombineFn.MAX_FLOATS;
+import static org.apache.crunch.CombineFn.MAX_INTS;
+import static org.apache.crunch.CombineFn.MAX_LONGS;
+import static org.apache.crunch.CombineFn.MIN_BIGINTS;
+import static org.apache.crunch.CombineFn.MIN_DOUBLES;
+import static org.apache.crunch.CombineFn.MIN_FLOATS;
+import static org.apache.crunch.CombineFn.MIN_INTS;
+import static org.apache.crunch.CombineFn.MIN_LONGS;
+import static org.apache.crunch.CombineFn.SUM_BIGINTS;
+import static org.apache.crunch.CombineFn.SUM_DOUBLES;
+import static org.apache.crunch.CombineFn.SUM_FLOATS;
+import static org.apache.crunch.CombineFn.SUM_INTS;
+import static org.apache.crunch.CombineFn.SUM_LONGS;
+import static org.junit.Assert.assertEquals;
+
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.crunch.CombineFn.Aggregator;
+import org.apache.crunch.CombineFn.AggregatorFactory;
+import org.apache.crunch.CombineFn.FirstNAggregator;
+import org.apache.crunch.CombineFn.LastNAggregator;
+import org.apache.crunch.CombineFn.MaxNAggregator;
+import org.apache.crunch.CombineFn.MinNAggregator;
+import org.apache.crunch.CombineFn.PairAggregator;
+import org.apache.crunch.CombineFn.QuadAggregator;
+import org.apache.crunch.CombineFn.StringConcatAggregator;
+import org.apache.crunch.CombineFn.TripAggregator;
+import org.apache.crunch.CombineFn.TupleNAggregator;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+/**
+ * Unit tests for the built-in {@code CombineFn} aggregators and aggregator
+ * factories (sums, minima, maxima, first/last-N, tuple and string-concat
+ * aggregators).
+ */
+public class CombineFnTest {
+
+  // Creates a fresh aggregator from the factory, then folds the values into it.
+  private <T> Iterable<T> applyAggregator(AggregatorFactory<T> a, Iterable<T> values) {
+    return applyAggregator(a.create(), values);
+  }
+
+  // Resets the aggregator, feeds it every value in order, and returns its results.
+  private <T> Iterable<T> applyAggregator(Aggregator<T> a, Iterable<T> values) {
+    a.reset();
+    for (T value : values) {
+      a.update(value);
+    }
+    return a.results();
+  }
+
+  @Test
+  public void testSums() {
+    assertEquals(ImmutableList.of(1775L), applyAggregator(SUM_LONGS, ImmutableList.of(29L, 17L, 1729L)));
+
+    assertEquals(ImmutableList.of(1765L), applyAggregator(SUM_LONGS, ImmutableList.of(29L, 7L, 1729L)));
+
+    assertEquals(ImmutableList.of(1775), applyAggregator(SUM_INTS, ImmutableList.of(29, 17, 1729)));
+
+    assertEquals(ImmutableList.of(1775.0f), applyAggregator(SUM_FLOATS, ImmutableList.of(29f, 17f, 1729f)));
+
+    assertEquals(ImmutableList.of(1775.0), applyAggregator(SUM_DOUBLES, ImmutableList.of(29.0, 17.0, 1729.0)));
+
+    assertEquals(
+        ImmutableList.of(new BigInteger("1775")),
+        applyAggregator(SUM_BIGINTS,
+            ImmutableList.of(new BigInteger("29"), new BigInteger("17"), new BigInteger("1729"))));
+  }
+
+  @Test
+  public void testMax() {
+    assertEquals(ImmutableList.of(1729L), applyAggregator(MAX_LONGS, ImmutableList.of(29L, 17L, 1729L)));
+
+    assertEquals(ImmutableList.of(1729), applyAggregator(MAX_INTS, ImmutableList.of(29, 17, 1729)));
+
+    assertEquals(ImmutableList.of(1729.0f), applyAggregator(MAX_FLOATS, ImmutableList.of(29f, 17f, 1729f)));
+
+    assertEquals(ImmutableList.of(1729.0), applyAggregator(MAX_DOUBLES, ImmutableList.of(29.0, 17.0, 1729.0)));
+
+    assertEquals(ImmutableList.of(1745.0f), applyAggregator(MAX_FLOATS, ImmutableList.of(29f, 1745f, 17f, 1729f)));
+
+    assertEquals(
+        ImmutableList.of(new BigInteger("1729")),
+        applyAggregator(MAX_BIGINTS,
+            ImmutableList.of(new BigInteger("29"), new BigInteger("17"), new BigInteger("1729"))));
+  }
+
+  @Test
+  public void testMin() {
+    assertEquals(ImmutableList.of(17L), applyAggregator(MIN_LONGS, ImmutableList.of(29L, 17L, 1729L)));
+
+    assertEquals(ImmutableList.of(17), applyAggregator(MIN_INTS, ImmutableList.of(29, 17, 1729)));
+
+    assertEquals(ImmutableList.of(17.0f), applyAggregator(MIN_FLOATS, ImmutableList.of(29f, 17f, 1729f)));
+
+    assertEquals(ImmutableList.of(17.0), applyAggregator(MIN_DOUBLES, ImmutableList.of(29.0, 17.0, 1729.0)));
+
+    assertEquals(ImmutableList.of(29), applyAggregator(MIN_INTS, ImmutableList.of(29, 170, 1729)));
+
+    assertEquals(
+        ImmutableList.of(new BigInteger("17")),
+        applyAggregator(MIN_BIGINTS,
+            ImmutableList.of(new BigInteger("29"), new BigInteger("17"), new BigInteger("1729"))));
+  }
+
+  @Test
+  public void testMaxN() {
+    // Keeps the two largest values, in ascending order.
+    assertEquals(ImmutableList.of(98, 1009),
+        applyAggregator(new MaxNAggregator<Integer>(2), ImmutableList.of(17, 34, 98, 29, 1009)));
+  }
+
+  @Test
+  public void testMinN() {
+    // Keeps the two smallest values, in ascending order.
+    assertEquals(ImmutableList.of(17, 29),
+        applyAggregator(new MinNAggregator<Integer>(2), ImmutableList.of(17, 34, 98, 29, 1009)));
+  }
+
+  @Test
+  public void testFirstN() {
+    // Keeps the first two values seen.
+    assertEquals(ImmutableList.of(17, 34),
+        applyAggregator(new FirstNAggregator<Integer>(2), ImmutableList.of(17, 34, 98, 29, 1009)));
+  }
+
+  @Test
+  public void testLastN() {
+    // Keeps the last two values seen.
+    assertEquals(ImmutableList.of(29, 1009),
+        applyAggregator(new LastNAggregator<Integer>(2), ImmutableList.of(17, 34, 98, 29, 1009)));
+  }
+
+  @Test
+  public void testPairs() {
+    // Each component of the pair is combined by its own sub-aggregator.
+    List<Pair<Long, Double>> input = ImmutableList.of(Pair.of(1720L, 17.29), Pair.of(9L, -3.14));
+    Aggregator<Pair<Long, Double>> a = new PairAggregator<Long, Double>(SUM_LONGS.create(), MIN_DOUBLES.create());
+    assertEquals(Pair.of(1729L, -3.14), Iterables.getOnlyElement(applyAggregator(a, input)));
+  }
+
+  @Test
+  public void testPairsTwoLongs() {
+    List<Pair<Long, Long>> input = ImmutableList.of(Pair.of(1720L, 1L), Pair.of(9L, 19L));
+    Aggregator<Pair<Long, Long>> a = new PairAggregator<Long, Long>(SUM_LONGS.create(), SUM_LONGS.create());
+    assertEquals(Pair.of(1729L, 20L), Iterables.getOnlyElement(applyAggregator(a, input)));
+  }
+
+  @Test
+  public void testTrips() {
+    List<Tuple3<Float, Double, Double>> input = ImmutableList.of(Tuple3.of(17.29f, 12.2, 0.1),
+        Tuple3.of(3.0f, 1.2, 3.14), Tuple3.of(-1.0f, 14.5, -0.98));
+    Aggregator<Tuple3<Float, Double, Double>> a = new TripAggregator<Float, Double, Double>(MAX_FLOATS.create(),
+        MAX_DOUBLES.create(), MIN_DOUBLES.create());
+    assertEquals(Tuple3.of(17.29f, 14.5, -0.98), Iterables.getOnlyElement(applyAggregator(a, input)));
+  }
+
+  @Test
+  public void testQuads() {
+    List<Tuple4<Float, Double, Double, Integer>> input = ImmutableList.of(Tuple4.of(17.29f, 12.2, 0.1, 1),
+        Tuple4.of(3.0f, 1.2, 3.14, 2), Tuple4.of(-1.0f, 14.5, -0.98, 3));
+    Aggregator<Tuple4<Float, Double, Double, Integer>> a = new QuadAggregator<Float, Double, Double, Integer>(
+        MAX_FLOATS.create(), MAX_DOUBLES.create(), MIN_DOUBLES.create(), SUM_INTS.create());
+    assertEquals(Tuple4.of(17.29f, 14.5, -0.98, 6), Iterables.getOnlyElement(applyAggregator(a, input)));
+  }
+
+  @Test
+  public void testTupleN() {
+    List<TupleN> input = ImmutableList.of(new TupleN(1, 3.0, 1, 2.0, 4L), new TupleN(4, 17.0, 1, 9.7, 12L));
+    Aggregator<TupleN> a = new TupleNAggregator(MIN_INTS.create(), SUM_DOUBLES.create(), MAX_INTS.create(),
+        MIN_DOUBLES.create(), MAX_LONGS.create());
+    assertEquals(new TupleN(1, 20.0, 1, 2.0, 12L), Iterables.getOnlyElement(applyAggregator(a, input)));
+  }
+
+  @Test
+  public void testConcatenation() {
+    // With the boolean flag set to true, null inputs are skipped; the false
+    // case throws instead (see testConcatenationNullException below).
+    String[] arrayNull = new String[] { null, "" };
+    assertEquals(ImmutableList.of("foofoobarbar"), applyAggregator(
+        new StringConcatAggregator("", true), ImmutableList.of("foo", "foobar", "bar")));
+    assertEquals(ImmutableList.of("foo/foobar/bar"), applyAggregator(
+        new StringConcatAggregator("/", false), ImmutableList.of("foo", "foobar", "bar")));
+    assertEquals(ImmutableList.of("  "), applyAggregator(
+        new StringConcatAggregator(" ", true), ImmutableList.of(" ", "")));
+    assertEquals(ImmutableList.of(""), applyAggregator(
+        new StringConcatAggregator(" ", true), Arrays.asList(arrayNull)));
+    // The last two constructor args appear to bound output and element sizes,
+    // dropping values that would exceed them — confirm against the aggregator.
+    assertEquals(ImmutableList.of("foo bar"), applyAggregator(
+        new StringConcatAggregator(" ", true, 20, 3), ImmutableList.of("foo", "foobar", "bar")));
+    assertEquals(ImmutableList.of("foo foobar"), applyAggregator(
+        new StringConcatAggregator(" ", true, 10, 6), ImmutableList.of("foo", "foobar", "bar")));
+    assertEquals(ImmutableList.of("foo bar"), applyAggregator(
+        new StringConcatAggregator(" ", true, 9, 6), ImmutableList.of("foo", "foobar", "bar")));
+  }
+
+  @Test
+  public void testConcatenationReset() {
+    // Reusing the same aggregator instance after reset() must give the same result.
+    StringConcatAggregator a = new StringConcatAggregator(" ", true, 10, 6);
+
+    assertEquals(ImmutableList.of("foo foobar"), applyAggregator(a, ImmutableList.of("foo", "foobar", "bar")));
+    assertEquals(ImmutableList.of("foo foobar"), applyAggregator(a, ImmutableList.of("foo", "foobar", "bar")));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testConcatenationNullException() {
+    // With null-skipping disabled, a null input triggers a NullPointerException.
+    String[] arrayNull = new String[] { null, "" };
+    assertEquals(ImmutableList.of(""), applyAggregator(
+        new StringConcatAggregator(" ", false), Arrays.asList(arrayNull)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/NotFnTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/NotFnTest.java b/crunch-core/src/test/java/org/apache/crunch/NotFnTest.java
new file mode 100644
index 0000000..8af17a2
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/NotFnTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.*;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.crunch.FilterFn.NotFn;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link FilterFn.NotFn}: it must invert the decision of the
+ * wrapped filter and forward lifecycle calls (setContext, cleanup) to it.
+ */
+public class NotFnTest {
+
+  private FilterFn<Integer> base;
+  private NotFn<Integer> notFn;
+
+  @Before
+  public void setUp() {
+    base = mock(FilterFn.class);
+    // Parameterized construction; the original used the raw type NotFn.
+    notFn = new NotFn<Integer>(base);
+  }
+
+  /** setContext must be propagated to the wrapped function. */
+  @Test
+  public void testSetContext() {
+    TaskInputOutputContext<?, ?, ?, ?> context = mock(TaskInputOutputContext.class);
+
+    notFn.setContext(context);
+
+    verify(base).setContext(context);
+  }
+
+  /** When the delegate accepts a value, NotFn must reject it. */
+  @Test
+  public void testAccept_True() {
+    when(base.accept(1)).thenReturn(true);
+
+    assertFalse(notFn.accept(1));
+  }
+
+  /** When the delegate rejects a value, NotFn must accept it. */
+  @Test
+  public void testAccept_False() {
+    when(base.accept(1)).thenReturn(false);
+
+    assertTrue(notFn.accept(1));
+  }
+
+  /** cleanup must be propagated to the wrapped function. */
+  @Test
+  public void testCleanupEmitterOfT() {
+    notFn.cleanup(mock(Emitter.class));
+
+    verify(base).cleanup();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/OrFnTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/OrFnTest.java b/crunch-core/src/test/java/org/apache/crunch/OrFnTest.java
new file mode 100644
index 0000000..fde2376
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/OrFnTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.crunch.FilterFn.OrFn;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link FilterFn.OrFn}: it must accept a value when at least
+ * one of the combined filters accepts it, and forward lifecycle calls
+ * (setContext, cleanup) to every combined filter.
+ */
+public class OrFnTest {
+
+  private FilterFn<Integer> fnA;
+  private FilterFn<Integer> fnB;
+  private OrFn<Integer> orFn;
+
+  @Before
+  public void setUp() {
+    fnA = mock(FilterFn.class);
+    fnB = mock(FilterFn.class);
+    // Parameterized construction; the original used the raw type OrFn.
+    orFn = new OrFn<Integer>(fnA, fnB);
+  }
+
+  /** setContext must be propagated to both wrapped functions. */
+  @Test
+  public void testSetContext() {
+    TaskInputOutputContext<?, ?, ?, ?> context = mock(TaskInputOutputContext.class);
+
+    orFn.setContext(context);
+
+    verify(fnA).setContext(context);
+    verify(fnB).setContext(context);
+  }
+
+  /** A single accepting delegate is enough for OrFn to accept. */
+  @Test
+  public void testAccept_True() {
+    when(fnA.accept(1)).thenReturn(false);
+    when(fnB.accept(1)).thenReturn(true);
+
+    assertTrue(orFn.accept(1));
+  }
+
+  /** OrFn rejects only when every delegate rejects. */
+  @Test
+  public void testAccept_False() {
+    when(fnA.accept(1)).thenReturn(false);
+    when(fnB.accept(1)).thenReturn(false);
+
+    assertFalse(orFn.accept(1));
+  }
+
+  /** cleanup must be propagated to both wrapped functions. */
+  @Test
+  public void testCleanupEmitterOfT() {
+    orFn.cleanup(mock(Emitter.class));
+
+    verify(fnA).cleanup();
+    verify(fnB).cleanup();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/PairTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/PairTest.java b/crunch-core/src/test/java/org/apache/crunch/PairTest.java
new file mode 100644
index 0000000..106413c
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/PairTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link Pair}: construction via constructor and the
+ * {@code of} factory, positional access, equality, and comparison ordering
+ * (including null components).
+ */
+public class PairTest {
+
+  @Test
+  public void testPairConstructor() {
+    Pair<String, Integer> pair = new Pair<String, Integer>("brock", 45);
+    test(pair);
+  }
+
+  @Test
+  public void testPairOf() {
+    Pair<String, Integer> pair = Pair.of("brock", 45);
+    test(pair);
+  }
+
+  /** Shared assertions for a pair holding ("brock", 45). */
+  protected void test(Pair<String, Integer> pair) {
+    assertEquals(2, pair.size());
+
+    assertEquals("brock", pair.first());
+    // Integer.valueOf instead of the deprecated new Integer(...) constructor.
+    assertEquals(Integer.valueOf(45), pair.second());
+    assertEquals(Pair.of("brock", 45), pair);
+
+    assertEquals("brock", pair.get(0));
+    assertEquals(Integer.valueOf(45), pair.get(1));
+
+    // Out-of-range positional access must fail fast.
+    try {
+      pair.get(-1);
+      fail();
+    } catch (IndexOutOfBoundsException e) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testPairComparisons() {
+    // Null components compare equal to each other and sort before values.
+    assertEquals(0, Pair.of(null, null).compareTo(Pair.of(null, null)));
+    assertEquals(0, Pair.of(1, 2).compareTo(Pair.of(1, 2)));
+    assertTrue(Pair.of(2, "a").compareTo(Pair.of(1, "a")) > 0);
+    assertTrue(Pair.of("a", 2).compareTo(Pair.of("a", 1)) > 0);
+    assertTrue(Pair.of(null, 17).compareTo(Pair.of(null, 29)) < 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/TupleTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/TupleTest.java b/crunch-core/src/test/java/org/apache/crunch/TupleTest.java
new file mode 100644
index 0000000..b07ec3f
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/TupleTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.crunch.types.TupleFactory;
+import org.junit.Test;
+
+/**
+ * Unit tests for the Tuple family ({@link Tuple3}, {@link Tuple4},
+ * {@link TupleN}) and {@link TupleFactory}: positional access, size,
+ * equality (including null components), and factory construction.
+ */
+public class TupleTest {
+  private String first = "foo";
+  private Integer second = 1729;
+  private Double third = 64.2;
+  private Boolean fourth = false;
+  private Float fifth = 17.29f;
+
+  @Test
+  public void testTuple3() {
+    Tuple3<String, Integer, Double> t = new Tuple3<String, Integer, Double>(first, second, third);
+    assertEquals(3, t.size());
+    assertEquals(first, t.first());
+    assertEquals(second, t.second());
+    assertEquals(third, t.third());
+    assertEquals(first, t.get(0));
+    assertEquals(second, t.get(1));
+    assertEquals(third, t.get(2));
+    // Out-of-range positional access must fail fast.
+    try {
+      t.get(-1);
+      fail();
+    } catch (IndexOutOfBoundsException e) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testTuple3Equality() {
+    // Parameterized constructions; the original used raw Tuple3 types.
+    Tuple3<String, Integer, Double> t = new Tuple3<String, Integer, Double>(first, second, third);
+    assertTrue(t.equals(new Tuple3<String, Integer, Double>(first, second, third)));
+    assertFalse(t.equals(new Tuple3<String, Integer, Double>(first, null, third)));
+    assertFalse((new Tuple3<String, Integer, Double>(null, null, null)).equals(t));
+    assertTrue((new Tuple3<String, Integer, Double>(first, null, null))
+        .equals(new Tuple3<String, Integer, Double>(first, null, null)));
+  }
+
+  @Test
+  public void testTuple4() {
+    Tuple4<String, Integer, Double, Boolean> t = new Tuple4<String, Integer, Double, Boolean>(first, second, third,
+        fourth);
+    assertEquals(4, t.size());
+    assertEquals(first, t.first());
+    assertEquals(second, t.second());
+    assertEquals(third, t.third());
+    assertEquals(fourth, t.fourth());
+    assertEquals(first, t.get(0));
+    assertEquals(second, t.get(1));
+    assertEquals(third, t.get(2));
+    assertEquals(fourth, t.get(3));
+    // Out-of-range positional access must fail fast.
+    try {
+      t.get(-1);
+      fail();
+    } catch (IndexOutOfBoundsException e) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testTuple4Equality() {
+    Tuple4<String, Integer, Double, Boolean> t = new Tuple4<String, Integer, Double, Boolean>(first, second, third,
+        fourth);
+    // A Tuple4 is never equal to a Tuple3, even with matching leading values.
+    assertFalse(t.equals(new Tuple3<String, Integer, Double>(first, second, third)));
+    assertFalse(t.equals(new Tuple4<String, Integer, Double, Boolean>(first, null, third, null)));
+    assertFalse((new Tuple4<String, Integer, Double, Boolean>(null, null, null, null)).equals(t));
+    assertTrue((new Tuple4<String, Integer, Double, Boolean>(first, null, third, null))
+        .equals(new Tuple4<String, Integer, Double, Boolean>(first, null, third, null)));
+  }
+
+  @Test
+  public void testTupleN() {
+    TupleN t = new TupleN(first, second, third, fourth, fifth);
+    assertEquals(5, t.size());
+    assertEquals(first, t.get(0));
+    assertEquals(second, t.get(1));
+    assertEquals(third, t.get(2));
+    assertEquals(fourth, t.get(3));
+    assertEquals(fifth, t.get(4));
+    // Out-of-range positional access must fail fast.
+    try {
+      t.get(-1);
+      fail();
+    } catch (IndexOutOfBoundsException e) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testTupleNEquality() {
+    TupleN t = new TupleN(first, second, third, fourth, fifth);
+    assertTrue(t.equals(new TupleN(first, second, third, fourth, fifth)));
+    // Different arity (4 vs 5 elements) must not compare equal.
+    assertFalse(t.equals(new TupleN(first, null, third, null)));
+    assertFalse((new TupleN(null, null, null, null, null)).equals(t));
+    assertTrue((new TupleN(first, second, third, null, null)).equals(new TupleN(first, second, third, null, null)));
+  }
+
+  @Test
+  public void testTupleFactory() {
+    // Each named factory produces its corresponding concrete tuple type.
+    checkTuple(TupleFactory.PAIR.makeTuple("a", "b"), Pair.class, "a", "b");
+    checkTuple(TupleFactory.TUPLE3.makeTuple("a", "b", "c"), Tuple3.class, "a", "b", "c");
+    checkTuple(TupleFactory.TUPLE4.makeTuple("a", "b", "c", "d"), Tuple4.class, "a", "b", "c", "d");
+    checkTuple(TupleFactory.TUPLEN.makeTuple("a", "b", "c", "d", "e"), TupleN.class, "a", "b", "c", "d", "e");
+
+    // TUPLEN accepts any arity.
+    checkTuple(TupleFactory.TUPLEN.makeTuple("a", "b"), TupleN.class, "a", "b");
+    checkTuple(TupleFactory.TUPLEN.makeTuple("a", "b", "c"), TupleN.class, "a", "b", "c");
+    checkTuple(TupleFactory.TUPLEN.makeTuple("a", "b", "c", "d"), TupleN.class, "a", "b", "c", "d");
+    checkTuple(TupleFactory.TUPLEN.makeTuple("a", "b", "c", "d", "e"), TupleN.class, "a", "b", "c", "d", "e");
+  }
+
+  /** Asserts the tuple's concrete class, size, and per-index contents. */
+  private void checkTuple(Tuple t, Class<? extends Tuple> type, Object... values) {
+    assertEquals(type, t.getClass());
+    assertEquals(values.length, t.size());
+    for (int i = 0; i < values.length; i++) {
+      assertEquals(values[i], t.get(i));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/test/java/org/apache/crunch/WriteModeTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/WriteModeTest.java b/crunch-core/src/test/java/org/apache/crunch/WriteModeTest.java
new file mode 100644
index 0000000..e99ac7b
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/WriteModeTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.Target.WriteMode;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Tests {@link Target.WriteMode} behavior when a collection is written to a
+ * path that was already targeted by an earlier write: the default mode
+ * raises a CrunchRuntimeException, OVERWRITE replaces the existing output,
+ * and APPEND keeps both copies.
+ */
+public class WriteModeTest {
+
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  // Writing twice to the same path with no explicit WriteMode must fail.
+  @Test(expected=CrunchRuntimeException.class)
+  public void testDefault() throws Exception {
+    run(null, true);
+  }
+
+  // The default mode fails even when the first write was never executed
+  // via run() -- presumably the conflict is detected before execution;
+  // TODO(review): confirm against the planner.
+  @Test(expected=CrunchRuntimeException.class)
+  public void testDefaultNoRun() throws Exception {
+    run(null, false);
+  }
+  
+  // OVERWRITE replaces the first output, so only one copy of the values
+  // remains afterwards.
+  @Test
+  public void testOverwrite() throws Exception {
+    Path p = run(WriteMode.OVERWRITE, true);
+    PCollection<String> lines = MemPipeline.getInstance().readTextFile(p.toString());
+    assertEquals(ImmutableList.of("some", "string", "values"), lines.materialize());
+  }
+  
+  // OVERWRITE still fails when the prior write was only planned, not run.
+  @Test(expected=CrunchRuntimeException.class)
+  public void testOverwriteNoRun() throws Exception {
+    run(WriteMode.OVERWRITE, false);
+  }
+  
+  // APPEND after an executed first write yields both copies of the values.
+  @Test
+  public void testAppend() throws Exception {
+    Path p = run(WriteMode.APPEND, true);
+    PCollection<String> lines = MemPipeline.getInstance().readTextFile(p.toString());
+    assertEquals(ImmutableList.of("some", "string", "values", "some", "string", "values"),
+        lines.materialize());
+  }
+  
+  // APPEND without an intermediate run() also yields both copies once the
+  // pipeline finally executes.
+  @Test
+  public void testAppendNoRun() throws Exception {
+    Path p = run(WriteMode.APPEND, false);
+    PCollection<String> lines = MemPipeline.getInstance().readTextFile(p.toString());
+    assertEquals(ImmutableList.of("some", "string", "values", "some", "string", "values"),
+        lines.materialize());
+  }
+  
+  /**
+   * Writes a three-element collection to the same temporary path twice,
+   * applying {@code writeMode} only to the second write.
+   *
+   * @param writeMode mode for the second write, or null for the default
+   * @param doRun whether to execute the pipeline between the two writes
+   * @return the output path both writes targeted
+   */
+  Path run(WriteMode writeMode, boolean doRun) throws Exception {
+    Path output = tmpDir.getPath("existing");
+    // Start from a clean directory so state from earlier tests can't leak in.
+    FileSystem fs = FileSystem.get(tmpDir.getDefaultConfiguration());
+    if (fs.exists(output)) {
+      fs.delete(output, true);
+    }
+    Pipeline p = MemPipeline.getInstance();
+    PCollection<String> data = MemPipeline.typedCollectionOf(Avros.strings(),
+        ImmutableList.of("some", "string", "values"));
+    // First write: always uses the default mode.
+    data.write(To.textFile(output));
+
+    if (doRun) {
+      p.run();
+    }
+    
+    // Second write to the same path: this is where the WriteMode matters.
+    if (writeMode == null) {
+      data.write(To.textFile(output));
+    } else {
+      data.write(To.textFile(output), writeMode);
+    }
+    
+    p.run();
+    
+    return output;
+  }
+}


Mime
View raw message