incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [3/33] CRUNCH-8: Moving the code into multiple Maven modules. Contributed by Matthias Friedrich
Date Wed, 11 Jul 2012 05:14:45 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/PCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PCollection.java b/crunch/src/main/java/org/apache/crunch/PCollection.java
new file mode 100644
index 0000000..cd25bdd
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/PCollection.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+
+/**
+ * A representation of an immutable, distributed collection of elements
+ * that is the fundamental target of computations in Crunch.
+ *
+ * @param <S> the type of the elements in this collection
+ */
+public interface PCollection<S> {
+  /**
+   * Returns the {@code Pipeline} associated with this PCollection.
+   */
+  Pipeline getPipeline();
+
+  /**
+   * Returns a {@code PCollection} instance that acts as the union
+   * of this {@code PCollection} and the input {@code PCollection}s.
+   *
+   * @param collections the collections to union with this one
+   * @return a {@code PCollection} containing the elements of all the inputs
+   */
+  PCollection<S> union(PCollection<S>... collections);
+
+  /**
+   * Applies the given {@code DoFn} to the elements of this {@code PCollection}
+   * and returns a new {@code PCollection} that is the output of this processing.
+   *
+   * @param doFn The {@code DoFn} to apply
+   * @param type The {@link PType} of the resulting {@code PCollection}
+   * @return a new {@code PCollection}
+   */
+  <T> PCollection<T> parallelDo(DoFn<S, T> doFn, PType<T> type);
+
+  /**
+   * Applies the given {@code DoFn} to the elements of this {@code PCollection}
+   * and returns a new {@code PCollection} that is the output of this processing.
+   *
+   * @param name An identifier for this processing step, useful for debugging
+   * @param doFn The {@code DoFn} to apply
+   * @param type The {@link PType} of the resulting {@code PCollection}
+   * @return a new {@code PCollection}
+   */
+  <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type);
+
+  /**
+   * Similar to the other {@code parallelDo} overloads, but returns a
+   * {@code PTable} instance instead of a {@code PCollection}.
+   *
+   * @param doFn The {@code DoFn} to apply
+   * @param type The {@link PTableType} of the resulting {@code PTable}
+   * @return a new {@code PTable}
+   */
+  <K, V> PTable<K, V> parallelDo(DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type);
+
+  /**
+   * Similar to the other {@code parallelDo} overloads, but returns a
+   * {@code PTable} instance instead of a {@code PCollection}.
+   *
+   * @param name An identifier for this processing step
+   * @param doFn The {@code DoFn} to apply
+   * @param type The {@link PTableType} of the resulting {@code PTable}
+   * @return a new {@code PTable}
+   */
+  <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn,
+      PTableType<K, V> type);
+
+  /**
+   * Writes the contents of this {@code PCollection} to the given {@code Target},
+   * using the storage format specified by the target.
+   *
+   * @param target The target to write to
+   * @return a {@code PCollection} with the same element type as this one
+   */
+  PCollection<S> write(Target target);
+
+  /**
+   * Returns a reference to the data set represented by this PCollection that
+   * may be used by the client to read the data locally.
+   *
+   * @return an {@code Iterable} over the elements of this collection
+   */
+  Iterable<S> materialize();
+
+  /**
+   * Returns the {@code PType} of this {@code PCollection}.
+   */
+  PType<S> getPType();
+
+  /**
+   * Returns the {@code PTypeFamily} of this {@code PCollection}.
+   */
+  PTypeFamily getTypeFamily();
+
+  /**
+   * Returns the size of the data represented by this {@code PCollection} in bytes.
+   */
+  long getSize();
+
+  /**
+   * Returns a shorthand name for this PCollection.
+   */
+  String getName();
+
+  /**
+   * Applies the given filter function to this instance and returns the
+   * resulting {@code PCollection}.
+   *
+   * @param filterFn The {@code FilterFn} to apply
+   * @return the resulting {@code PCollection}
+   */
+  PCollection<S> filter(FilterFn<S> filterFn);
+
+  /**
+   * Applies the given filter function to this instance and returns the
+   * resulting {@code PCollection}.
+   *
+   * @param name An identifier for this processing step
+   * @param filterFn The {@code FilterFn} to apply
+   * @return the resulting {@code PCollection}
+   */
+  PCollection<S> filter(String name, FilterFn<S> filterFn);
+
+  /**
+   * Applies the given map function to each element of this instance in order
+   * to create a {@code PTable}.
+   *
+   * @param extractKeyFn The {@code MapFn} that computes a key for each element
+   * @param keyType The {@link PType} of the extracted keys
+   * @return a {@code PTable} whose values are the elements of this collection,
+   *     keyed by the output of {@code extractKeyFn}
+   */
+  <K> PTable<K, S> by(MapFn<S, K> extractKeyFn, PType<K> keyType);
+
+  /**
+   * Applies the given map function to each element of this instance in order
+   * to create a {@code PTable}.
+   *
+   * @param name An identifier for this processing step
+   * @param extractKeyFn The {@code MapFn} that computes a key for each element
+   * @param keyType The {@link PType} of the extracted keys
+   * @return a {@code PTable} whose values are the elements of this collection,
+   *     keyed by the output of {@code extractKeyFn}
+   */
+  <K> PTable<K, S> by(String name, MapFn<S, K> extractKeyFn, PType<K> keyType);
+
+  /**
+   * Returns a {@code PCollection} instance that contains all of the elements
+   * of this instance in sorted order.
+   *
+   * @param ascending whether the elements should be sorted in ascending order
+   * @return the sorted {@code PCollection}
+   */
+  PCollection<S> sort(boolean ascending);
+
+  /**
+   * Returns a {@code PTable} instance that contains the counts of each unique
+   * element of this PCollection.
+   *
+   * @return a {@code PTable} mapping each unique element to its count
+   */
+  PTable<S, Long> count();
+
+  /**
+   * Returns a {@code PCollection} made up of only the maximum element of this
+   * instance.
+   */
+  PCollection<S> max();
+
+  /**
+   * Returns a {@code PCollection} made up of only the minimum element of this
+   * instance.
+   */
+  PCollection<S> min();
+
+  /**
+   * Randomly samples items from this PCollection instance with the given
+   * probability of an item being accepted.
+   *
+   * @param acceptanceProbability the probability that any given item is accepted
+   * @return a {@code PCollection} containing the sampled items
+   */
+  PCollection<S> sample(double acceptanceProbability);
+
+  /**
+   * Randomly samples items from this PCollection instance with the given
+   * probability of an item being accepted and using the given seed.
+   *
+   * @param acceptanceProbability the probability that any given item is accepted
+   * @param seed the seed for the random sampling
+   * @return a {@code PCollection} containing the sampled items
+   */
+  PCollection<S> sample(double acceptanceProbability, long seed);
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/PGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PGroupedTable.java b/crunch/src/main/java/org/apache/crunch/PGroupedTable.java
new file mode 100644
index 0000000..96918a5
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/PGroupedTable.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+/**
+ * The Crunch representation of a grouped {@link PTable}: each element pairs
+ * a key with an {@code Iterable} of the values grouped under that key.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface PGroupedTable<K, V> extends PCollection<Pair<K, Iterable<V>>> {
+
+  /**
+   * Converts this grouping back into a multimap.
+   *
+   * @return an ungrouped version of the data in this {@code PGroupedTable}
+   */
+  PTable<K, V> ungroup();
+
+  /**
+   * Combines the values of this grouping with the supplied {@code CombineFn}.
+   *
+   * @param combineFn the combiner function
+   * @return a {@code PTable} in which each key has a single value
+   */
+  PTable<K, V> combineValues(CombineFn<K, V> combineFn);
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/PTable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PTable.java b/crunch/src/main/java/org/apache/crunch/PTable.java
new file mode 100644
index 0000000..b673f6d
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/PTable.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+
+/**
+ * A sub-interface of {@code PCollection} that represents an immutable,
+ * distributed multi-map of keys and values.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface PTable<K, V> extends PCollection<Pair<K, V>> {
+  /**
+   * Returns a {@code PTable} instance that acts as the union
+   * of this {@code PTable} and the input {@code PTable}s.
+   *
+   * @param others the tables to union with this one
+   * @return a {@code PTable} containing the pairs of all the inputs
+   */
+  PTable<K, V> union(PTable<K, V>... others);
+
+  /**
+   * Performs a grouping operation on the keys of this table.
+   *
+   * @return a {@code PGroupedTable} instance that represents the grouping
+   */
+  PGroupedTable<K, V> groupByKey();
+
+  /**
+   * Performs a grouping operation on the keys of this table, using the given
+   * number of partitions.
+   *
+   * @param numPartitions The number of partitions for the data.
+   * @return a {@code PGroupedTable} instance that represents this grouping
+   */
+  PGroupedTable<K, V> groupByKey(int numPartitions);
+
+  /**
+   * Performs a grouping operation on the keys of this table, using the
+   * additional {@code GroupingOptions} to control how the grouping is
+   * executed.
+   *
+   * @param options The grouping options to use
+   * @return a {@code PGroupedTable} instance that represents the grouping
+   */
+  PGroupedTable<K, V> groupByKey(GroupingOptions options);
+
+  /**
+   * Writes this {@code PTable} to the given {@code Target}.
+   *
+   * @param target the target to write to
+   * @return a {@code PTable} with the same key and value types as this one
+   */
+  PTable<K, V> write(Target target);
+
+  /**
+   * Returns the {@code PTableType} of this {@code PTable}.
+   */
+  PTableType<K, V> getPTableType();
+
+  /**
+   * Returns the {@code PType} of the key.
+   */
+  PType<K> getKeyType();
+
+  /**
+   * Returns the {@code PType} of the value.
+   */
+  PType<V> getValueType();
+
+  /**
+   * Aggregates all of the values with the same key into a single
+   * key-value pair in the returned {@code PTable}.
+   *
+   * @return a {@code PTable} mapping each key to a {@code Collection} of its values
+   */
+  PTable<K, Collection<V>> collectValues();
+
+  /**
+   * Returns a PTable made up of the pairs in this PTable with the
+   * largest value field.
+   *
+   * @param count The number of pairs to return
+   * @return a {@code PTable} holding the selected pairs
+   */
+  PTable<K, V> top(int count);
+
+  /**
+   * Returns a PTable made up of the pairs in this PTable with the
+   * smallest value field.
+   *
+   * @param count The number of pairs to return
+   * @return a {@code PTable} holding the selected pairs
+   */
+  PTable<K, V> bottom(int count);
+
+  /**
+   * Performs an inner join on this table and the one passed in as
+   * an argument on their common keys.
+   *
+   * @param other the table to join with
+   * @return a {@code PTable} mapping each common key to pairs of values drawn
+   *     from this table and from {@code other}
+   */
+  <U> PTable<K, Pair<V, U>> join(PTable<K, U> other);
+
+  /**
+   * Co-group operation with the given table on common keys.
+   *
+   * @param other the table to co-group with
+   * @return a {@code PTable} mapping each key to the collection of values from
+   *     this table paired with the collection of values from {@code other}
+   */
+  <U> PTable<K, Pair<Collection<V>, Collection<U>>> cogroup(PTable<K, U> other);
+
+  /**
+   * Returns a {@link PCollection} made up of the keys in this PTable.
+   */
+  PCollection<K> keys();
+
+  /**
+   * Returns a {@link PCollection} made up of the values in this PTable.
+   */
+  PCollection<V> values();
+
+  /**
+   * Returns a {@code Map<K, V>} made up of the keys and values in this PTable.
+   * <p>
+   * <b>Note:</b> The contents of the returned map may not be exactly the same
+   * as this PTable, as a PTable is a multi-map (i.e. can contain multiple
+   * values for a single key).
+   *
+   * @return a {@code Map} holding this table's keys and values
+   */
+  Map<K, V> materializeToMap();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/Pair.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Pair.java b/crunch/src/main/java/org/apache/crunch/Pair.java
new file mode 100644
index 0000000..9d319ab
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/Pair.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+/**
+ * A convenience class for two-element {@link Tuple}s.
+ *
+ * <p>Either element may be {@code null}; {@link #equals}, {@link #hashCode}
+ * and {@link #compareTo} all accept {@code null} elements.
+ *
+ * @param <K> the type of the first element
+ * @param <V> the type of the second element
+ */
+public class Pair<K, V> implements Tuple, Comparable<Pair<K, V>> {
+
+  private final K first;
+  private final V second;
+
+  /**
+   * Creates a new {@code Pair}, letting the compiler infer the type arguments.
+   *
+   * @param first the first element
+   * @param second the second element
+   * @return a new {@code Pair} holding {@code first} and {@code second}
+   */
+  public static <T, U> Pair<T, U> of(T first, U second) {
+    return new Pair<T, U>(first, second);
+  }
+
+  public Pair(K first, V second) {
+    this.first = first;
+    this.second = second;
+  }
+
+  /** Returns the first element. */
+  public K first() {
+    return first;
+  }
+
+  /** Returns the second element. */
+  public V second() {
+    return second;
+  }
+
+  /**
+   * Returns the element at the given position.
+   *
+   * @param index {@code 0} for the first element, {@code 1} for the second
+   * @return the requested element
+   * @throws ArrayIndexOutOfBoundsException if {@code index} is neither 0 nor 1
+   */
+  @Override
+  public Object get(int index) {
+    switch (index) {
+    case 0:
+      return first;
+    case 1:
+      return second;
+    default:
+      throw new ArrayIndexOutOfBoundsException(index);
+    }
+  }
+
+  /** Returns 2, the number of elements in a {@code Pair}. */
+  @Override
+  public int size() {
+    return 2;
+  }
+
+  @Override
+  public int hashCode() {
+    HashCodeBuilder hcb = new HashCodeBuilder();
+    return hcb.append(first).append(second).toHashCode();
+  }
+
+  /**
+   * Two pairs are equal when they have the same runtime class and null-safe
+   * equal first and second elements.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    Pair<?, ?> other = (Pair<?, ?>) obj;
+    return nullSafeEquals(first, other.first) && nullSafeEquals(second, other.second);
+  }
+
+  /** Formats this pair as {@code [first,second]}. */
+  @Override
+  public String toString() {
+    return "[" + first + "," + second + "]";
+  }
+
+  /**
+   * Orders pairs by their first element, then by their second element. See
+   * {@link #cmp} for how individual elements are compared; note that the
+   * hash-code fallback used for non-{@link Comparable} elements can make this
+   * ordering inconsistent with {@link #equals}.
+   */
+  @Override
+  public int compareTo(Pair<K, V> o) {
+    int diff = cmp(first, o.first);
+    if (diff == 0) {
+      diff = cmp(second, o.second);
+    }
+    return diff;
+  }
+
+  /** Null-safe equality: both references identical, or {@code a.equals(b)}. */
+  private static boolean nullSafeEquals(Object a, Object b) {
+    return a == b || (a != null && a.equals(b));
+  }
+
+  /**
+   * Compares two possibly-null elements: {@code null} sorts before any
+   * non-null value, a {@link Comparable} left-hand side uses its natural
+   * ordering, and anything else falls back to comparing hash codes.
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  private static int cmp(Object lhs, Object rhs) {
+    if (lhs == rhs) {
+      return 0;
+    } else if (lhs == null) {
+      return -1;
+    } else if (rhs == null) {
+      // Checked before compareTo so a null right-hand side never reaches
+      // Comparable.compareTo(null), which is allowed to throw NPE.
+      return 1;
+    } else if (lhs instanceof Comparable) {
+      return ((Comparable) lhs).compareTo(rhs);
+    }
+    // Compare rather than subtract: subtracting two arbitrary hash codes can
+    // overflow and flip the sign of the result.
+    int lhsHash = lhs.hashCode();
+    int rhsHash = rhs.hashCode();
+    return lhsHash < rhsHash ? -1 : (lhsHash == rhsHash ? 0 : 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/Pipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Pipeline.java b/crunch/src/main/java/org/apache/crunch/Pipeline.java
new file mode 100644
index 0000000..fd60695
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/Pipeline.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Manages the state of a pipeline execution.
+ */
+public interface Pipeline {
+
+  /**
+   * Sets the {@code Configuration} to use with this pipeline.
+   *
+   * @param conf the Hadoop configuration for this pipeline
+   */
+  void setConfiguration(Configuration conf);
+
+  /**
+   * Returns the name of this pipeline.
+   *
+   * @return Name of the pipeline
+   */
+  String getName();
+
+  /**
+   * Returns the {@code Configuration} instance associated with this pipeline.
+   */
+  Configuration getConfiguration();
+
+  /**
+   * Converts the given {@code Source} into a {@code PCollection} that is
+   * available to jobs run using this {@code Pipeline} instance.
+   *
+   * @param source The source of data
+   * @return A PCollection that references the given source
+   */
+  <T> PCollection<T> read(Source<T> source);
+
+  /**
+   * A version of the read method for {@code TableSource} instances that
+   * map to {@code PTable}s.
+   *
+   * @param tableSource The source of the data
+   * @return A PTable that references the given source
+   */
+  <K, V> PTable<K, V> read(TableSource<K, V> tableSource);
+
+  /**
+   * Writes the given collection to the given target on the next
+   * pipeline run.
+   *
+   * @param collection The collection
+   * @param target The output target
+   */
+  void write(PCollection<?> collection, Target target);
+
+  /**
+   * Creates the given {@code PCollection} and reads the data it contains
+   * into the returned {@code Iterable} for client use.
+   *
+   * @param pcollection The PCollection to materialize
+   * @return an {@code Iterable} over the data in the {@code PCollection}
+   */
+  <T> Iterable<T> materialize(PCollection<T> pcollection);
+
+  /**
+   * Constructs and executes a series of MapReduce jobs in order
+   * to write data to the output targets.
+   *
+   * @return a {@code PipelineResult} with details and statistics about the
+   *     stages of this run
+   */
+  PipelineResult run();
+
+  /**
+   * Runs any remaining jobs required to generate outputs and then
+   * cleans up any intermediate data files that were created in
+   * this run or previous calls to {@code run}.
+   *
+   * @return a {@code PipelineResult} with details and statistics about the
+   *     stages of this run
+   */
+  PipelineResult done();
+
+  /**
+   * A convenience method for reading a text file.
+   *
+   * @param pathName the path of the file to read
+   * @return a {@code PCollection} of the {@code String}s read from the file
+   */
+  PCollection<String> readTextFile(String pathName);
+
+  /**
+   * A convenience method for writing a text file.
+   *
+   * @param collection the collection whose contents are written
+   * @param pathName the path of the file to write
+   */
+  <T> void writeTextFile(PCollection<T> collection, String pathName);
+
+  /**
+   * Turns on debug logging for jobs that are run from this pipeline.
+   */
+  void enableDebug();
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/PipelineResult.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PipelineResult.java b/crunch/src/main/java/org/apache/crunch/PipelineResult.java
new file mode 100644
index 0000000..6f97fe8
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/PipelineResult.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Container for the results of a call to {@code run} or {@code done} on the Pipeline interface that includes
+ * details and statistics about the component stages of the data pipeline.
+ */
+public class PipelineResult {
+
+  public static class StageResult {
+    
+    private final String stageName;
+    private final Counters counters;
+    
+    public StageResult(String stageName, Counters counters) {
+      this.stageName = stageName;
+      this.counters = counters;
+    }
+    
+    public String getStageName() {
+      return stageName;
+    }
+    
+    public Counters getCounters() {
+      return counters;
+    }
+    
+    public Counter findCounter(Enum<?> key) {
+      return counters.findCounter(key);
+    }
+    
+    public long getCounterValue(Enum<?> key) {
+      return findCounter(key).getValue();
+    }
+  }
+  
+  public static final PipelineResult EMPTY = new PipelineResult(ImmutableList.<StageResult>of());
+  
+  private final List<StageResult> stageResults;
+  
+  public PipelineResult(List<StageResult> stageResults) {
+    this.stageResults = ImmutableList.copyOf(stageResults);
+  }
+  
+  public boolean succeeded() {
+    return !stageResults.isEmpty();
+  }
+  
+  public List<StageResult> getStageResults() {
+    return stageResults;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/Source.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Source.java b/crunch/src/main/java/org/apache/crunch/Source.java
new file mode 100644
index 0000000..a77a656
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/Source.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+
+import org.apache.crunch.types.PType;
+
+/**
+ * A {@code Source} describes a data set that serves as input to one or more
+ * MapReduce jobs.
+ *
+ * @param <T> the type of the elements provided by this source
+ */
+public interface Source<T> {
+
+  /**
+   * Returns the {@code PType} describing the elements of this source.
+   */
+  PType<T> getType();
+
+  /**
+   * Configures the given job so that it reads this source as an input.
+   *
+   * @param job the job to configure
+   * @param inputId for a multi-input job, an identifier for this input to the job
+   * @throws IOException if configuring the job for this source fails
+   */
+  void configureSource(Job job, int inputId) throws IOException;
+
+  /**
+   * Returns the number of bytes in this {@code Source}.
+   *
+   * @param configuration the Hadoop configuration made available to the size lookup
+   */
+  long getSize(Configuration configuration);
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/SourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/SourceTarget.java b/crunch/src/main/java/org/apache/crunch/SourceTarget.java
new file mode 100644
index 0000000..3d06392
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/SourceTarget.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+
+/**
+ * An interface for classes that implement both the {@code Source} and
+ * the {@code Target} interfaces.
+ *
+ */
+public interface SourceTarget<T> extends Source<T>, Target {
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/TableSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/TableSource.java b/crunch/src/main/java/org/apache/crunch/TableSource.java
new file mode 100644
index 0000000..9ebade6
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/TableSource.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import org.apache.crunch.types.PTableType;
+
+/**
+ * The interface for {@code Source} implementations that return a {@link PTable}.
+ *
+ * @param <K> the key type of the table
+ * @param <V> the value type of the table
+ */
+public interface TableSource<K, V> extends Source<Pair<K, V>> {
+
+  /**
+   * Returns the {@link PTableType} of the table read from this source.
+   */
+  PTableType<K, V> getTableType();
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/Target.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Target.java b/crunch/src/main/java/org/apache/crunch/Target.java
new file mode 100644
index 0000000..75de874
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/Target.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.types.PType;
+
+/**
+ * A {@code Target} describes where the output of a Crunch job is written.
+ */
+public interface Target {
+
+  /**
+   * Offers this target to the given {@code OutputHandler} for data of the
+   * given type.
+   *
+   * @param handler the {@code OutputHandler} to use
+   * @param ptype the type of the data to be written
+   * @return NOTE(review): presumably whether this target accepted the output;
+   *     exact semantics are defined by implementations, so confirm there
+   */
+  boolean accept(OutputHandler handler, PType<?> ptype);
+
+  /**
+   * Returns a {@link SourceTarget} for this target, typed by the given
+   * {@code PType}, so that data written here can also be read as a
+   * {@link Source}. NOTE(review): whether implementations may return
+   * {@code null} is not visible here; confirm before relying on it.
+   *
+   * @param ptype the type of the data held by the target
+   */
+  <T> SourceTarget<T> asSourceTarget(PType<T> ptype);
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/Tuple.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Tuple.java b/crunch/src/main/java/org/apache/crunch/Tuple.java
new file mode 100644
index 0000000..38ce188
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/Tuple.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+/**
+ * A fixed-size collection of Objects, used in Crunch for representing
+ * joins between {@code PCollection}s.
+ */
+public interface Tuple {
+
+  /**
+   * Returns how many elements this Tuple holds.
+   */
+  int size();
+
+  /**
+   * Returns the Object stored at the given position.
+   *
+   * @param index a zero-based position, expected to be less than {@link #size()}
+   */
+  Object get(int index);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/Tuple3.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Tuple3.java b/crunch/src/main/java/org/apache/crunch/Tuple3.java
new file mode 100644
index 0000000..d06f17c
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/Tuple3.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+/**
+ * A convenience class for three-element {@link Tuple}s.
+ *
+ * <p>Elements may be {@code null}. Two instances are equal when their
+ * elements are pairwise equal (two {@code null}s count as equal), and
+ * {@link #hashCode()} is computed from the same three elements.
+ *
+ * @param <V1> type of the first element
+ * @param <V2> type of the second element
+ * @param <V3> type of the third element
+ */
+public class Tuple3<V1, V2, V3> implements Tuple {
+
+  private final V1 first;
+  private final V2 second;
+  private final V3 third;
+
+  /**
+   * Creates a new {@code Tuple3}, letting the compiler infer the type arguments.
+   */
+  public static <A, B, C> Tuple3<A, B, C> of(A a, B b, C c) {
+    return new Tuple3<A, B, C>(a, b, c);
+  }
+
+  /**
+   * Creates a tuple holding the three given elements, any of which may be {@code null}.
+   */
+  public Tuple3(V1 first, V2 second, V3 third) {
+    this.first = first;
+    this.second = second;
+    this.third = third;
+  }
+
+  /** Returns the element at index 0. */
+  public V1 first() {
+    return first;
+  }
+
+  /** Returns the element at index 1. */
+  public V2 second() {
+    return second;
+  }
+
+  /** Returns the element at index 2. */
+  public V3 third() {
+    return third;
+  }
+
+  /**
+   * Returns the element at {@code index}.
+   *
+   * @throws ArrayIndexOutOfBoundsException if {@code index} is not 0, 1 or 2
+   */
+  @Override
+  public Object get(int index) {
+    switch (index) {
+    case 0:
+      return first;
+    case 1:
+      return second;
+    case 2:
+      return third;
+    default:
+      // Include the offending index so failures are diagnosable.
+      throw new ArrayIndexOutOfBoundsException(
+          "Index " + index + " out of range for Tuple3 of size 3");
+    }
+  }
+
+  /** Always returns 3. */
+  @Override
+  public int size() {
+    return 3;
+  }
+
+  @Override
+  public int hashCode() {
+    HashCodeBuilder hcb = new HashCodeBuilder();
+    return hcb.append(first).append(second).append(third).toHashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    Tuple3<?, ?, ?> other = (Tuple3<?, ?, ?>) obj;
+    return elementEquals(first, other.first)
+        && elementEquals(second, other.second)
+        && elementEquals(third, other.third);
+  }
+
+  /** Null-safe element comparison: identical references or {@code a.equals(b)}. */
+  private static boolean elementEquals(Object a, Object b) {
+    return a == b || (a != null && a.equals(b));
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("Tuple3[");
+    sb.append(first).append(",").append(second).append(",").append(third);
+    return sb.append("]").toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/Tuple4.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Tuple4.java b/crunch/src/main/java/org/apache/crunch/Tuple4.java
new file mode 100644
index 0000000..a30cbb1
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/Tuple4.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+/**
+ * A convenience class for four-element {@link Tuple}s.
+ *
+ * <p>Elements may be {@code null}. Two instances are equal when their
+ * elements are pairwise equal (two {@code null}s count as equal), and
+ * {@link #hashCode()} is computed from the same four elements.
+ *
+ * @param <V1> type of the first element
+ * @param <V2> type of the second element
+ * @param <V3> type of the third element
+ * @param <V4> type of the fourth element
+ */
+public class Tuple4<V1, V2, V3, V4> implements Tuple {
+
+  private final V1 first;
+  private final V2 second;
+  private final V3 third;
+  private final V4 fourth;
+
+  /**
+   * Creates a new {@code Tuple4}, letting the compiler infer the type arguments.
+   */
+  public static <A, B, C, D> Tuple4<A, B, C, D> of(A a, B b, C c, D d) {
+    return new Tuple4<A, B, C, D>(a, b, c, d);
+  }
+
+  /**
+   * Creates a tuple holding the four given elements, any of which may be {@code null}.
+   */
+  public Tuple4(V1 first, V2 second, V3 third, V4 fourth) {
+    this.first = first;
+    this.second = second;
+    this.third = third;
+    this.fourth = fourth;
+  }
+
+  /** Returns the element at index 0. */
+  public V1 first() {
+    return first;
+  }
+
+  /** Returns the element at index 1. */
+  public V2 second() {
+    return second;
+  }
+
+  /** Returns the element at index 2. */
+  public V3 third() {
+    return third;
+  }
+
+  /** Returns the element at index 3. */
+  public V4 fourth() {
+    return fourth;
+  }
+
+  /**
+   * Returns the element at {@code index}.
+   *
+   * @throws ArrayIndexOutOfBoundsException if {@code index} is not in 0..3
+   */
+  @Override
+  public Object get(int index) {
+    switch (index) {
+    case 0:
+      return first;
+    case 1:
+      return second;
+    case 2:
+      return third;
+    case 3:
+      return fourth;
+    default:
+      // Include the offending index so failures are diagnosable.
+      throw new ArrayIndexOutOfBoundsException(
+          "Index " + index + " out of range for Tuple4 of size 4");
+    }
+  }
+
+  /** Always returns 4. */
+  @Override
+  public int size() {
+    return 4;
+  }
+
+  @Override
+  public int hashCode() {
+    HashCodeBuilder hcb = new HashCodeBuilder();
+    return hcb.append(first).append(second).append(third)
+        .append(fourth).toHashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    Tuple4<?, ?, ?, ?> other = (Tuple4<?, ?, ?, ?>) obj;
+    return elementEquals(first, other.first)
+        && elementEquals(second, other.second)
+        && elementEquals(third, other.third)
+        && elementEquals(fourth, other.fourth);
+  }
+
+  /** Null-safe element comparison: identical references or {@code a.equals(b)}. */
+  private static boolean elementEquals(Object a, Object b) {
+    return a == b || (a != null && a.equals(b));
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("Tuple4[");
+    sb.append(first).append(",").append(second).append(",").append(third);
+    return sb.append(",").append(fourth).append("]").toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/TupleN.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/TupleN.java b/crunch/src/main/java/org/apache/crunch/TupleN.java
new file mode 100644
index 0000000..8e7524c
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/TupleN.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.util.Arrays;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+/**
+ * A {@link Tuple} holding any number of values.
+ *
+ * <p>The constructor copies its arguments into a private array, so later
+ * changes to an array passed by the caller do not affect the tuple. Equality
+ * compares the values element by element.
+ */
+public class TupleN implements Tuple {
+
+  private final Object[] values;
+
+  /**
+   * Creates a tuple from a copy of {@code values}.
+   *
+   * @param values the tuple's elements, in index order
+   */
+  public TupleN(Object... values) {
+    // Copy into a plain Object[] regardless of the runtime type of the argument array.
+    this.values = Arrays.copyOf(values, values.length, Object[].class);
+  }
+
+  @Override
+  public Object get(int index) {
+    return values[index];
+  }
+
+  @Override
+  public int size() {
+    return values.length;
+  }
+
+  @Override
+  public int hashCode() {
+    HashCodeBuilder builder = new HashCodeBuilder();
+    for (int i = 0; i < values.length; i++) {
+      builder.append(values[i]);
+    }
+    return builder.toHashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj == null || obj.getClass() != getClass()) {
+      return false;
+    }
+    return Arrays.equals(values, ((TupleN) obj).values);
+  }
+
+  @Override
+  public String toString() {
+    return Arrays.toString(values);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java b/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
new file mode 100644
index 0000000..f30801d
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
+
+public class CompositeMapFn<R, S, T> extends MapFn<R, T> {
+  
+  private final MapFn<R, S> first;
+  private final MapFn<S, T> second;
+  
+  public CompositeMapFn(MapFn<R, S> first, MapFn<S, T> second) {
+    this.first = first;
+    this.second = second;
+  }
+  
+  @Override
+  public void initialize() {
+    first.setContext(getContext());
+    second.setContext(getContext());
+  }
+  
+  public MapFn<R, S> getFirst() {
+    return first;
+  }
+  
+  public MapFn<S, T> getSecond() {
+    return second;
+  }
+  
+  @Override
+  public T map(R input) {
+    return second.map(first.map(input));
+  }
+  
+  @Override
+  public void cleanup(Emitter<T> emitter) {
+    first.cleanup(null);
+    second.cleanup(null);
+  }
+
+  @Override
+  public void configure(Configuration conf) {
+    first.configure(conf);
+    second.configure(conf);
+  }
+
+  @Override
+  public void setConfigurationForTest(Configuration conf) {
+    first.setConfigurationForTest(conf);
+    second.setConfigurationForTest(conf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java b/crunch/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java
new file mode 100644
index 0000000..f5056c2
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+
+/**
+ * Wrapper function for converting a {@code MapFn} into a key-value pair that
+ * is used to convert from a {@code PCollection<V>} to a {@code PTable<K, V>}.
+ *
+ * <p>Each input {@code v} becomes {@code Pair.of(mapFn.map(v), v)}. Context,
+ * configuration and cleanup calls are forwarded to the wrapped key function so
+ * that it goes through the same lifecycle it would if used directly.
+ */
+public class ExtractKeyFn<K, V> extends MapFn<V, Pair<K, V>> {
+
+  private final MapFn<V, K> mapFn;
+
+  /**
+   * @param mapFn computes the key for each input value
+   */
+  public ExtractKeyFn(MapFn<V, K> mapFn) {
+    this.mapFn = mapFn;
+  }
+
+  @Override
+  public void configure(Configuration conf) {
+    // Without forwarding, a key function that reads settings here is never configured.
+    mapFn.configure(conf);
+  }
+
+  @Override
+  public void setConfigurationForTest(Configuration conf) {
+    mapFn.setConfigurationForTest(conf);
+  }
+
+  @Override
+  public void initialize() {
+    this.mapFn.setContext(getContext());
+  }
+
+  @Override
+  public Pair<K, V> map(V input) {
+    return Pair.of(mapFn.map(input), input);
+  }
+
+  @Override
+  public void cleanup(Emitter<Pair<K, V>> emitter) {
+    // The key function emits K rather than Pair<K, V>, so it cannot share this
+    // emitter; like the other wrapper functions, pass null.
+    mapFn.cleanup(null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/fn/IdentityFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/IdentityFn.java b/crunch/src/main/java/org/apache/crunch/fn/IdentityFn.java
new file mode 100644
index 0000000..3d0abc5
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/fn/IdentityFn.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.MapFn;
+
+/**
+ * A {@code MapFn} that returns every input unchanged.
+ *
+ * <p>Use {@link #getInstance()}, which hands out one shared instance cast to the
+ * requested element type.
+ */
+public class IdentityFn<T> extends MapFn<T, T> {
+
+  private static final IdentityFn<Object> INSTANCE = new IdentityFn<Object>();
+
+  // Private so that callers go through getInstance() and share INSTANCE.
+  private IdentityFn() {
+  }
+
+  /**
+   * Returns the shared identity function, typed for the caller's element type.
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> IdentityFn<T> getInstance() {
+    // Safe: map() only returns its argument, so one instance serves every T.
+    IdentityFn<T> typed = (IdentityFn<T>) INSTANCE;
+    return typed;
+  }
+
+  @Override
+  public T map(T input) {
+    return input;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/fn/MapKeysFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/MapKeysFn.java b/crunch/src/main/java/org/apache/crunch/fn/MapKeysFn.java
new file mode 100644
index 0000000..45ddf52
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/fn/MapKeysFn.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+
+public abstract class MapKeysFn<K1, K2, V> extends DoFn<Pair<K1, V>, Pair<K2, V>> {
+  
+  @Override
+  public void process(Pair<K1, V> input, Emitter<Pair<K2, V>> emitter) {
+    emitter.emit(Pair.of(map(input.first()), input.second()));
+  }
+
+  public abstract K2 map(K1 k1);
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/fn/MapValuesFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/MapValuesFn.java b/crunch/src/main/java/org/apache/crunch/fn/MapValuesFn.java
new file mode 100644
index 0000000..3c7065e
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/fn/MapValuesFn.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+
+public abstract class MapValuesFn<K, V1, V2> extends DoFn<Pair<K, V1>, Pair<K, V2>> {
+  
+  @Override
+  public void process(Pair<K, V1> input, Emitter<Pair<K, V2>> emitter) {
+    emitter.emit(Pair.of(input.first(), map(input.second())));
+  }
+
+  public abstract V2 map(V1 v);
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java b/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java
new file mode 100644
index 0000000..2cfd17b
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+
+/**
+ * A {@code MapFn} over key-value pairs that maps each key with {@code keys} and
+ * each value with {@code values}, producing
+ * {@code Pair.of(keys.map(key), values.map(value))}.
+ *
+ * <p>Configuration, context and cleanup calls are forwarded to both wrapped
+ * functions, keys first.
+ */
+public class PairMapFn<K, V, S, T> extends MapFn<Pair<K, V>, Pair<S, T>> {
+
+  // Assigned once in the constructor and never reassigned, hence final.
+  private final MapFn<K, S> keys;
+  private final MapFn<V, T> values;
+
+  /**
+   * @param keys function applied to each pair's key
+   * @param values function applied to each pair's value
+   */
+  public PairMapFn(MapFn<K, S> keys, MapFn<V, T> values) {
+    this.keys = keys;
+    this.values = values;
+  }
+
+  @Override
+  public void configure(Configuration conf) {
+    keys.configure(conf);
+    values.configure(conf);
+  }
+
+  @Override
+  public void initialize() {
+    keys.setContext(getContext());
+    values.setContext(getContext());
+  }
+
+  @Override
+  public Pair<S, T> map(Pair<K, V> input) {
+    return Pair.of(keys.map(input.first()), values.map(input.second()));
+  }
+
+  @Override
+  public void cleanup(Emitter<Pair<S, T>> emitter) {
+    // The wrapped functions emit S and T rather than pairs, so they get a null emitter.
+    keys.cleanup(null);
+    values.cleanup(null);
+  }
+
+  @Override
+  public void setConfigurationForTest(Configuration conf) {
+    keys.setConfigurationForTest(conf);
+    values.setConfigurationForTest(conf);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
new file mode 100644
index 0000000..6305fcb
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mem;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineResult;
+import org.apache.crunch.Source;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.mem.collect.MemCollection;
+import org.apache.crunch.impl.mem.collect.MemTable;
+import org.apache.crunch.io.At;
+import org.apache.crunch.io.PathTarget;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+/**
+ * A {@link Pipeline} that keeps its data in the local JVM: the collections it
+ * creates are {@code MemCollection} and {@code MemTable} instances.
+ *
+ * <p>Obtain the shared instance with {@link #getInstance()}. {@link #run()} and
+ * {@link #done()} perform no work and return {@code PipelineResult.EMPTY}.
+ */
+public class MemPipeline implements Pipeline {
+
+  private static final Log LOG = LogFactory.getLog(MemPipeline.class);
+
+  /**
+   * Encoding for text written by {@link #write}. DataOutputStream.writeBytes
+   * keeps only the low 8 bits of each char and would corrupt non-ASCII text.
+   */
+  private static final String TEXT_ENCODING = "UTF-8";
+
+  private static final MemPipeline INSTANCE = new MemPipeline();
+
+  /** Returns the shared in-memory pipeline instance. */
+  public static Pipeline getInstance() {
+    return INSTANCE;
+  }
+
+  /** Creates an in-memory collection holding a copy of {@code ts}. */
+  public static <T> PCollection<T> collectionOf(T... ts) {
+    return new MemCollection<T>(ImmutableList.copyOf(ts));
+  }
+
+  /** Creates an in-memory collection over {@code collect}. */
+  public static <T> PCollection<T> collectionOf(Iterable<T> collect) {
+    return new MemCollection<T>(collect);
+  }
+
+  /** Creates an in-memory collection of type {@code ptype} holding a copy of {@code ts}. */
+  public static <T> PCollection<T> typedCollectionOf(PType<T> ptype, T... ts) {
+    return new MemCollection<T>(ImmutableList.copyOf(ts), ptype, null);
+  }
+
+  /** Creates an in-memory collection of type {@code ptype} over {@code collect}. */
+  public static <T> PCollection<T> typedCollectionOf(PType<T> ptype, Iterable<T> collect) {
+    return new MemCollection<T>(collect, ptype, null);
+  }
+
+  /**
+   * Creates an in-memory table from alternating keys and values.
+   *
+   * @param s the first key
+   * @param t the first value
+   * @param more further entries as key, value, key, value, ...
+   * @throws IllegalArgumentException if {@code more} has an odd length
+   */
+  public static <S, T> PTable<S, T> tableOf(S s, T t, Object... more) {
+    return new MemTable<S, T>(pairsOf(s, t, more));
+  }
+
+  /**
+   * Typed variant of {@link #tableOf(Object, Object, Object...)}.
+   *
+   * @throws IllegalArgumentException if {@code more} has an odd length
+   */
+  public static <S, T> PTable<S, T> typedTableOf(PTableType<S, T> ptype, S s, T t, Object... more) {
+    return new MemTable<S, T>(pairsOf(s, t, more), ptype, null);
+  }
+
+  /** Creates an in-memory table over {@code pairs}. */
+  public static <S, T> PTable<S, T> tableOf(Iterable<Pair<S, T>> pairs) {
+    return new MemTable<S, T>(pairs);
+  }
+
+  /** Creates an in-memory table of type {@code ptype} over {@code pairs}. */
+  public static <S, T> PTable<S, T> typedTableOf(PTableType<S, T> ptype, Iterable<Pair<S, T>> pairs) {
+    return new MemTable<S, T>(pairs, ptype, null);
+  }
+
+  /**
+   * Builds the pair list shared by the varargs {@code tableOf}/{@code typedTableOf}
+   * overloads. The extra arguments are cast unchecked to the key/value types.
+   */
+  @SuppressWarnings("unchecked")
+  private static <S, T> List<Pair<S, T>> pairsOf(S s, T t, Object[] more) {
+    if (more.length % 2 != 0) {
+      throw new IllegalArgumentException(
+          "Expected alternating keys and values after the first pair, but got "
+          + more.length + " extra arguments");
+    }
+    List<Pair<S, T>> pairs = Lists.newArrayList();
+    pairs.add(Pair.of(s, t));
+    for (int i = 0; i < more.length; i += 2) {
+      pairs.add(Pair.of((S) more[i], (T) more[i + 1]));
+    }
+    return pairs;
+  }
+
+  private Configuration conf = new Configuration();
+
+  private MemPipeline() {
+  }
+
+  @Override
+  public void setConfiguration(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  @Override
+  public <T> PCollection<T> read(Source<T> source) {
+    if (source instanceof ReadableSource) {
+      try {
+        Iterable<T> iterable = ((ReadableSource<T>) source).read(conf);
+        return new MemCollection<T>(iterable, source.getType(), source.toString());
+      } catch (IOException e) {
+        LOG.error("Exception reading source: " + source.toString(), e);
+        throw new IllegalStateException(e);
+      }
+    }
+    LOG.error("Source " + source + " is not readable");
+    throw new IllegalStateException("Source " + source + " is not readable");
+  }
+
+  @Override
+  public <K, V> PTable<K, V> read(TableSource<K, V> source) {
+    if (source instanceof ReadableSource) {
+      try {
+        Iterable<Pair<K, V>> iterable = ((ReadableSource<Pair<K, V>>) source).read(conf);
+        return new MemTable<K, V>(iterable, source.getTableType(), source.toString());
+      } catch (IOException e) {
+        LOG.error("Exception reading source: " + source.toString(), e);
+        throw new IllegalStateException(e);
+      }
+    }
+    LOG.error("Source " + source + " is not readable");
+    throw new IllegalStateException("Source " + source + " is not readable");
+  }
+
+  /**
+   * Writes {@code collection} as text to a file named {@code out} under the
+   * target's path. Only {@code PathTarget}s are supported; other targets and
+   * I/O failures are logged rather than thrown.
+   */
+  @Override
+  public void write(PCollection<?> collection, Target target) {
+    if (!(target instanceof PathTarget)) {
+      LOG.error("Target " + target + " is not a PathTarget instance");
+      return;
+    }
+    Path path = ((PathTarget) target).getPath();
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      FSDataOutputStream os = fs.create(new Path(path, "out"));
+      try {
+        writeLines(collection, os);
+      } finally {
+        // Close even if writing fails, so the stream is not leaked.
+        os.close();
+      }
+    } catch (IOException e) {
+      LOG.error("Exception writing target: " + target, e);
+    }
+  }
+
+  /**
+   * Writes one CRLF-terminated, UTF-8 encoded line per element of
+   * {@code collection}; a {@code PTable}'s pairs are written as key TAB value.
+   */
+  private static void writeLines(PCollection<?> collection, FSDataOutputStream os)
+      throws IOException {
+    boolean isTable = collection instanceof PTable;
+    for (Object o : collection.materialize()) {
+      String line;
+      if (isTable) {
+        Pair<?, ?> p = (Pair<?, ?>) o;
+        line = p.first().toString() + "\t" + p.second().toString();
+      } else {
+        line = o.toString();
+      }
+      os.write((line + "\r\n").getBytes(TEXT_ENCODING));
+    }
+  }
+
+  @Override
+  public PCollection<String> readTextFile(String pathName) {
+    return read(At.textFile(pathName));
+  }
+
+  @Override
+  public <T> void writeTextFile(PCollection<T> collection, String pathName) {
+    write(collection, At.textFile(pathName));
+  }
+
+  @Override
+  public <T> Iterable<T> materialize(PCollection<T> pcollection) {
+    return pcollection.materialize();
+  }
+
+  @Override
+  public PipelineResult run() {
+    return PipelineResult.EMPTY;
+  }
+
+  @Override
+  public PipelineResult done() {
+    return PipelineResult.EMPTY;
+  }
+
+  @Override
+  public void enableDebug() {
+    LOG.info("Note: in-memory pipelines do not have debug logging");
+  }
+
+  @Override
+  public String getName() {
+    return "Memory Pipeline";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
new file mode 100644
index 0000000..7291e50
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mem.collect;
+
+import java.util.Collection;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.FilterFn;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Target;
+import org.apache.crunch.fn.ExtractKeyFn;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.lib.Sample;
+import org.apache.crunch.lib.Sort;
+import org.apache.crunch.test.InMemoryEmitter;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+
+public class MemCollection<S> implements PCollection<S> {
+
+  private final Collection<S> collect;
+  private final PType<S> ptype;
+  private String name;
+  
+  public MemCollection(Iterable<S> collect) {
+    this(collect, null, null);
+  }
+  
+  public MemCollection(Iterable<S> collect, PType<S> ptype) {
+    this(collect, ptype, null);
+  }
+  
+  public MemCollection(Iterable<S> collect, PType<S> ptype, String name) {
+    this.collect = ImmutableList.copyOf(collect);
+    this.ptype = ptype;
+    this.name = name;
+  }
+  
+  @Override
+  public Pipeline getPipeline() {
+    return MemPipeline.getInstance();
+  }
+
+  @Override
+  public PCollection<S> union(PCollection<S>... collections) {
+    Collection<S> output = Lists.newArrayList();    
+    for (PCollection<S> pcollect : collections) {
+      for (S s : pcollect.materialize()) {
+        output.add(s);
+      }
+    }
+    output.addAll(collect);
+    return new MemCollection<S>(output, collections[0].getPType());
+  }
+
+  @Override
+  public <T> PCollection<T> parallelDo(DoFn<S, T> doFn, PType<T> type) {
+    return parallelDo(null, doFn, type);
+  }
+
+  @Override
+  public <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type) {
+    InMemoryEmitter<T> emitter = new InMemoryEmitter<T>();
+    doFn.initialize();
+    for (S s : collect) {
+      doFn.process(s, emitter);
+    }
+    doFn.cleanup(emitter);
+    return new MemCollection<T>(emitter.getOutput(), type, name);
+  }
+
+  @Override
+  public <K, V> PTable<K, V> parallelDo(DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type) {
+    return parallelDo(null, doFn, type);
+  }
+
+  @Override
+  public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn,
+      PTableType<K, V> type) {
+    InMemoryEmitter<Pair<K, V>> emitter = new InMemoryEmitter<Pair<K, V>>();
+    doFn.initialize();
+    for (S s : collect) {
+      doFn.process(s, emitter);
+    }
+    doFn.cleanup(emitter);
+    return new MemTable<K, V>(emitter.getOutput(), type, name);
+  }
+
+  @Override
+  public PCollection<S> write(Target target) {
+    getPipeline().write(this, target);
+    return this;
+  }
+
+  @Override
+  public Iterable<S> materialize() {
+    return collect;
+  }
+
+  public Collection<S> getCollection() {
+    return collect;
+  }
+  
+  @Override
+  public PType<S> getPType() {
+    return ptype;
+  }
+
+  @Override
+  public PTypeFamily getTypeFamily() {
+    if (ptype != null) {
+      return ptype.getFamily();
+    }
+    return null;
+  }
+
+  @Override
+  public long getSize() {
+    return collect.size();
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+  
+  @Override
+  public String toString() {
+    return collect.toString();
+  }
+
+  @Override
+  public PTable<S, Long> count() {
+	return Aggregate.count(this);
+  }
+
+  @Override
+  public PCollection<S> sample(double acceptanceProbability) {
+	return Sample.sample(this, acceptanceProbability);
+  }
+
+  @Override
+  public PCollection<S> sample(double acceptanceProbability, long seed) {
+	return Sample.sample(this, seed, acceptanceProbability);
+  }
+
+  @Override
+  public PCollection<S> max() {
+	return Aggregate.max(this);
+  }
+
+  @Override
+  public PCollection<S> min() {
+	return Aggregate.min(this);
+  }
+
+  @Override
+  public PCollection<S> sort(boolean ascending) {
+	return Sort.sort(this, ascending ? Sort.Order.ASCENDING : Sort.Order.DESCENDING);
+  }
+
+  @Override
+  public PCollection<S> filter(FilterFn<S> filterFn) {
+    return parallelDo(filterFn, getPType());
+  }
+  
+  @Override
+  public PCollection<S> filter(String name, FilterFn<S> filterFn) {
+    return parallelDo(name, filterFn, getPType());
+  }
+  
+  @Override
+  public <K> PTable<K, S> by(MapFn<S, K> mapFn, PType<K> keyType) {
+    return parallelDo(new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));
+  }
+
+  @Override
+  public <K> PTable<K, S> by(String name, MapFn<S, K> mapFn, PType<K> keyType) {
+    return parallelDo(name, new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
new file mode 100644
index 0000000..4e114ab
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mem.collect;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Target;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implements PGroupedTable<K, V> {
+
+  private final MemTable<K, V> parent;
+  
+  private static <S, T> Map<S, Collection<T>> createMapFor(PType<S> keyType, GroupingOptions options, Pipeline pipeline) {
+    if (options != null && options.getSortComparatorClass() != null) {
+      RawComparator<S> rc = ReflectionUtils.newInstance(options.getSortComparatorClass(),
+          pipeline.getConfiguration());
+      return new TreeMap<S, Collection<T>>(rc);
+    } else if (keyType != null && Comparable.class.isAssignableFrom(keyType.getTypeClass())) {
+      return new TreeMap<S, Collection<T>>();
+    }
+    return Maps.newHashMap();
+  }
+  
+  private static <S, T> Iterable<Pair<S, Iterable<T>>> buildMap(MemTable<S, T> parent, GroupingOptions options) {
+    PType<S> keyType = parent.getKeyType();
+    Map<S, Collection<T>> map = createMapFor(keyType, options, parent.getPipeline());
+    
+    for (Pair<S, T> pair : parent.materialize()) {
+      S key = pair.first();
+      if (!map.containsKey(key)) {
+        map.put(key, Lists.<T>newArrayList());
+      }
+      map.get(key).add(pair.second());
+    }
+    
+    List<Pair<S, Iterable<T>>> values = Lists.newArrayList();
+    for (Map.Entry<S, Collection<T>> e : map.entrySet()) {
+      values.add(Pair.of(e.getKey(), (Iterable<T>) e.getValue()));
+    }
+    return values;
+  }
+  
+  public MemGroupedTable(MemTable<K, V> parent, GroupingOptions options) {
+	super(buildMap(parent, options));
+    this.parent = parent;
+  }
+
+  @Override
+  public PCollection<Pair<K, Iterable<V>>> union(
+      PCollection<Pair<K, Iterable<V>>>... collections) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public PCollection<Pair<K, Iterable<V>>> write(Target target) {
+    getPipeline().write(this.ungroup(), target);
+    return this;
+  }
+
+  @Override
+  public PType<Pair<K, Iterable<V>>> getPType() {
+    PTableType<K, V> parentType = parent.getPTableType();
+    if (parentType != null) {
+      return parentType.getGroupedTableType();
+    }
+    return null;
+  }
+
+  @Override
+  public PTypeFamily getTypeFamily() {
+    return parent.getTypeFamily();
+  }
+
+  @Override
+  public long getSize() {
+    return parent.getSize();
+  }
+
+  @Override
+  public String getName() {
+    return "MemGrouped(" + parent.getName() + ")";
+  }
+
+  @Override
+  public PTable<K, V> combineValues(CombineFn<K, V> combineFn) {
+    return parallelDo(combineFn, parent.getPTableType());
+  }
+
+  @Override
+  public PTable<K, V> ungroup() {
+    return parent;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
new file mode 100644
index 0000000..53e7526
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mem.collect;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Target;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.lib.Cogroup;
+import org.apache.crunch.lib.Join;
+import org.apache.crunch.lib.PTables;
+import org.apache.crunch.materialize.MaterializableMap;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import com.google.common.collect.Lists;
+
+/**
+ * An in-memory {@link PTable}. It is a {@link MemCollection} of key/value pairs that also
+ * keeps the {@link PTableType}, so it can report its key and value types.
+ */
+public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable<K, V> {
+
+  private final PTableType<K, V> ptype;
+
+  public MemTable(Iterable<Pair<K, V>> collect) {
+    this(collect, null, null);
+  }
+
+  /**
+   * @param collect the pairs, copied into an immutable list by the superclass
+   * @param ptype the table type, or {@code null} if unknown
+   * @param name the table name, or {@code null}
+   */
+  public MemTable(Iterable<Pair<K, V>> collect, PTableType<K, V> ptype, String name) {
+    super(collect, ptype, name);
+    this.ptype = ptype;
+  }
+
+  /**
+   * Returns a new table holding this table's pairs, followed by the pairs of {@code others}.
+   *
+   * <p>The result's table type is taken from the first argument. When no arguments are
+   * given, this table's own type is used.
+   */
+  @Override
+  public PTable<K, V> union(PTable<K, V>... others) {
+    List<Pair<K, V>> values = Lists.newArrayList();
+    values.addAll(getCollection());
+    for (PTable<K, V> ptable : others) {
+      for (Pair<K, V> p : ptable.materialize()) {
+        values.add(p);
+      }
+    }
+    // Guard the empty-varargs case, which would otherwise index past the end of the array.
+    PTableType<K, V> unionType = others.length > 0 ? others[0].getPTableType() : ptype;
+    return new MemTable<K, V>(values, unionType, null);
+  }
+
+  @Override
+  public PGroupedTable<K, V> groupByKey() {
+    return groupByKey((GroupingOptions) null);
+  }
+
+  /** {@code numPartitions} is ignored, because in-memory grouping has no partitions. */
+  @Override
+  public PGroupedTable<K, V> groupByKey(int numPartitions) {
+    return groupByKey((GroupingOptions) null);
+  }
+
+  @Override
+  public PGroupedTable<K, V> groupByKey(GroupingOptions options) {
+    return new MemGroupedTable<K, V>(this, options);
+  }
+
+  @Override
+  public PTable<K, V> write(Target target) {
+    super.write(target);
+    return this;
+  }
+
+  @Override
+  public PTableType<K, V> getPTableType() {
+    return ptype;
+  }
+
+  /** Returns the key type, or {@code null} when the table has no type. */
+  @Override
+  public PType<K> getKeyType() {
+    if (ptype != null) {
+      return ptype.getKeyType();
+    }
+    return null;
+  }
+
+  /** Returns the value type, or {@code null} when the table has no type. */
+  @Override
+  public PType<V> getValueType() {
+    if (ptype != null) {
+      return ptype.getValueType();
+    }
+    return null;
+  }
+
+  @Override
+  public PTable<K, V> top(int count) {
+    return Aggregate.top(this, count, true);
+  }
+
+  @Override
+  public PTable<K, V> bottom(int count) {
+    return Aggregate.top(this, count, false);
+  }
+
+  @Override
+  public PTable<K, Collection<V>> collectValues() {
+    return Aggregate.collectValues(this);
+  }
+
+  @Override
+  public <U> PTable<K, Pair<V, U>> join(PTable<K, U> other) {
+    return Join.join(this, other);
+  }
+
+  @Override
+  public <U> PTable<K, Pair<Collection<V>, Collection<U>>> cogroup(PTable<K, U> other) {
+    return Cogroup.cogroup(this, other);
+  }
+
+  @Override
+  public PCollection<K> keys() {
+    return PTables.keys(this);
+  }
+
+  @Override
+  public PCollection<V> values() {
+    return PTables.values(this);
+  }
+
+  @Override
+  public Map<K, V> materializeToMap() {
+    return new MaterializableMap<K, V>(this.materialize());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
new file mode 100644
index 0000000..0b7d8d7
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineResult;
+import org.apache.crunch.Source;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.Target;
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.impl.mr.collect.InputCollection;
+import org.apache.crunch.impl.mr.collect.InputTable;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
+import org.apache.crunch.impl.mr.collect.UnionCollection;
+import org.apache.crunch.impl.mr.collect.UnionTable;
+import org.apache.crunch.impl.mr.plan.MSCRPlanner;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.crunch.io.At;
+import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.crunch.materialize.MaterializableIterable;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * A {@link Pipeline} that plans its outputs with {@link MSCRPlanner} and executes the
+ * resulting plan.
+ *
+ * <p>Outputs registered through {@link #write} and {@link #materialize} accumulate until
+ * {@link #run()} or {@link #done()} is called. Intermediate data goes under a randomly named
+ * directory below {@code /tmp}. {@link #done()} tries to delete that directory once no
+ * outputs remain pending.
+ */
+public class MRPipeline implements Pipeline {
+
+  private static final Log LOG = LogFactory.getLog(MRPipeline.class);
+
+  private static final Random RANDOM = new Random();
+
+  private final Class<?> jarClass;
+  private final String name;
+  private final Map<PCollectionImpl<?>, Set<Target>> outputTargets;
+  private final Map<PCollectionImpl<?>, MaterializableIterable<?>> outputTargetsToMaterialize;
+  private final Path tempDirectory;
+  private int tempFileIndex;
+  private int nextAnonymousStageId;
+
+  private Configuration conf;
+
+  /**
+   * Creates a pipeline named after {@code jarClass}, using a new {@link Configuration}.
+   *
+   * @throws IOException never thrown by this implementation. The clause is kept so that
+   *     existing callers which catch it still compile.
+   */
+  public MRPipeline(Class<?> jarClass) throws IOException {
+    this(jarClass, new Configuration());
+  }
+
+  public MRPipeline(Class<?> jarClass, String name) {
+    this(jarClass, name, new Configuration());
+  }
+
+  public MRPipeline(Class<?> jarClass, Configuration conf) {
+    this(jarClass, jarClass.getName(), conf);
+  }
+
+  /**
+   * Creates the pipeline and immediately creates its temporary directory on the
+   * {@link FileSystem} described by {@code conf}.
+   *
+   * @param jarClass class handed to the planner (presumably to locate the job jar; confirm
+   *     against MSCRPlanner)
+   * @param name pipeline name returned by {@link #getName()}
+   * @param conf Hadoop configuration
+   * @throws RuntimeException if the temporary directory cannot be created
+   */
+  public MRPipeline(Class<?> jarClass, String name, Configuration conf) {
+    this.jarClass = jarClass;
+    this.name = name;
+    this.outputTargets = Maps.newHashMap();
+    this.outputTargetsToMaterialize = Maps.newHashMap();
+    this.conf = conf;
+    this.tempDirectory = createTempDirectory(conf);
+    this.tempFileIndex = 0;
+    this.nextAnonymousStageId = 0;
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  @Override
+  public void setConfiguration(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Plans and executes all pending outputs, records where each output collection was
+   * materialized, and clears the pending outputs.
+   *
+   * @return the execution result, or {@code PipelineResult.EMPTY} if planning or execution
+   *     threw an {@link IOException}. In that case the pending outputs are kept.
+   */
+  @Override
+  public PipelineResult run() {
+    MSCRPlanner planner = new MSCRPlanner(this, outputTargets);
+    PipelineResult res;
+    try {
+      res = planner.plan(jarClass, conf).execute();
+    } catch (IOException e) {
+      // Pass the exception as the cause so that its stack trace is logged too.
+      LOG.error("Exception executing pipeline: " + name, e);
+      return PipelineResult.EMPTY;
+    }
+    for (PCollectionImpl<?> c : outputTargets.keySet()) {
+      if (outputTargetsToMaterialize.containsKey(c)) {
+        // A caller asked for this collection's contents: load them and record the location.
+        MaterializableIterable iter = outputTargetsToMaterialize.get(c);
+        iter.materialize();
+        c.materializeAt(iter.getSourceTarget());
+        outputTargetsToMaterialize.remove(c);
+      } else {
+        // Record the first written target that can also be read back as a Source.
+        boolean materialized = false;
+        for (Target t : outputTargets.get(c)) {
+          if (!materialized && t instanceof Source) {
+            c.materializeAt((SourceTarget) t);
+            materialized = true;
+          }
+        }
+      }
+    }
+    outputTargets.clear();
+    return res;
+  }
+
+  /**
+   * Runs any pending outputs, then removes the temporary directory.
+   *
+   * @return the result of {@link #run()}. When nothing was pending, returns
+   *     {@code PipelineResult.EMPTY} (the same as {@code MemPipeline.done()}) instead of
+   *     {@code null}.
+   */
+  @Override
+  public PipelineResult done() {
+    PipelineResult res = PipelineResult.EMPTY;
+    if (!outputTargets.isEmpty()) {
+      res = run();
+    }
+    cleanup();
+    return res;
+  }
+
+  public <S> PCollection<S> read(Source<S> source) {
+    return new InputCollection<S>(source, this);
+  }
+
+  public <K, V> PTable<K, V> read(TableSource<K, V> source) {
+    return new InputTable<K, V>(source, this);
+  }
+
+  public PCollection<String> readTextFile(String pathName) {
+    return read(At.textFile(pathName));
+  }
+
+  /**
+   * Registers {@code target} as an output of {@code pcollection}. Grouped tables are
+   * registered through their ungrouped parent. Union collections and tables are wrapped in
+   * an identity {@code parallelDo} before registration.
+   */
+  @SuppressWarnings("unchecked")
+  public void write(PCollection<?> pcollection, Target target) {
+    if (pcollection instanceof PGroupedTableImpl) {
+      pcollection = ((PGroupedTableImpl<?, ?>) pcollection).ungroup();
+    } else if (pcollection instanceof UnionCollection || pcollection instanceof UnionTable) {
+      pcollection = pcollection.parallelDo("UnionCollectionWrapper",
+          (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
+    }
+    addOutput((PCollectionImpl<?>) pcollection, target);
+  }
+
+  private void addOutput(PCollectionImpl<?> impl, Target target) {
+    if (!outputTargets.containsKey(impl)) {
+      outputTargets.put(impl, Sets.<Target> newHashSet());
+    }
+    outputTargets.get(impl).add(target);
+  }
+
+  /**
+   * Returns a lazily loaded view of {@code pcollection}'s contents. The data becomes
+   * available once the pipeline has run.
+   */
+  @Override
+  public <T> Iterable<T> materialize(PCollection<T> pcollection) {
+
+    PCollectionImpl<T> pcollectionImpl = toPcollectionImpl(pcollection);
+    ReadableSourceTarget<T> srcTarget = getMaterializeSourceTarget(pcollectionImpl);
+
+    MaterializableIterable<T> c = new MaterializableIterable<T>(this, srcTarget);
+    if (!outputTargetsToMaterialize.containsKey(pcollectionImpl)) {
+      outputTargetsToMaterialize.put(pcollectionImpl, c);
+    }
+    return c;
+  }
+
+  /**
+   * Retrieve a ReadableSourceTarget that provides access to the contents of a
+   * {@link PCollection}. This is primarily intended as a helper method to
+   * {@link #materialize(PCollection)}. The underlying data of the
+   * ReadableSourceTarget may not be actually present until the pipeline is run.
+   *
+   * @param pcollection
+   *          The collection for which the ReadableSourceTarget is to be
+   *          retrieved
+   * @return The ReadableSourceTarget
+   * @throws IllegalArgumentException
+   *           If no ReadableSourceTarget can be retrieved for the given
+   *           PCollection
+   */
+  @SuppressWarnings("unchecked")
+  public <T> ReadableSourceTarget<T> getMaterializeSourceTarget(PCollection<T> pcollection) {
+    PCollectionImpl<T> impl = toPcollectionImpl(pcollection);
+    SourceTarget<T> matTarget = impl.getMaterializedAt();
+    if (matTarget instanceof ReadableSourceTarget) {
+      return (ReadableSourceTarget<T>) matTarget;
+    }
+
+    // Look up registered outputs by impl, the same key addOutput() uses. For union inputs,
+    // impl is a wrapper and differs from pcollection.
+    ReadableSourceTarget<T> srcTarget = null;
+    Set<Target> registered = outputTargets.get(impl);
+    if (registered != null) {
+      for (Target target : registered) {
+        if (target instanceof ReadableSourceTarget) {
+          srcTarget = (ReadableSourceTarget<T>) target;
+          break;
+        }
+      }
+    }
+
+    if (srcTarget == null) {
+      SourceTarget<T> st = createIntermediateOutput(pcollection.getPType());
+      if (!(st instanceof ReadableSourceTarget)) {
+        throw new IllegalArgumentException("The PType for the given PCollection is not readable"
+            + " and cannot be materialized");
+      } else {
+        srcTarget = (ReadableSourceTarget<T>) st;
+        addOutput(impl, srcTarget);
+      }
+    }
+
+    return srcTarget;
+  }
+
+  /**
+   * Safely cast a PCollection into a PCollectionImpl, including handling the case of UnionCollections.
+   * Each call for a UnionCollection creates a new identity wrapper.
+   *
+   * @param pcollection The PCollection to be cast/transformed
+   * @return The PCollectionImpl representation
+   */
+  @SuppressWarnings("unchecked")
+  private <T> PCollectionImpl<T> toPcollectionImpl(PCollection<T> pcollection) {
+    PCollectionImpl<T> pcollectionImpl = null;
+    if (pcollection instanceof UnionCollection) {
+      pcollectionImpl = (PCollectionImpl<T>) pcollection.parallelDo("UnionCollectionWrapper",
+          (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
+    } else {
+      pcollectionImpl = (PCollectionImpl<T>) pcollection;
+    }
+    return pcollectionImpl;
+  }
+
+  /** Returns the default file source/target for {@code ptype} at a new temporary path. */
+  public <T> SourceTarget<T> createIntermediateOutput(PType<T> ptype) {
+    return ptype.getDefaultFileSource(createTempPath());
+  }
+
+  /** Returns the next path of the form {@code <tempDirectory>/p<N>}. */
+  public Path createTempPath() {
+    tempFileIndex++;
+    return new Path(tempDirectory, "p" + tempFileIndex);
+  }
+
+  private static Path createTempDirectory(Configuration conf) {
+    Path dir = new Path("/tmp/crunch" + RANDOM.nextInt());
+    try {
+      FileSystem.get(conf).mkdirs(dir);
+    } catch (IOException e) {
+      LOG.error("Exception creating job output directory", e);
+      throw new RuntimeException(e);
+    }
+    return dir;
+  }
+
+  @Override
+  public <T> void writeTextFile(PCollection<T> pcollection, String pathName) {
+    // Ensure that this is a writable pcollection instance.
+    pcollection = pcollection.parallelDo("asText", IdentityFn.<T> getInstance(), WritableTypeFamily
+        .getInstance().as(pcollection.getPType()));
+    write(pcollection, At.textFile(pathName));
+  }
+
+  /**
+   * Deletes the temporary directory. This is skipped, with a warning, while outputs remain
+   * pending. Failures are logged rather than thrown.
+   */
+  private void cleanup() {
+    if (!outputTargets.isEmpty()) {
+      LOG.warn("Not running cleanup while output targets remain");
+      return;
+    }
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      if (fs.exists(tempDirectory)) {
+        fs.delete(tempDirectory, true);
+      }
+    } catch (IOException e) {
+      LOG.info("Exception during cleanup", e);
+    }
+  }
+
+  public int getNextAnonymousStageId() {
+    return nextAnonymousStageId++;
+  }
+
+  @Override
+  public void enableDebug() {
+    // Turn on Crunch runtime error catching.
+    getConfiguration().setBoolean(RuntimeParameters.DEBUG, true);
+
+    // Write Hadoop's WARN logs to the console.
+    Logger crunchInfoLogger = LogManager.getLogger("org.apache.crunch");
+    Appender console = crunchInfoLogger.getAppender("A");
+    if (console != null) {
+      Logger hadoopLogger = LogManager.getLogger("org.apache.hadoop");
+      hadoopLogger.setLevel(Level.WARN);
+      hadoopLogger.addAppender(console);
+    } else {
+      LOG.warn("Could not find console appender named 'A' for writing Hadoop warning logs");
+    }
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
new file mode 100644
index 0000000..3c9d522
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.collect;
+
+import java.util.List;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.impl.mr.plan.DoNode;
+import org.apache.crunch.types.PType;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * A collection defined by applying a {@link DoFn} to exactly one parent collection.
+ */
+public class DoCollectionImpl<S> extends PCollectionImpl<S> {
+
+  private final PCollectionImpl<Object> source;
+  private final DoFn<Object, S> doFn;
+  private final PType<S> outputType;
+
+  @SuppressWarnings("unchecked")
+  <T> DoCollectionImpl(String name, PCollectionImpl<T> parent, DoFn<T, S> fn, PType<S> ntype) {
+    super(name);
+    // The input element type is erased to Object. The planner only needs the output type.
+    this.source = (PCollectionImpl<Object>) parent;
+    this.doFn = (DoFn<Object, S>) fn;
+    this.outputType = ntype;
+  }
+
+  @Override
+  public PType<S> getPType() {
+    return outputType;
+  }
+
+  /** Returns a single-element list holding the parent collection. */
+  @Override
+  public List<PCollectionImpl<?>> getParents() {
+    return ImmutableList.<PCollectionImpl<?>> of(source);
+  }
+
+  /** Estimates the size as the parent's size scaled by the function's scale factor. */
+  @Override
+  protected long getSizeInternal() {
+    return (long) (doFn.scaleFactor() * source.getSize());
+  }
+
+  @Override
+  protected void acceptInternal(PCollectionImpl.Visitor visitor) {
+    visitor.visitDoFnCollection(this);
+  }
+
+  @Override
+  public DoNode createDoNode() {
+    return DoNode.createFnNode(getName(), doFn, outputType);
+  }
+}


Mime
View raw message