incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [18/33] CRUNCH-8: Moving the code into multiple Maven modules. Contributed by Matthias Friedrich
Date Wed, 11 Jul 2012 05:14:46 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/resources/urls.txt
----------------------------------------------------------------------
diff --git a/crunch/src/test/resources/urls.txt b/crunch/src/test/resources/urls.txt
new file mode 100644
index 0000000..827e711
--- /dev/null
+++ b/crunch/src/test/resources/urls.txt
@@ -0,0 +1,11 @@
+www.A.com	www.B.com
+www.A.com	www.C.com
+www.A.com	www.D.com
+www.A.com	www.E.com
+www.B.com	www.D.com
+www.B.com	www.E.com
+www.C.com	www.D.com
+www.D.com	www.B.com
+www.E.com	www.A.com
+www.F.com	www.B.com
+www.F.com	www.C.com

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index f3e934a..240a90e 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -18,11 +18,15 @@ under the License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
   <modelVersion>4.0.0</modelVersion>
-  <groupId>org.apache.crunch</groupId>
+  <parent>
+    <groupId>org.apache.crunch</groupId>
+    <artifactId>crunch-parent</artifactId>
+    <version>0.3.0-SNAPSHOT</version>
+  </parent>
+
   <artifactId>crunch-examples</artifactId>
-  <packaging>jar</packaging>
-  <version>0.3.0</version>
   <name>Apache Incubator Crunch Examples</name>
 
   <properties>
@@ -72,7 +76,7 @@ under the License.
     <dependency>
       <groupId>org.apache.crunch</groupId>
       <artifactId>crunch</artifactId>
-      <version>0.3.0</version>
+      <version>0.3.0-SNAPSHOT</version>
     </dependency>
   </dependencies>
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4a1b36f..94c79b2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,13 +18,21 @@ under the License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.crunch</groupId>
-  <artifactId>crunch</artifactId>
-  <packaging>jar</packaging>
-  <version>0.3.0</version>
+  <artifactId>crunch-parent</artifactId>
+  <version>0.3.0-SNAPSHOT</version>
+  <packaging>pom</packaging>
+
   <name>Apache Incubator Crunch</name>
 
+  <modules>
+    <module>crunch</module>
+    <module>examples</module>
+    <module>scrunch</module>
+  </modules>
+
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <avro.version>1.7.0</avro.version>

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/scrunch/pom.xml
----------------------------------------------------------------------
diff --git a/scrunch/pom.xml b/scrunch/pom.xml
index 2c70f73..0627130 100644
--- a/scrunch/pom.xml
+++ b/scrunch/pom.xml
@@ -18,10 +18,15 @@ under the License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
   <modelVersion>4.0.0</modelVersion>
-  <groupId>org.apache.crunch</groupId>
+  <parent>
+    <groupId>org.apache.crunch</groupId>
+    <artifactId>crunch-parent</artifactId>
+    <version>0.3.0-SNAPSHOT</version>
+  </parent>
+
   <artifactId>scrunch</artifactId>
-  <version>0.2.0</version>
   <name>Apache Incubator Crunch for Scala</name>
 
   <properties>
@@ -101,7 +106,7 @@ under the License.
     <dependency>
       <groupId>org.apache.crunch</groupId>
       <artifactId>crunch</artifactId>
-      <version>0.3.0</version>
+      <version>0.3.0-SNAPSHOT</version>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/CombineFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/CombineFn.java b/src/main/java/org/apache/crunch/CombineFn.java
deleted file mode 100644
index e552286..0000000
--- a/src/main/java/org/apache/crunch/CombineFn.java
+++ /dev/null
@@ -1,807 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.io.Serializable;
-import java.math.BigInteger;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.SortedSet;
-
-import org.apache.crunch.util.Tuples;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- * A special {@link DoFn} implementation that converts an {@link Iterable}
- * of values into a single value. If a {@code CombineFn} instance is used
- * on a {@link PGroupedTable}, the function will be applied to the output
- * of the map stage before the data is passed to the reducer, which can
- * improve the runtime of certain classes of jobs.
- *
- */
-public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, T>> {
-  
-  public static interface Aggregator<T> extends Serializable {
-    /**
-     * Clears the internal state of this Aggregator and prepares it for the values associated
-     * with the next key.
-     */
-    void reset();
-
-    /**
-     * Incorporate the given value into the aggregate state maintained by this instance.
-     */
-    void update(T value);
-    
-    /**
-     * Returns the current aggregated state of this instance.
-     */
-    Iterable<T> results();    
-  }
-  
-  /**
-   * Interface for constructing new aggregator instances.
-   */
-  public static interface AggregatorFactory<T> {
-    Aggregator<T> create();
-  }
-  
-  /**
-   * A {@code CombineFn} that delegates all of the actual work to an {@code Aggregator}
-   * instance.
-   */
-  public static class AggregatorCombineFn<K, V> extends CombineFn<K, V> {
-    
-    private final Aggregator<V> aggregator;
-    
-    public AggregatorCombineFn(Aggregator<V> aggregator) {
-      this.aggregator = aggregator;
-    }
-    
-    @Override
-    public void process(Pair<K, Iterable<V>> input, Emitter<Pair<K, V>> emitter) {
-      aggregator.reset();
-      for (V v : input.second()) {
-        aggregator.update(v);
-      }
-      for (V v : aggregator.results()) {
-        emitter.emit(Pair.of(input.first(), v));
-      }
-    }    
-  }
-  
-  private static abstract class TupleAggregator<T> implements Aggregator<T> {   
-    private final List<Aggregator<Object>> aggregators;
-    
-    public TupleAggregator(Aggregator<?>...aggregators) {
-      this.aggregators = Lists.newArrayList();
-      for (Aggregator<?> a : aggregators) {
-        this.aggregators.add((Aggregator<Object>) a);
-      }
-    }
-    
-    @Override
-    public void reset() {
-      for (Aggregator<?> a : aggregators) {
-        a.reset();
-      }
-    }
-    
-    protected void updateTuple(Tuple t) {
-      for (int i = 0; i < aggregators.size(); i++) {
-        aggregators.get(i).update(t.get(i));
-      }
-    }
-    
-    protected Iterable<Object> results(int index) {
-      return aggregators.get(index).results();
-    }
-  }
-  
-  public static class PairAggregator<V1, V2> extends TupleAggregator<Pair<V1, V2>> {
-    
-    public PairAggregator(Aggregator<V1> a1, Aggregator<V2> a2) {
-      super(a1, a2);
-    }
-    
-    @Override
-    public void update(Pair<V1, V2> value) {
-      updateTuple(value);
-    }
-    
-    @Override
-    public Iterable<Pair<V1, V2>> results() {
-      return new Tuples.PairIterable<V1, V2>((Iterable<V1>) results(0), (Iterable<V2>) results(1));
-    }
-  }
-  
-  public static class TripAggregator<A, B, C> extends TupleAggregator<Tuple3<A, B, C>> {
-    
-    public TripAggregator(Aggregator<A> a1, Aggregator<B> a2, Aggregator<C> a3) {
-      super(a1, a2, a3);
-    }
-    
-    @Override
-    public void update(Tuple3<A, B, C> value) {
-      updateTuple(value);
-    }
-    
-    @Override
-    public Iterable<Tuple3<A, B, C>> results() {
-      return new Tuples.TripIterable<A, B, C>((Iterable<A>) results(0),
-          (Iterable<B>) results(1), (Iterable<C>) results(2));
-    }
-  }
-
-  public static class QuadAggregator<A, B, C, D> extends TupleAggregator<Tuple4<A, B, C, D>> {
-    
-    public QuadAggregator(Aggregator<A> a1, Aggregator<B> a2, Aggregator<C> a3, Aggregator<D> a4) {
-      super(a1, a2, a3, a4);
-    }
-    
-    @Override
-    public void update(Tuple4<A, B, C, D> value) {
-      updateTuple(value);
-    }
-    
-    @Override
-    public Iterable<Tuple4<A, B, C, D>> results() {
-      return new Tuples.QuadIterable<A, B, C, D>((Iterable<A>) results(0),
-          (Iterable<B>) results(1), (Iterable<C>) results(2), (Iterable<D>) results(3));
-    }
-  }
-  
-  public static class TupleNAggregator extends TupleAggregator<TupleN> {
-    
-    private final int size;
-    
-    public TupleNAggregator(Aggregator<?>... aggregators) {
-      super(aggregators);
-      size = aggregators.length;
-    }
-    
-    @Override
-    public void update(TupleN value) {
-      updateTuple(value);
-    }
-
-    @Override
-    public Iterable<TupleN> results() {
-      Iterable<?>[] iterables = new Iterable[size];
-      for (int i = 0; i < size; i++) {
-        iterables[i] = results(i);
-      }
-      return new Tuples.TupleNIterable(iterables);
-    }
-    
-  }
-  
-  public static final <K, V> CombineFn<K, V> aggregator(Aggregator<V> aggregator) {
-    return new AggregatorCombineFn<K, V>(aggregator);
-  }
-  
-  public static final <K, V> CombineFn<K, V> aggregatorFactory(AggregatorFactory<V> aggregator) {
-    return new AggregatorCombineFn<K, V>(aggregator.create());
-  }
-  
-  public static final <K, V1, V2> CombineFn<K, Pair<V1, V2>> pairAggregator(
-      AggregatorFactory<V1> a1, AggregatorFactory<V2> a2) {
-    return aggregator(new PairAggregator<V1, V2>(a1.create(), a2.create()));
-  }
-  
-  public static final <K, A, B, C> CombineFn<K, Tuple3<A, B, C>> tripAggregator(
-      AggregatorFactory<A> a1, AggregatorFactory<B> a2, AggregatorFactory<C> a3) {
-    return aggregator(new TripAggregator<A, B, C>(a1.create(), a2.create(), a3.create()));
-  }
-
-  public static final <K, A, B, C, D> CombineFn<K, Tuple4<A, B, C, D>> quadAggregator(
-      AggregatorFactory<A> a1, AggregatorFactory<B> a2, AggregatorFactory<C> a3,
-      AggregatorFactory<D> a4) {
-    return aggregator(new QuadAggregator<A, B, C, D>(a1.create(), a2.create(), a3.create(),
-        a4.create()));
-  }
-
-  public static final <K> CombineFn<K, TupleN> tupleAggregator(AggregatorFactory<?>... factories) {
-    Aggregator<?>[] aggs = new Aggregator[factories.length];
-    for (int i = 0; i < aggs.length; i++) {
-      aggs[i] = factories[i].create();
-    }
-    return aggregator(new TupleNAggregator(aggs));
-  }
-  
-  public static final <K> CombineFn<K, Long> SUM_LONGS() {
-    return aggregatorFactory(SUM_LONGS);
-  }
-
-  public static final <K> CombineFn<K, Integer> SUM_INTS() {
-    return aggregatorFactory(SUM_INTS);
-  }
-
-  public static final <K> CombineFn<K, Float> SUM_FLOATS() {
-    return aggregatorFactory(SUM_FLOATS);
-  }
-
-  public static final <K> CombineFn<K, Double> SUM_DOUBLES() {
-    return aggregatorFactory(SUM_DOUBLES);
-  }
- 
-  public static final <K> CombineFn<K, BigInteger> SUM_BIGINTS() {
-    return aggregatorFactory(SUM_BIGINTS);
-  }
-  
-  public static final <K> CombineFn<K, Long> MAX_LONGS() {
-    return aggregatorFactory(MAX_LONGS);
-  }
-
-  public static final <K> CombineFn<K, Long> MAX_LONGS(int n) {
-    return aggregator(new MaxNAggregator<Long>(n));
-  }
-  
-  public static final <K> CombineFn<K, Integer> MAX_INTS() {
-    return aggregatorFactory(MAX_INTS);
-  }
-
-  public static final <K> CombineFn<K, Integer> MAX_INTS(int n) {
-    return aggregator(new MaxNAggregator<Integer>(n));
-  }
-
-  public static final <K> CombineFn<K, Float> MAX_FLOATS() {
-    return aggregatorFactory(MAX_FLOATS);
-  }
-
-  public static final <K> CombineFn<K, Float> MAX_FLOATS(int n) {
-    return aggregator(new MaxNAggregator<Float>(n));
-  }
-
-  public static final <K> CombineFn<K, Double> MAX_DOUBLES() {
-    return aggregatorFactory(MAX_DOUBLES);
-  }
-
-  public static final <K> CombineFn<K, Double> MAX_DOUBLES(int n) {
-    return aggregator(new MaxNAggregator<Double>(n));
-  }
-  
-  public static final <K> CombineFn<K, BigInteger> MAX_BIGINTS() {
-    return aggregatorFactory(MAX_BIGINTS);
-  }
-  
-  public static final <K> CombineFn<K, BigInteger> MAX_BIGINTS(int n) {
-    return aggregator(new MaxNAggregator<BigInteger>(n));
-  }
-  
-  public static final <K> CombineFn<K, Long> MIN_LONGS() {
-    return aggregatorFactory(MIN_LONGS);
-  }
-
-  public static final <K> CombineFn<K, Long> MIN_LONGS(int n) {
-    return aggregator(new MinNAggregator<Long>(n));
-  }
-
-  public static final <K> CombineFn<K, Integer> MIN_INTS() {
-    return aggregatorFactory(MIN_INTS);
-  }
-
-  public static final <K> CombineFn<K, Integer> MIN_INTS(int n) {
-    return aggregator(new MinNAggregator<Integer>(n));
-  }
-  
-  public static final <K> CombineFn<K, Float> MIN_FLOATS() {
-    return aggregatorFactory(MIN_FLOATS);
-  }
-
-  public static final <K> CombineFn<K, Float> MIN_FLOATS(int n) {
-    return aggregator(new MinNAggregator<Float>(n));
-  }
-  
-  public static final <K> CombineFn<K, Double> MIN_DOUBLES() {
-    return aggregatorFactory(MIN_DOUBLES);
-  }
-
-  public static final <K> CombineFn<K, Double> MIN_DOUBLES(int n) {
-    return aggregator(new MinNAggregator<Double>(n));
-  }
-  
-  public static final <K> CombineFn<K, BigInteger> MIN_BIGINTS() {
-    return aggregatorFactory(MIN_BIGINTS);
-  }
-  
-  public static final <K> CombineFn<K, BigInteger> MIN_BIGINTS(int n) {
-    return aggregator(new MinNAggregator<BigInteger>(n));
-  }
-  
-  public static final <K, V> CombineFn<K, V> FIRST_N(int n) {
-    return aggregator(new FirstNAggregator<V>(n));
-  }
-
-  public static final <K, V> CombineFn<K, V> LAST_N(int n) {
-    return aggregator(new LastNAggregator<V>(n));
-  }
-  
-  public static class SumLongs implements Aggregator<Long> {    
-    private long sum = 0;
-    
-    @Override
-    public void reset() {
-      sum = 0;
-    }
-
-    @Override
-    public void update(Long next) {
-      sum += next;
-    }
-    
-    @Override
-    public Iterable<Long> results() {
-      return ImmutableList.of(sum);
-    }
-  }
-  public static AggregatorFactory<Long> SUM_LONGS = new AggregatorFactory<Long>() {
-    public Aggregator<Long> create() { return new SumLongs(); }
-  };
-  
-  public static class SumInts implements Aggregator<Integer> {
-    private int sum = 0;
-    
-    @Override
-    public void reset() {
-      sum = 0;
-    }
-
-    @Override
-    public void update(Integer next) {
-      sum += next;
-    }
-    
-    @Override
-    public Iterable<Integer> results() {
-      return ImmutableList.of(sum);
-    }
-  }
-  public static AggregatorFactory<Integer> SUM_INTS = new AggregatorFactory<Integer>() {
-    public Aggregator<Integer> create() { return new SumInts(); }
-  };
-  
-  public static class SumFloats implements Aggregator<Float> {    
-    private float sum = 0;
-    
-    @Override
-    public void reset() {
-      sum = 0f;
-    }
-
-    @Override
-    public void update(Float next) {
-      sum += next;
-    }
-    
-    @Override
-    public Iterable<Float> results() {
-      return ImmutableList.of(sum);
-    }
-  }
-  public static AggregatorFactory<Float> SUM_FLOATS = new AggregatorFactory<Float>() {
-    public Aggregator<Float> create() { return new SumFloats(); }
-  };
-  
-  public static class SumDoubles implements Aggregator<Double> {    
-    private double sum = 0;
-    
-    @Override
-    public void reset() {
-      sum = 0f;
-    }
-
-    @Override
-    public void update(Double next) {
-      sum += next;
-    }
-    
-    @Override
-    public Iterable<Double> results() {
-      return ImmutableList.of(sum);
-    }
-  }
-  public static AggregatorFactory<Double> SUM_DOUBLES = new AggregatorFactory<Double>() {
-    public Aggregator<Double> create() { return new SumDoubles(); }
-  };
-  
-  public static class SumBigInts implements Aggregator<BigInteger> {    
-    private BigInteger sum = BigInteger.ZERO;
-    
-    @Override
-    public void reset() {
-      sum = BigInteger.ZERO;
-    }
-
-    @Override
-    public void update(BigInteger next) {
-      sum = sum.add(next);
-    }
-    
-    @Override
-    public Iterable<BigInteger> results() {
-      return ImmutableList.of(sum);
-    }
-  }
-  public static AggregatorFactory<BigInteger> SUM_BIGINTS = new AggregatorFactory<BigInteger>() {
-    public Aggregator<BigInteger> create() { return new SumBigInts(); }
-  };
-  
-  public static class MaxLongs implements Aggregator<Long> {
-    private Long max = null;
-    
-    @Override
-    public void reset() {
-      max = null;
-    }
-    
-    @Override
-    public void update(Long next) {
-      if (max == null || max < next) {
-        max = next;
-      }
-    }
-    
-    @Override
-    public Iterable<Long> results() {
-      return ImmutableList.of(max);
-    }
-  }
-  public static AggregatorFactory<Long> MAX_LONGS = new AggregatorFactory<Long>() {
-    public Aggregator<Long> create() { return new MaxLongs(); }
-  };
-  
-  public static class MaxInts implements Aggregator<Integer> {
-    private Integer max = null;
-    
-    @Override
-    public void reset() {
-      max = null;
-    }
-    
-    @Override
-    public void update(Integer next) {
-      if (max == null || max < next) {
-        max = next;
-      }
-    }
-    
-    @Override
-    public Iterable<Integer> results() {
-      return ImmutableList.of(max);
-    }
-  }
-  public static AggregatorFactory<Integer> MAX_INTS = new AggregatorFactory<Integer>() {
-    public Aggregator<Integer> create() { return new MaxInts(); }
-  };
-  
-  public static class MaxFloats implements Aggregator<Float> {
-    private Float max = null;
-    
-    @Override
-    public void reset() {
-      max = null;
-    }
-    
-    @Override
-    public void update(Float next) {
-      if (max == null || max < next) {
-        max = next;
-      }
-    }
-    
-    @Override
-    public Iterable<Float> results() {
-      return ImmutableList.of(max);
-    }
-  }
-  public static AggregatorFactory<Float> MAX_FLOATS = new AggregatorFactory<Float>() {
-    public Aggregator<Float> create() { return new MaxFloats(); }
-  };
-  
-  public static class MaxDoubles implements Aggregator<Double> {
-    private Double max = null;
-    
-    @Override
-    public void reset() {
-      max = null;
-    }
-    
-    @Override
-    public void update(Double next) {
-      if (max == null || max < next) {
-        max = next;
-      }
-    }
-    
-    @Override
-    public Iterable<Double> results() {
-      return ImmutableList.of(max);
-    }
-  }
-  public static AggregatorFactory<Double> MAX_DOUBLES = new AggregatorFactory<Double>() {
-    public Aggregator<Double> create() { return new MaxDoubles(); }
-  };
-  
-  public static class MaxBigInts implements Aggregator<BigInteger> {
-    private BigInteger max = null;
-    
-    @Override
-    public void reset() {
-      max = null;
-    }
-    
-    @Override
-    public void update(BigInteger next) {
-      if (max == null || max.compareTo(next) < 0) {
-        max = next;
-      }
-    }
-    
-    @Override
-    public Iterable<BigInteger> results() {
-      return ImmutableList.of(max);
-    }
-  }
-  public static AggregatorFactory<BigInteger> MAX_BIGINTS = new AggregatorFactory<BigInteger>() {
-    public Aggregator<BigInteger> create() { return new MaxBigInts(); }
-  };
-  
-  public static class MinLongs implements Aggregator<Long> {
-    private Long min = null;
-    
-    @Override
-    public void reset() {
-      min = null;
-    }
-    
-    @Override
-    public void update(Long next) {
-      if (min == null || min > next) {
-        min = next;
-      }
-    }
-    
-    @Override
-    public Iterable<Long> results() {
-      return ImmutableList.of(min);
-    }
-  }
-  public static AggregatorFactory<Long> MIN_LONGS = new AggregatorFactory<Long>() {
-    public Aggregator<Long> create() { return new MinLongs(); }
-  };
-  
-  public static class MinInts implements Aggregator<Integer> {    
-    private Integer min = null;
-    
-    @Override
-    public void reset() {
-      min = null;
-    }
-    
-    @Override
-    public void update(Integer next) {
-      if (min == null || min > next) {
-        min = next;
-      }
-    }
-    
-    @Override
-    public Iterable<Integer> results() {
-      return ImmutableList.of(min);
-    }
-  }
-  public static AggregatorFactory<Integer> MIN_INTS = new AggregatorFactory<Integer>() {
-    public Aggregator<Integer> create() { return new MinInts(); }
-  };
-  
-  public static class MinFloats implements Aggregator<Float> {
-    private Float min = null;
-    
-    @Override
-    public void reset() {
-      min = null;
-    }
-    
-    @Override
-    public void update(Float next) {
-      if (min == null || min > next) {
-        min = next;
-      }
-    }
-    
-    @Override
-    public Iterable<Float> results() {
-      return ImmutableList.of(min);
-    }
-  }
-  public static AggregatorFactory<Float> MIN_FLOATS = new AggregatorFactory<Float>() {
-    public Aggregator<Float> create() { return new MinFloats(); }
-  };
-  
-  public static class MinDoubles implements Aggregator<Double> {
-    private Double min = null;
-    
-    @Override
-    public void reset() {
-      min = null;
-    }
-    
-    @Override
-    public void update(Double next) {
-      if (min == null || min > next) {
-        min = next;
-      }
-    }
-    
-    @Override
-    public Iterable<Double> results() {
-      return ImmutableList.of(min);
-    }
-  }
-  public static AggregatorFactory<Double> MIN_DOUBLES = new AggregatorFactory<Double>() {
-    public Aggregator<Double> create() { return new MinDoubles(); }
-  };
-
-  public static class MinBigInts implements Aggregator<BigInteger> {
-    private BigInteger min = null;
-    
-    @Override
-    public void reset() {
-      min = null;
-    }
-    
-    @Override
-    public void update(BigInteger next) {
-      if (min == null || min.compareTo(next) > 0) {
-        min = next;
-      }
-    }
-    
-    @Override
-    public Iterable<BigInteger> results() {
-      return ImmutableList.of(min);
-    }
-  }
-  public static AggregatorFactory<BigInteger> MIN_BIGINTS = new AggregatorFactory<BigInteger>() {
-    public Aggregator<BigInteger> create() { return new MinBigInts(); }
-  };
-  
-  public static class MaxNAggregator<V extends Comparable<V>> implements Aggregator<V> {
-    private final int arity;
-    private transient SortedSet<V> elements;
-
-    public MaxNAggregator(int arity) {
-      this.arity = arity;
-    }
-
-    @Override
-    public void reset() {
-      if (elements == null) {
-        elements = Sets.newTreeSet();
-      } else {
-        elements.clear();
-      }
-    }
-    
-    @Override
-    public void update(V value) {
-      if (elements.size() < arity) {
-        elements.add(value);
-      } else if (value.compareTo(elements.first()) > 0) {
-        elements.remove(elements.first());
-        elements.add(value);
-      }
-    }
-    
-    @Override
-    public Iterable<V> results() {
-      return ImmutableList.copyOf(elements);
-    }
-  }
-  
-  public static class MinNAggregator<V extends Comparable<V>> implements Aggregator<V> {
-    private final int arity;
-    private transient SortedSet<V> elements;
-    
-    public MinNAggregator(int arity) {
-      this.arity = arity;
-    }
-
-    @Override
-    public void reset() {
-      if (elements == null) {
-        elements = Sets.newTreeSet();
-      } else {
-        elements.clear();
-      }
-    }
-    
-    @Override
-    public void update(V value) {
-      if (elements.size() < arity) {
-        elements.add(value);
-      } else if (value.compareTo(elements.last()) < 0) {
-        elements.remove(elements.last());
-        elements.add(value);
-      }
-    }
-    
-    @Override
-    public Iterable<V> results() {
-      return ImmutableList.copyOf(elements);
-    }
-  }
-  
-  public static class FirstNAggregator<V> implements Aggregator<V> {
-    private final int arity;
-    private final List<V> elements;
-    
-    public FirstNAggregator(int arity) {
-      this.arity = arity;
-      this.elements = Lists.newArrayList();
-    }
-
-    @Override
-    public void reset() {
-      elements.clear();
-    }
-    
-    @Override
-    public void update(V value) {
-      if (elements.size() < arity) {
-        elements.add(value);
-      }
-    }
-    
-    @Override
-    public Iterable<V> results() {
-      return ImmutableList.copyOf(elements);
-    }
-  }
-
-  public static class LastNAggregator<V> implements Aggregator<V> {
-    private final int arity;
-    private final LinkedList<V> elements;
-    
-    public LastNAggregator(int arity) {
-      this.arity = arity;
-      this.elements = Lists.newLinkedList();
-    }
-
-    @Override
-    public void reset() {
-      elements.clear();
-    }
-    
-    @Override
-    public void update(V value) {
-      elements.add(value);
-      if (elements.size() == arity + 1) {
-        elements.removeFirst();
-      }
-    }
-    
-    @Override
-    public Iterable<V> results() {
-      return ImmutableList.copyOf(elements);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/DoFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/DoFn.java b/src/main/java/org/apache/crunch/DoFn.java
deleted file mode 100644
index 30fa70c..0000000
--- a/src/main/java/org/apache/crunch/DoFn.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.io.Serializable;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-import org.apache.crunch.test.TestCounters;
-
-/**
- * Base class for all data processing functions in Crunch.
- * 
- * <p>Note that all {@code DoFn} instances implement {@link Serializable},
- * and thus all of their non-transient member variables must implement
- * {@code Serializable} as well. If your DoFn depends on non-serializable
- * classes for data processing, they may be declared as {@code transient}
- * and initialized in the DoFn's {@code initialize} method.
- *
- */
-public abstract class DoFn<S, T> implements Serializable {
-  private transient TaskInputOutputContext<?, ?, ?, ?> context;
-  private transient Configuration testConf;
-  private transient String internalStatus;
-  
-  /**
-   * Called during the job planning phase. Subclasses may override
-   * this method in order to modify the configuration of the Job
-   * that this DoFn instance belongs to.
-   * 
-   * @param conf The Configuration instance for the Job.
-   */
-  public void configure(Configuration conf) {  
-  }
-  
-  /**
-   * Processes the records from a {@link PCollection}.
-   * 
-   * <br/>
-   * <br/>
-   * <b>Note:</b> Crunch can reuse a single input record object whose content
-   * changes on each {@link #process(Object, Emitter)} method call. This
-   * functionality is imposed by Hadoop's <a href="http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/Reducer.html">Reducer</a> implementation: 
-   * <i>The framework will reuse the key and value objects that are passed into the reduce, therefore the application
-   * should clone the objects they want to keep a copy of.</i>
-   * 
-   * @param input
-   *          The input record.
-   * @param emitter
-   *          The emitter to send the output to
-   */
-  public abstract void process(S input, Emitter<T> emitter);
-
-  /**
-   * Called during the setup of the MapReduce job this {@code DoFn}
-   * is associated with. Subclasses may override this method to
-   * do appropriate initialization.
-   */
-  public void initialize() {
-  }
-
-  /**
-   * Called during the cleanup of the MapReduce job this {@code DoFn}
-   * is associated with. Subclasses may override this method to do
-   * appropriate cleanup.
-   * 
-   * @param emitter The emitter that was used for output
-   */
-  public void cleanup(Emitter<T> emitter) {
-  }
-
-  /**
-   * Called during setup to pass the {@link TaskInputOutputContext} to
-   * this {@code DoFn} instance.
-   */
-  public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
-    this.context = context;
-    initialize();
-  }
-
-  /**
-   * Sets a {@code Configuration} instance to be used during unit tests.
-   * @param conf The Configuration instance.
-   */
-  public void setConfigurationForTest(Configuration conf) {
-    this.testConf = conf;
-  }
-  
-  /**
-   * Returns an estimate of how applying this function to a {@link PCollection}
-   * will cause it to change in side. The optimizer uses these estimates to
-   * decide where to break up dependent MR jobs into separate Map and Reduce
-   * phases in order to minimize I/O.
-   * 
-   * <p>
-   * Subclasses of {@code DoFn} that will substantially alter the size of the
-   * resulting {@code PCollection} should override this method.
-   */
-  public float scaleFactor() {
-    return 1.2f;
-  }
-  
-  protected TaskInputOutputContext<?, ?, ?, ?> getContext() {
-    return context;
-  }
-  
-  protected Configuration getConfiguration() {
-    if (context != null) {
-      return context.getConfiguration();
-    } else if (testConf != null) {
-      return testConf;
-    }
-    return null;
-  }
-  
-  protected Counter getCounter(Enum<?> counterName) {
-    if (context == null) {
-      return TestCounters.getCounter(counterName);
-    }
-    return context.getCounter(counterName);
-  }
-  
-  protected Counter getCounter(String groupName, String counterName) {
-    if (context == null) {
-      return TestCounters.getCounter(groupName, counterName);
-    }
-    return context.getCounter(groupName, counterName);
-  }
-  
-  protected void increment(Enum<?> counterName) {
-    increment(counterName, 1);
-  }
-  
-  protected void increment(Enum<?> counterName, long value) {
-    getCounter(counterName).increment(value);
-  }
-  
-  protected void progress() {
-    if (context != null) {
-      context.progress();
-    }
-  }
-  
-  protected TaskAttemptID getTaskAttemptID() {
-    if (context != null) {
-      return context.getTaskAttemptID();
-    } else {
-      return new TaskAttemptID();
-    }
-  }
-  
-  protected void setStatus(String status) {
-    if (context != null) {
-      context.setStatus(status);
-    }
-    this.internalStatus = status;
-  }
-  
-  protected String getStatus() {
-    if (context != null) {
-      return context.getStatus();
-    }
-    return internalStatus;
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/Emitter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/Emitter.java b/src/main/java/org/apache/crunch/Emitter.java
deleted file mode 100644
index b9d4ff2..0000000
--- a/src/main/java/org/apache/crunch/Emitter.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-/**
- * Interface for writing outputs from a {@link DoFn}.
- *
- */
-public interface Emitter<T> {
-  /**
-   * Write the emitted value to the next stage of the pipeline.
-   * 
-   * @param emitted The value to write
-   */
-  void emit(T emitted);
-
-  /**
-   * Flushes any values cached by this emitter. Called during the
-   * cleanup stage.
-   */
-  void flush();
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/FilterFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/FilterFn.java b/src/main/java/org/apache/crunch/FilterFn.java
deleted file mode 100644
index d471710..0000000
--- a/src/main/java/org/apache/crunch/FilterFn.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.util.List;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * A {@link DoFn} for the common case of filtering the members of
- * a {@link PCollection} based on a boolean condition.
- *
- */
-public abstract class FilterFn<T> extends DoFn<T, T> {
-
-  /**
-   * If true, emit the given record.
-   */
-  public abstract boolean accept(T input);
-
-  @Override
-  public void process(T input, Emitter<T> emitter) {
-    if (accept(input)) {
-      emitter.emit(input);
-    }
-  }
-
-  @Override
-  public float scaleFactor() {
-    return 0.5f;
-  }
-  
-  public static <S> FilterFn<S> and(FilterFn<S>...fns) {
-    return new AndFn<S>(fns);
-  }
-  
-  public static class AndFn<S> extends FilterFn<S> {
-    
-    private final List<FilterFn<S>> fns;
-    
-    public AndFn(FilterFn<S>... fns) {
-      this.fns = ImmutableList.<FilterFn<S>>copyOf(fns);
-    }
-    
-    @Override
-    public boolean accept(S input) {
-      for (FilterFn<S> fn : fns) {
-        if (!fn.accept(input)) {
-          return false;
-        }
-      }
-      return true;
-    }
-    
-    @Override
-    public float scaleFactor() {
-      float scaleFactor = 1.0f;
-      for (FilterFn<S> fn : fns) {
-        scaleFactor *= fn.scaleFactor();
-      }
-      return scaleFactor;
-    }    
-  }
-  
-  public static <S> FilterFn<S> or(FilterFn<S>...fns) {
-    return new OrFn<S>(fns);
-  }
-  
-  public static class OrFn<S> extends FilterFn<S> {
-    
-    private final List<FilterFn<S>> fns;
-    
-    public OrFn(FilterFn<S>... fns) {
-      this.fns = ImmutableList.<FilterFn<S>>copyOf(fns);
-    }
-    
-    @Override
-    public boolean accept(S input) {
-      for (FilterFn<S> fn : fns) {
-        if (fn.accept(input)) {
-          return true;
-        }
-      }
-      return false;
-    }
-    
-    @Override
-    public float scaleFactor() {
-      float scaleFactor = 0.0f;
-      for (FilterFn<S> fn : fns) {
-        scaleFactor += fn.scaleFactor();
-      }
-      return Math.min(1.0f, scaleFactor);
-    }    
-  }
-  
-  public static <S> FilterFn<S> not(FilterFn<S> fn) {
-    return new NotFn<S>(fn);
-  }
-  
-  public static class NotFn<S> extends FilterFn<S> {
-    
-    private final FilterFn<S> base;
-    
-    public NotFn(FilterFn<S> base) {
-      this.base = base;
-    }
-    
-    @Override
-    public boolean accept(S input) {
-      return !base.accept(input);
-    }
-    
-    @Override
-    public float scaleFactor() {
-      return 1.0f - base.scaleFactor();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/GroupingOptions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/GroupingOptions.java b/src/main/java/org/apache/crunch/GroupingOptions.java
deleted file mode 100644
index d6cd12e..0000000
--- a/src/main/java/org/apache/crunch/GroupingOptions.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Partitioner;
-
-/**
- * Options that can be passed to a {@code groupByKey} operation in order to exercise
- * finer control over how the partitioning, grouping, and sorting of keys is
- * performed.
- *
- */
-public class GroupingOptions {
-
-  private final Class<? extends Partitioner> partitionerClass;
-  private final Class<? extends RawComparator> groupingComparatorClass;
-  private final Class<? extends RawComparator> sortComparatorClass;
-  private final int numReducers;
-
-  private GroupingOptions(Class<? extends Partitioner> partitionerClass,
-      Class<? extends RawComparator> groupingComparatorClass,
-      Class<? extends RawComparator> sortComparatorClass, int numReducers) {
-    this.partitionerClass = partitionerClass;
-    this.groupingComparatorClass = groupingComparatorClass;
-    this.sortComparatorClass = sortComparatorClass;
-    this.numReducers = numReducers;
-  }
-
-  public int getNumReducers() {
-    return numReducers;
-  }
-  
-  public Class<? extends RawComparator> getSortComparatorClass() {
-    return sortComparatorClass;
-  }
-  
-  public void configure(Job job) {
-    if (partitionerClass != null) {
-      job.setPartitionerClass(partitionerClass);
-    }
-    if (groupingComparatorClass != null) {
-      job.setGroupingComparatorClass(groupingComparatorClass);
-    }
-    if (sortComparatorClass != null) {
-      job.setSortComparatorClass(sortComparatorClass);
-    }
-    if (numReducers > 0) {
-      job.setNumReduceTasks(numReducers);
-    }
-  }
-
-  public boolean isCompatibleWith(GroupingOptions other) {
-    if (partitionerClass != other.partitionerClass) {
-      return false;
-    }
-    if (groupingComparatorClass != other.groupingComparatorClass) {
-      return false;
-    }
-    if (sortComparatorClass != other.sortComparatorClass) {
-      return false;
-    }
-    return true;
-  }
-
-  public static Builder builder() {
-    return new Builder();
-  }
-
-  /**
-   * Builder class for creating {@code GroupingOptions} instances.
-   *
-   */
-  public static class Builder {
-    private Class<? extends Partitioner> partitionerClass;
-    private Class<? extends RawComparator> groupingComparatorClass;
-    private Class<? extends RawComparator> sortComparatorClass;
-    private int numReducers;
-
-    public Builder() {
-    }
-
-    public Builder partitionerClass(
-        Class<? extends Partitioner> partitionerClass) {
-      this.partitionerClass = partitionerClass;
-      return this;
-    }
-
-    public Builder groupingComparatorClass(
-        Class<? extends RawComparator> groupingComparatorClass) {
-      this.groupingComparatorClass = groupingComparatorClass;
-      return this;
-    }
-
-    public Builder sortComparatorClass(
-        Class<? extends RawComparator> sortComparatorClass) {
-      this.sortComparatorClass = sortComparatorClass;
-      return this;
-    }
-
-    public Builder numReducers(int numReducers) {
-      if (numReducers <= 0) {
-        throw new IllegalArgumentException("Invalid number of reducers: " + numReducers);
-      }
-      this.numReducers = numReducers;
-      return this;
-    }
-
-    public GroupingOptions build() {
-      return new GroupingOptions(partitionerClass, groupingComparatorClass,
-          sortComparatorClass, numReducers);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/MapFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/MapFn.java b/src/main/java/org/apache/crunch/MapFn.java
deleted file mode 100644
index b1c5520..0000000
--- a/src/main/java/org/apache/crunch/MapFn.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-/**
- * A {@link DoFn} for the common case of emitting exactly one value
- * for each input record.
- *
- */
-public abstract class MapFn<S, T> extends DoFn<S, T> {
-  
-  /**
-   * Maps the given input into an instance of the output type.
-   */
-  public abstract T map(S input);
-
-  @Override
-  public void process(S input, Emitter<T> emitter) {
-    emitter.emit(map(input));
-  }
-
-  @Override
-  public float scaleFactor() {
-    return 1.0f;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/PCollection.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/PCollection.java b/src/main/java/org/apache/crunch/PCollection.java
deleted file mode 100644
index cd25bdd..0000000
--- a/src/main/java/org/apache/crunch/PCollection.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-
-/**
- * A representation of an immutable, distributed collection of elements
- * that is the fundamental target of computations in Crunch.
- *
- */
-public interface PCollection<S> {
-  /**
-   * Returns the {@code Pipeline} associated with this PCollection.
-   */
-  Pipeline getPipeline();
-  
-  /**
-   * Returns a {@code PCollection} instance that acts as the union
-   * of this {@code PCollection} and the input {@code PCollection}s.
-   */
-  PCollection<S> union(PCollection<S>... collections);
-
-  /**
-   * Applies the given doFn to the elements of this {@code PCollection} and
-   * returns a new {@code PCollection} that is the output of this processing.
-   * 
-   * @param doFn The {@code DoFn} to apply
-   * @param type The {@link PType} of the resulting {@code PCollection}
-   * @return a new {@code PCollection}
-   */
-  <T> PCollection<T> parallelDo(DoFn<S, T> doFn, PType<T> type);
-  
-  /**
-   * Applies the given doFn to the elements of this {@code PCollection} and
-   * returns a new {@code PCollection} that is the output of this processing.
-   * 
-   * @param name An identifier for this processing step, useful for debugging
-   * @param doFn The {@code DoFn} to apply
-   * @param type The {@link PType} of the resulting {@code PCollection}
-   * @return a new {@code PCollection}
-   */
-  <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type);
-
-  /**
-   * Similar to the other {@code parallelDo} instance, but returns a
-   * {@code PTable} instance instead of a {@code PCollection}.
-   * 
-   * @param doFn The {@code DoFn} to apply
-   * @param type The {@link PTableType} of the resulting {@code PTable}
-   * @return a new {@code PTable}
-   */
-  <K, V> PTable<K, V> parallelDo(DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type);
-  
-  /**
-   * Similar to the other {@code parallelDo} instance, but returns a
-   * {@code PTable} instance instead of a {@code PCollection}.
-   * 
-   * @param name An identifier for this processing step
-   * @param doFn The {@code DoFn} to apply
-   * @param type The {@link PTableType} of the resulting {@code PTable}
-   * @return a new {@code PTable}
-   */
-  <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn,
-      PTableType<K, V> type);
-
-  /**
-   * Write the contents of this {@code PCollection} to the given {@code Target},
-   * using the storage format specified by the target.
-   * 
-   * @param target The target to write to
-   */
-  PCollection<S> write(Target target);
-  
-  /**
-   * Returns a reference to the data set represented by this PCollection that
-   * may be used by the client to read the data locally.
-   */
-  Iterable<S> materialize();
-  
-  /**
-   * Returns the {@code PType} of this {@code PCollection}.
-   */
-  PType<S> getPType();
-
-  /**
-   * Returns the {@code PTypeFamily} of this {@code PCollection}.
-   */
-  PTypeFamily getTypeFamily();
-
-  /**
-   * Returns the size of the data represented by this {@code PCollection} in bytes.
-   */
-  long getSize();
-
-  /**
-   * Returns a shorthand name for this PCollection.
-   */
-  String getName();
-  
-  /**
-   * Apply the given filter function to this instance and return the
-   * resulting {@code PCollection}.
-   */
-  PCollection<S> filter(FilterFn<S> filterFn);
-  
-  /**
-   * Apply the given filter function to this instance and return the
-   * resulting {@code PCollection}.
-   * 
-   * @param name An identifier for this processing step
-   * @param filterFn The {@code FilterFn} to apply
-   */
-  PCollection<S> filter(String name, FilterFn<S> filterFn);
-  
-  /**
-   * Apply the given map function to each element of this instance in order
-   * to create a {@code PTable}.
-   */
-  <K> PTable<K, S> by(MapFn<S, K> extractKeyFn, PType<K> keyType);
- 
-  /**
-   * Apply the given map function to each element of this instance in order
-   * to create a {@code PTable}.
-   *   
-   * @param name An identifier for this processing step
-   * @param extractKeyFn The {@code MapFn} to apply
-   */
-  <K> PTable<K, S> by(String name, MapFn<S, K> extractKeyFn, PType<K> keyType);
-  
-  /**
-   * Returns a {@code PCollection} instance that contains all of the elements
-   * of this instance in sorted order.
-   */
-  PCollection<S> sort(boolean ascending);
-  
-  /**
-   * Returns a {@code PTable} instance that contains the counts of each unique
-   * element of this PCollection.
-   */
-  PTable<S, Long> count();
-  
-  /**
-   * Returns a {@code PCollection} made up of only the maximum element of this
-   * instance.
-   */
-  PCollection<S> max();
-  
-  /**
-   * Returns a {@code PCollection} made up of only the minimum element of this
-   * instance.
-   */
-  PCollection<S> min();
-  
-  /**
-   * Randomly sample items from this PCollection instance with the given
-   * probability of an item being accepted.
-   */
-  PCollection<S> sample(double acceptanceProbability);
-  
-  /**
-   * Randomly sample items from this PCollection instance with the given
-   * probability of an item being accepted and using the given seed.
-   */
-  PCollection<S> sample(double acceptanceProbability, long seed);
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/PGroupedTable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/PGroupedTable.java b/src/main/java/org/apache/crunch/PGroupedTable.java
deleted file mode 100644
index 96918a5..0000000
--- a/src/main/java/org/apache/crunch/PGroupedTable.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-/**
- * The Crunch representation of a grouped {@link PTable}.
- *
- */
-public interface PGroupedTable<K, V> extends PCollection<Pair<K, Iterable<V>>> {
-  /**
-   * Combines the values of this grouping using the given {@code CombineFn}.
-   * 
-   * @param combineFn The combiner function
-   * @return A {@code PTable} where each key has a single value
-   */
-  PTable<K, V> combineValues(CombineFn<K, V> combineFn);
-
-  /**
-   * Convert this grouping back into a multimap.
-   * 
-   * @return an ungrouped version of the data in this {@code PGroupedTable}.
-   */
-  PTable<K, V> ungroup();
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/PTable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/PTable.java b/src/main/java/org/apache/crunch/PTable.java
deleted file mode 100644
index b673f6d..0000000
--- a/src/main/java/org/apache/crunch/PTable.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-
-/**
- * A sub-interface of {@code PCollection} that represents an immutable,
- * distributed multi-map of keys and values.
- *
- */
-public interface PTable<K, V> extends PCollection<Pair<K, V>> {
-  /**
-   * Returns a {@code PTable} instance that acts as the union
-   * of this {@code PTable} and the input {@code PTable}s.
-   */
-  PTable<K, V> union(PTable<K, V>... others);
-
-  /**
-   * Performs a grouping operation on the keys of this table.
-   * @return a {@code PGroupedTable} instance that represents the grouping
-   */
-  PGroupedTable<K, V> groupByKey();
-
-  /**
-   * Performs a grouping operation on the keys of this table, using the given
-   * number of partitions.
-   * 
-   * @param numPartitions The number of partitions for the data.
-   * @return a {@code PGroupedTable} instance that represents this grouping
-   */
-  PGroupedTable<K, V> groupByKey(int numPartitions);
-  
-  /**
-   * Performs a grouping operation on the keys of this table, using the
-   * additional {@code GroupingOptions} to control how the grouping is
-   * executed.
-   * 
-   * @param options The grouping options to use
-   * @return a {@code PGroupedTable} instance that represents the grouping
-   */
-  PGroupedTable<K, V> groupByKey(GroupingOptions options);
-
-  /**
-   * Writes this {@code PTable} to the given {@code Target}.
-   */
-  PTable<K, V> write(Target target);
-  
-  /**
-   * Returns the {@code PTableType} of this {@code PTable}.
-   */
-  PTableType<K, V> getPTableType();
-  
-  /**
-   * Returns the {@code PType} of the key.
-   */
-  PType<K> getKeyType();
-
-  /**
-   * Returns the {@code PType} of the value.
-   */
-  PType<V> getValueType();
-  
-  /**
-   * Aggregate all of the values with the same key into a single
-   * key-value pair in the returned PTable.
-   */
-  PTable<K, Collection<V>> collectValues();
-  
-  /**
-   * Returns a PTable made up of the pairs in this PTable with the
-   * largest value field.
-   * @param count The number of pairs to return
-   */
-  PTable<K, V> top(int count);
-  
-  /**
-   * Returns a PTable made up of the pairs in this PTable with the
-   * smallest value field.
-   * @param count The number of pairs to return
-   */
-  PTable<K, V> bottom(int count);
-  
-  /**
-   * Perform an inner join on this table and the one passed in as
-   * an argument on their common keys.
-   */
-  <U> PTable<K, Pair<V, U>> join(PTable<K, U> other);
-  
-  /**
-   * Co-group operation with the given table on common keys.
-   */
-  <U> PTable<K, Pair<Collection<V>, Collection<U>>> cogroup(PTable<K, U> other);
-
-  /**
-   * Returns a {@link PCollection} made up of the keys in this PTable.
-   */  
-  PCollection<K> keys();
-  
-  /**
-   * Returns a {@link PCollection} made up of the values in this PTable.
-   */
-  PCollection<V> values();
-  
-  /**
-   * Returns a Map<K, V> made up of the keys and values in this PTable.
-   * <p>
-   * <b>Note:</b> The contents of the returned map may not be exactly the same
-   * as this PTable, as a PTable is a multi-map (i.e. can contain multiple
-   * values for a single key).
-   */
-  Map<K, V> materializeToMap();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/Pair.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/Pair.java b/src/main/java/org/apache/crunch/Pair.java
deleted file mode 100644
index 9d319ab..0000000
--- a/src/main/java/org/apache/crunch/Pair.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-
-/**
- * A convenience class for two-element {@link Tuple}s.
- */
-public class Pair<K, V> implements Tuple, Comparable<Pair<K, V>> {
-
-  private final K first;
-  private final V second;
-
-  public static <T, U> Pair<T, U> of(T first, U second) {
-    return new Pair<T, U>(first, second);
-  }
-
-  public Pair(K first, V second) {
-    this.first = first;
-    this.second = second;
-  }
-
-  public K first() {
-    return first;
-  }
-
-  public V second() {
-    return second;
-  }
-
-  public Object get(int index) {
-    switch (index) {
-    case 0:
-      return first;
-    case 1:
-      return second;
-    default:
-      throw new ArrayIndexOutOfBoundsException();
-    }
-  }
-
-  public int size() {
-    return 2;
-  }
-  
-  @Override
-  public int hashCode() {
-    HashCodeBuilder hcb = new HashCodeBuilder();
-    return hcb.append(first).append(second).toHashCode();
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    Pair<?, ?> other = (Pair<?, ?>) obj;
-    return (first == other.first || (first != null && first.equals(other.first))) &&
-    	(second == other.second || (second != null && second.equals(other.second)));
-  }
-
-  @Override
-  public String toString() {
-	StringBuilder sb = new StringBuilder("[");
-	sb.append(first).append(",").append(second).append("]");
-	return sb.toString();
-  }
-
-  private int cmp(Object lhs, Object rhs) {
-    if (lhs == rhs) {
-      return 0;
-    } else if (lhs != null && Comparable.class.isAssignableFrom(lhs.getClass())) {
-      return ((Comparable) lhs).compareTo(rhs);
-    }
-    return (lhs == null ? 0 : lhs.hashCode()) - (rhs == null ? 0 : rhs.hashCode());
-  }
-  
-  @Override  
-  public int compareTo(Pair<K, V> o) {
-    int diff = cmp(first, o.first);
-    if (diff == 0) {
-      diff = cmp(second, o.second);
-    }
-    return diff;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/Pipeline.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/Pipeline.java b/src/main/java/org/apache/crunch/Pipeline.java
deleted file mode 100644
index fd60695..0000000
--- a/src/main/java/org/apache/crunch/Pipeline.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Manages the state of a pipeline execution.
- * 
- */
-public interface Pipeline {
-  
-  /**
-   * Set the {@code Configuration} to use with this pipeline.
-   */
-  void setConfiguration(Configuration conf);
-  
-  /**
-   * Returns the name of this pipeline.
-   * @return Name of the pipeline
-   */
-  String getName();
-  
-  /**
-   * Returns the {@code Configuration} instance associated with this pipeline.
-   */
-  Configuration getConfiguration();
-  
-  /**
-   * Converts the given {@code Source} into a {@code PCollection} that is
-   * available to jobs run using this {@code Pipeline} instance.
-   * 
-   * @param source The source of data
-   * @return A PCollection that references the given source
-   */
-  <T> PCollection<T> read(Source<T> source);
-
-  /**
-   * A version of the read method for {@code TableSource} instances that
-   * map to {@code PTable}s.
-   * @param tableSource The source of the data
-   * @return A PTable that references the given source
-   */
-  <K, V> PTable<K, V> read(TableSource<K, V> tableSource);
-  
-  /**
-   * Write the given collection to the given target on the next
-   * pipeline run.
-   * 
-   * @param collection The collection
-   * @param target The output target
-   */
-  void write(PCollection<?> collection, Target target);
-
-  /**
-   * Create the given PCollection and read the data it contains
-   * into the returned Collection instance for client use.
-   *
-   * @param pcollection The PCollection to materialize
-   * @return the data from the PCollection as a read-only Collection
-   */
-  <T> Iterable<T> materialize(PCollection<T> pcollection);
-  
-  /**
-   * Constructs and executes a series of MapReduce jobs in order
-   * to write data to the output targets.
-   */
-  PipelineResult run();
-
-  /**
-   * Run any remaining jobs required to generate outputs and then
-   * clean up any intermediate data files that were created in
-   * this run or previous calls to {@code run}.
-   */
-  PipelineResult done();
-
-  /**
-   * A convenience method for reading a text file.
-   */
-  PCollection<String> readTextFile(String pathName);
-
-  /**
-   * A convenience method for writing a text file.
-   */
-  <T> void writeTextFile(PCollection<T> collection, String pathName);
-  
-  /**
-   * Turn on debug logging for jobs that are run from this pipeline.
-   */
-  void enableDebug();
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/PipelineResult.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/PipelineResult.java b/src/main/java/org/apache/crunch/PipelineResult.java
deleted file mode 100644
index 6f97fe8..0000000
--- a/src/main/java/org/apache/crunch/PipelineResult.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.util.List;
-
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Counters;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * Container for the results of a call to {@code run} or {@code done} on the Pipeline interface that includes
- * details and statistics about the component stages of the data pipeline.
- */
-public class PipelineResult {
-
-  public static class StageResult {
-    
-    private final String stageName;
-    private final Counters counters;
-    
-    public StageResult(String stageName, Counters counters) {
-      this.stageName = stageName;
-      this.counters = counters;
-    }
-    
-    public String getStageName() {
-      return stageName;
-    }
-    
-    public Counters getCounters() {
-      return counters;
-    }
-    
-    public Counter findCounter(Enum<?> key) {
-      return counters.findCounter(key);
-    }
-    
-    public long getCounterValue(Enum<?> key) {
-      return findCounter(key).getValue();
-    }
-  }
-  
-  public static final PipelineResult EMPTY = new PipelineResult(ImmutableList.<StageResult>of());
-  
-  private final List<StageResult> stageResults;
-  
-  public PipelineResult(List<StageResult> stageResults) {
-    this.stageResults = ImmutableList.copyOf(stageResults);
-  }
-  
-  public boolean succeeded() {
-    return !stageResults.isEmpty();
-  }
-  
-  public List<StageResult> getStageResults() {
-    return stageResults;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/Source.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/Source.java b/src/main/java/org/apache/crunch/Source.java
deleted file mode 100644
index a77a656..0000000
--- a/src/main/java/org/apache/crunch/Source.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-
-import org.apache.crunch.types.PType;
-
-/**
- * A {@code Source} represents an input data set that is an input to one
- * or more MapReduce jobs.
- *
- */
-public interface Source<T> { 
-  /**
-   * Returns the {@code PType} for this source.
-   */
-  PType<T> getType();
-
-  /**
-   * Configure the given job to use this source as an input.
-   * 
-   * @param job The job to configure
-   * @param inputId For a multi-input job, an identifier for this input to the job
-   * @throws IOException
-   */
-  void configureSource(Job job, int inputId) throws IOException;
-
-  /**
-   * Returns the number of bytes in this {@code Source}.
-   */
-  long getSize(Configuration configuration);  
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/SourceTarget.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/SourceTarget.java b/src/main/java/org/apache/crunch/SourceTarget.java
deleted file mode 100644
index 3d06392..0000000
--- a/src/main/java/org/apache/crunch/SourceTarget.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-
-/**
- * An interface for classes that implement both the {@code Source} and
- * the {@code Target} interfaces.
- *
- */
-public interface SourceTarget<T> extends Source<T>, Target {
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/TableSource.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/TableSource.java b/src/main/java/org/apache/crunch/TableSource.java
deleted file mode 100644
index 9ebade6..0000000
--- a/src/main/java/org/apache/crunch/TableSource.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import org.apache.crunch.types.PTableType;
-
-/**
- * The interface {@code Source} implementations that return a {@link PTable}.
- *
- */
-public interface TableSource<K, V> extends Source<Pair<K, V>> {
-  PTableType<K, V> getTableType();
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/Target.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/Target.java b/src/main/java/org/apache/crunch/Target.java
deleted file mode 100644
index 75de874..0000000
--- a/src/main/java/org/apache/crunch/Target.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import org.apache.crunch.io.OutputHandler;
-import org.apache.crunch.types.PType;
-
-/**
- * A {@code Target} represents the output destination of a Crunch job.
- *
- */
-public interface Target {
-  boolean accept(OutputHandler handler, PType<?> ptype);
-  
-  <T> SourceTarget<T> asSourceTarget(PType<T> ptype);
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/Tuple.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/Tuple.java b/src/main/java/org/apache/crunch/Tuple.java
deleted file mode 100644
index 38ce188..0000000
--- a/src/main/java/org/apache/crunch/Tuple.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-/**
- * A fixed-size collection of Objects, used in Crunch for representing
- * joins between {@code PCollection}s.
- *
- */
-public interface Tuple {
-
-  /**
-   * Returns the Object at the given index.
-   */
-  Object get(int index);
-
-  /**
-   * Returns the number of elements in this Tuple.
-   */
-  int size();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/Tuple3.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/Tuple3.java b/src/main/java/org/apache/crunch/Tuple3.java
deleted file mode 100644
index d06f17c..0000000
--- a/src/main/java/org/apache/crunch/Tuple3.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-
-/**
- * A convenience class for three-element {@link Tuple}s.
- */
-public class Tuple3<V1, V2, V3> implements Tuple {
-
-  private final V1 first;
-  private final V2 second;
-  private final V3 third;
-
-  public static <A, B, C> Tuple3<A, B, C> of(A a, B b, C c) {
-    return new Tuple3<A, B, C>(a, b, c);
-  }
-  
-  public Tuple3(V1 first, V2 second, V3 third) {
-    this.first = first;
-    this.second = second;
-    this.third = third;
-  }
-
-  public V1 first() {
-    return first;
-  }
-
-  public V2 second() {
-    return second;
-  }
-
-  public V3 third() {
-    return third;
-  }
-
-  public Object get(int index) {
-    switch (index) {
-    case 0:
-      return first;
-    case 1:
-      return second;
-    case 2:
-      return third;
-    default:
-      throw new ArrayIndexOutOfBoundsException();
-    }
-  }
-
-  public int size() {
-    return 3;
-  }
-  
-  @Override
-  public int hashCode() {
-    HashCodeBuilder hcb = new HashCodeBuilder();
-    return hcb.append(first).append(second).append(third).toHashCode();
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    Tuple3<?, ?, ?> other = (Tuple3<?, ?, ?>) obj;
-    return (first == other.first || (first != null && first.equals(other.first))) &&
-    	(second == other.second || (second != null && second.equals(other.second))) &&
-    	(third == other.third || (third != null && third.equals(other.third)));
-  }
-
-  @Override
-  public String toString() {
-	StringBuilder sb = new StringBuilder("Tuple3[");
-	sb.append(first).append(",").append(second).append(",").append(third);
-	return sb.append("]").toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/Tuple4.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/Tuple4.java b/src/main/java/org/apache/crunch/Tuple4.java
deleted file mode 100644
index a30cbb1..0000000
--- a/src/main/java/org/apache/crunch/Tuple4.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-
-/**
- * A convenience class for four-element {@link Tuple}s.
- */
-public class Tuple4<V1, V2, V3, V4> implements Tuple {
-
-  private final V1 first;
-  private final V2 second;
-  private final V3 third;
-  private final V4 fourth;
-
-  public static <A, B, C, D> Tuple4<A, B, C, D> of(A a, B b, C c, D d) {
-    return new Tuple4<A, B, C, D>(a, b, c, d);
-  }
-  
-  public Tuple4(V1 first, V2 second, V3 third, V4 fourth) {
-    this.first = first;
-    this.second = second;
-    this.third = third;
-    this.fourth = fourth;
-  }
-
-  public V1 first() {
-    return first;
-  }
-
-  public V2 second() {
-    return second;
-  }
-
-  public V3 third() {
-    return third;
-  }
-
-  public V4 fourth() {
-    return fourth;
-  }
-
-  public Object get(int index) {
-    switch (index) {
-    case 0:
-      return first;
-    case 1:
-      return second;
-    case 2:
-      return third;
-    case 3:
-      return fourth;
-    default:
-      throw new ArrayIndexOutOfBoundsException();
-    }
-  }
-
-  public int size() {
-    return 4;
-  }
-  
-  @Override
-  public int hashCode() {
-    HashCodeBuilder hcb = new HashCodeBuilder();
-    return hcb.append(first).append(second).append(third)
-    	.append(fourth).toHashCode();
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    Tuple4<?, ?, ?, ?> other = (Tuple4<?, ?, ?, ?>) obj;
-    return (first == other.first || (first != null && first.equals(other.first))) &&
-    	(second == other.second || (second != null && second.equals(other.second))) &&
-    	(third == other.third || (third != null && third.equals(other.third))) &&
-    	(fourth == other.fourth || (fourth != null && fourth.equals(other.fourth)));
-  }
-
-  @Override
-  public String toString() {
-	StringBuilder sb = new StringBuilder("Tuple4[");
-	sb.append(first).append(",").append(second).append(",").append(third);
-	return sb.append(",").append(fourth).append("]").toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/TupleN.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/TupleN.java b/src/main/java/org/apache/crunch/TupleN.java
deleted file mode 100644
index 8e7524c..0000000
--- a/src/main/java/org/apache/crunch/TupleN.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.util.Arrays;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-
-/**
- * A {@link Tuple} instance for an arbitrary number of values.
- */
-public class TupleN implements Tuple {
-
-  private final Object values[];
-
-  public TupleN(Object... values) {
-    this.values = new Object[values.length];
-    System.arraycopy(values, 0, this.values, 0, values.length);
-  }
-
-  public Object get(int index) {
-    return values[index];
-  }
-
-  public int size() {
-    return values.length;
-  }
-  
-  @Override
-  public int hashCode() {
-  	HashCodeBuilder hcb = new HashCodeBuilder();
-  	for (Object v : values) {
-  	  hcb.append(v);
-  	}
-  	return hcb.toHashCode();
-  }
-  
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    TupleN other = (TupleN) obj;
-    return Arrays.equals(this.values, other.values);
-  }
-
-  @Override
-  public String toString() {
-    return Arrays.toString(values);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/fn/CompositeMapFn.java b/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
deleted file mode 100644
index f30801d..0000000
--- a/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.fn;
-
-import org.apache.hadoop.conf.Configuration;
-
-import org.apache.crunch.Emitter;
-import org.apache.crunch.MapFn;
-
-public class CompositeMapFn<R, S, T> extends MapFn<R, T> {
-  
-  private final MapFn<R, S> first;
-  private final MapFn<S, T> second;
-  
-  public CompositeMapFn(MapFn<R, S> first, MapFn<S, T> second) {
-    this.first = first;
-    this.second = second;
-  }
-  
-  @Override
-  public void initialize() {
-    first.setContext(getContext());
-    second.setContext(getContext());
-  }
-  
-  public MapFn<R, S> getFirst() {
-    return first;
-  }
-  
-  public MapFn<S, T> getSecond() {
-    return second;
-  }
-  
-  @Override
-  public T map(R input) {
-    return second.map(first.map(input));
-  }
-  
-  @Override
-  public void cleanup(Emitter<T> emitter) {
-    first.cleanup(null);
-    second.cleanup(null);
-  }
-
-  @Override
-  public void configure(Configuration conf) {
-    first.configure(conf);
-    second.configure(conf);
-  }
-
-  @Override
-  public void setConfigurationForTest(Configuration conf) {
-    first.setConfigurationForTest(conf);
-    second.setConfigurationForTest(conf);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java b/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java
deleted file mode 100644
index f5056c2..0000000
--- a/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.fn;
-
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-
-/**
- * Wrapper function for converting a {@code MapFn} into a key-value pair that
- * is used to convert from a {@code PCollection<V>} to a {@code PTable<K, V>}.
- */
-public class ExtractKeyFn<K, V> extends MapFn<V, Pair<K, V>> {
-  
-  private final MapFn<V, K> mapFn;
-  
-  public ExtractKeyFn(MapFn<V, K> mapFn) {
-    this.mapFn = mapFn;
-  }
-  
-  @Override
-  public void initialize() {
-    this.mapFn.setContext(getContext());
-  }
-  
-  @Override
-  public Pair<K, V> map(V input) {
-    return Pair.of(mapFn.map(input), input);
-  }
-}


Mime
View raw message