crunch-commits mailing list archives

From jwi...@apache.org
Subject git commit: CRUNCH-397: Enrich PCollection and Pipeline APIs for Scrunch.
Date Tue, 20 May 2014 03:56:49 GMT
Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 2e2a7ddd2 -> 2c2e5b0f7


CRUNCH-397: Enrich PCollection and Pipeline APIs for Scrunch.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/2c2e5b0f
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/2c2e5b0f
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/2c2e5b0f

Branch: refs/heads/apache-crunch-0.8
Commit: 2c2e5b0f77bd5c77f0071ef06de775040442a24e
Parents: 2e2a7dd
Author: Josh Wills <jwills@apache.org>
Authored: Fri May 16 00:49:49 2014 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Mon May 19 20:50:40 2014 -0700

----------------------------------------------------------------------
 .../crunch/scrunch/PageRankClassTest.scala      |   2 +-
 .../apache/crunch/scrunch/PageRankTest.scala    |   3 +-
 .../org/apache/crunch/scrunch/Conversions.scala |  18 ++
 .../org/apache/crunch/scrunch/PCollection.scala |  99 ++++++--
 .../apache/crunch/scrunch/PCollectionLike.scala | 223 ++++++++++++++++++-
 .../apache/crunch/scrunch/PGroupedTable.scala   | 119 +++++++---
 .../org/apache/crunch/scrunch/PTable.scala      | 119 ++++++++--
 .../org/apache/crunch/scrunch/Pipeline.scala    |   1 +
 .../apache/crunch/scrunch/PipelineLike.scala    |  50 ++++-
 9 files changed, 552 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/2c2e5b0f/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala
index 585c7aa..4cd9e84 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala
@@ -64,7 +64,7 @@ class PageRankClassTest extends CrunchSuite {
   lazy val pipeline = Pipeline.mapReduce[PageRankTest](tempDir.getDefaultConfiguration)
 
   def initialInput(fileName: String) = {
-    pipeline.read(from.textFile(fileName))
+    pipeline.read(from.textFile(fileName, Avros.strings))
       .map(line => { val urls = line.split("\\t"); (urls(0), urls(1)) })
       .groupByKey
       .map((url, links) => (url, PageRankData(1f, 0f, links.filter(x => x != null).toArray)))
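
A minimal sketch of the explicit-PType read used in the fix above; the input path, the pipeline class, and Scrunch's "from" source-factory alias are assumptions for illustration, not part of this commit:

    import org.apache.crunch.scrunch.{Avros, From => from, Pipeline}
    import org.apache.hadoop.conf.Configuration

    class ExplicitPTypeRead
    object ExplicitPTypeRead {
      def main(args: Array[String]) {
        val pipeline = Pipeline.mapReduce[ExplicitPTypeRead](new Configuration())
        // Passing Avros.strings pins the PType of the resulting PCollection[String],
        // so downstream operations resolve against the Avro type family.
        val edges = pipeline.read(from.textFile("/data/urls.txt", Avros.strings))
          .map(line => { val urls = line.split("\\t"); (urls(0), urls(1)) })
        pipeline.done()
      }
    }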

http://git-wip-us.apache.org/repos/asf/crunch/blob/2c2e5b0f/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankTest.scala
index dbd14aa..541502f 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankTest.scala
@@ -52,7 +52,8 @@ class PageRankTest extends CrunchSuite {
 
   def initialInput(fileName: String) = {
     pipeline.read(from.textFile(fileName))
-      .map(line => { val urls = line.split("\\t"); (urls(0), urls(1)) })
+      .withPType(Avros.strings)
+      .mapWithContext((line, ctxt) => { ctxt.getConfiguration; val urls = line.split("\\t"); (urls(0), urls(1)) })
       .groupByKey
       .map((url, links) => (url, (1f, 0f, links.toList)))
   }
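
A hedged sketch of the two hooks the updated test exercises: withPType re-types an existing collection in place, and mapWithContext exposes the live TaskInputOutputContext. The counter names and input path are illustrative assumptions:

    import org.apache.crunch.scrunch.{Avros, From => from, Pipeline}
    import org.apache.hadoop.conf.Configuration

    class ContextHooks
    object ContextHooks {
      def main(args: Array[String]) {
        val pipeline = Pipeline.mapReduce[ContextHooks](new Configuration())
        val lengths = pipeline.read(from.textFile("/data/urls.txt"))
          .withPType(Avros.strings)  // re-type the collection without changing its contents
          .mapWithContext((line, ctxt) => {
            // the raw Hadoop task context is available for counters, config, etc.
            ctxt.getCounter("sketch", "lines").increment(1)
            line.length
          })
        pipeline.done()
      }
    }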

http://git-wip-us.apache.org/repos/asf/crunch/blob/2c2e5b0f/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala
index 744685f..833e6d9 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer
 import scala.collection.Iterable
 import scala.reflect.ClassTag
 import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce.TaskInputOutputContext
 
 trait CanParallelTransform[El, To] {
  def apply[A](c: PCollectionLike[A, _, JCollection[A]], fn: DoFn[A, El], ptype: PType[El]): To
@@ -51,12 +52,29 @@ object CanParallelTransform extends LowPriorityParallelTransforms {
 
   def kvWrapFn[A, K, V](fn: DoFn[A, (K, V)]) = {
     new DoFn[A, CPair[K, V]] {
+
+      override def setContext(ctxt: TaskInputOutputContext[_, _, _, _]) {
+        super.setContext(ctxt)
+        fn.setContext(ctxt)
+      }
+
+      override def initialize() {
+        fn.initialize()
+      }
+
       override def process(input: A, emitFn: Emitter[CPair[K, V]]) {
         fn.process(input, new Emitter[(K, V)] {
           override def emit(kv: (K, V)) { emitFn.emit(CPair.of(kv._1, kv._2)) }
           override def flush() { emitFn.flush() }
         })
       }
+
+      override def cleanup(emitFn: Emitter[CPair[K, V]]) {
+        fn.cleanup(new Emitter[(K, V)] {
+          override def emit(kv: (K, V)) { emitFn.emit(CPair.of(kv._1, kv._2)) }
+          override def flush() { emitFn.flush() }
+        })
+      }
     }
   }
 }
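
The substance of this change is that kvWrapFn now forwards the full DoFn lifecycle (setContext, initialize, cleanup) to the function it wraps. A hedged sketch of a DoFn that depends on that forwarding; the class and its state are hypothetical:

    import org.apache.crunch.{DoFn, Emitter}

    // Builds per-task state in initialize(); with the overrides above, that call
    // still reaches this fn even after kvWrapFn wraps it for pair conversion.
    class TokenizeFn extends DoFn[String, (String, Long)] {
      @transient private var separator: String = _
      override def initialize() { separator = "\\s+" }
      override def process(input: String, emitter: Emitter[(String, Long)]) {
        input.split(separator).foreach(w => emitter.emit((w, 1L)))
      }
    }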

http://git-wip-us.apache.org/repos/asf/crunch/blob/2c2e5b0f/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
index 49ee6c0..2d4ed44 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
@@ -19,26 +19,32 @@ package org.apache.crunch.scrunch
 
 import scala.collection.JavaConversions
 
-import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn}
-import org.apache.crunch.{PCollection => JCollection, Pair => CPair}
+import org.apache.crunch.{PCollection => JCollection, Pair => CPair, _}
 import org.apache.crunch.lib.{Aggregate, Cartesian, Sample}
 import org.apache.crunch.scrunch.Conversions._
 import org.apache.crunch.scrunch.interpreter.InterpreterRunner
+import org.apache.hadoop.mapreduce.TaskInputOutputContext
+import org.apache.crunch.types.PType
+import org.apache.crunch.fn.IdentityFn
 
class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCollection[S], JCollection[S]] {
   import PCollection._
 
-  def filter(f: S => Boolean): PCollection[S] = {
-    parallelDo(filterFn[S](f), native.getPType())
-  }
+  type FunctionType[T] = S => T
+  type CtxtFunctionType[T] = (S, TIOC) => T
 
-  def map[T, To](f: S => T)(implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
-    b(this, mapFn(f), pt.get(getTypeFamily()))
-  }
+  protected def wrapFlatMapFn[T](fmt: S => TraversableOnce[T]) = flatMapFn(fmt)
+  protected def wrapMapFn[T](fmt: S => T) = mapFn(fmt)
+  protected def wrapFilterFn(fmt: S => Boolean) = filterFn(fmt)
+  protected def wrapFlatMapWithCtxtFn[T](fmt: (S, TIOC) => TraversableOnce[T]) = flatMapWithCtxtFn(fmt)
+  protected def wrapMapWithCtxtFn[T](fmt: (S, TIOC) => T) = mapWithCtxtFn(fmt)
+  protected def wrapFilterWithCtxtFn(fmt: (S, TIOC) => Boolean) = filterWithCtxtFn(fmt)
+  protected def wrapPairFlatMapFn[K, V](fmt: S => TraversableOnce[(K, V)]) = pairFlatMapFn(fmt)
+  protected def wrapPairMapFn[K, V](fmt: S => (K, V)) = pairMapFn(fmt)
 
-  def flatMap[T, To](f: S => Traversable[T])
-      (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
-    b(this, flatMapFn(f), pt.get(getTypeFamily()))
+  def withPType(pt: PType[S]): PCollection[S] = {
+    val ident: MapFn[S, S] = IdentityFn.getInstance()
+    wrap(native.parallelDo("withPType", ident, pt))
   }
 
   def union(others: PCollection[S]*) = {
@@ -65,7 +71,7 @@ class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCol
     JavaConversions.iterableAsScalaIterable[S](native.materialize)
   }
 
-  def wrap(newNative: AnyRef) = new PCollection[S](newNative.asInstanceOf[JCollection[S]])
+  protected def wrap(newNative: JCollection[_]) = new PCollection[S](newNative.asInstanceOf[JCollection[S]])
 
   def count() = {
     val count = new PTable[S, java.lang.Long](Aggregate.count(native))
@@ -84,10 +90,10 @@ class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCol
     wrap(Sample.sample(native, seed, acceptanceProbability))
   }
 
-  def pType = native.getPType()
+  def pType() = native.getPType()
 }
 
-trait SDoFn[S, T] extends DoFn[S, T] with Function1[S, Traversable[T]] {
+trait SDoFn[S, T] extends DoFn[S, T] with Function1[S, TraversableOnce[T]] {
   override def process(input: S, emitter: Emitter[T]) {
     for (v <- apply(input)) {
       emitter.emit(v)
@@ -95,12 +101,43 @@ trait SDoFn[S, T] extends DoFn[S, T] with Function1[S, Traversable[T]] {
   }
 }
 
+trait SMapFn[S, T] extends MapFn[S, T] with Function1[S, T] {
+  override def map(input: S) = apply(input)
+}
+
 trait SFilterFn[T] extends FilterFn[T] with Function1[T, Boolean] {
   override def accept(input: T) = apply(input)
 }
 
-trait SMapFn[S, T] extends MapFn[S, T] with Function1[S, T] {
-  override def map(input: S) = apply(input)
+class SDoWithCtxtFn[S, T](val f: (S, TaskInputOutputContext[_, _, _, _]) => TraversableOnce[T]) extends DoFn[S, T] {
+  override def process(input: S, emitter: Emitter[T]) {
+    for (v <- f(input, getContext)) {
+      emitter.emit(v)
+    }
+  }
+}
+
+class SMapWithCtxtFn[S, T](val f: (S, TaskInputOutputContext[_, _, _, _]) => T) extends MapFn[S, T] {
+  override def map(input: S) = f(input, getContext)
+}
+
+class SFilterWithCtxtFn[T](val f: (T, TaskInputOutputContext[_, _, _, _]) => Boolean) extends FilterFn[T] {
+  override def accept(input: T) = f.apply(input, getContext)
+}
+
+trait SDoPairFn[S, K, V] extends DoFn[S, CPair[K, V]] with Function1[S, TraversableOnce[(K, V)]] {
+  override def process(input: S, emitter: Emitter[CPair[K, V]]) {
+    for (v <- apply(input)) {
+      emitter.emit(CPair.of(v._1, v._2))
+    }
+  }
+}
+
+trait SMapPairFn[S, K, V] extends MapFn[S, CPair[K, V]] with Function1[S, (K, V)] {
+  override def map(input: S): CPair[K, V] = {
+    val t = apply(input)
+    CPair.of(t._1, t._2)
+  }
 }
 
 trait SMapKeyFn[S, K] extends MapFn[S, CPair[K, S]] with Function1[S, K] {
@@ -110,19 +147,41 @@ trait SMapKeyFn[S, K] extends MapFn[S, CPair[K, S]] with Function1[S, K] {
 }
 
 object PCollection {
+  type TIOC = TaskInputOutputContext[_, _, _, _]
+
+  def flatMapFn[S, T](fn: S => TraversableOnce[T]) = {
+    new SDoFn[S, T] { def apply(s: S) = fn(s) }
+  }
+
+  def mapFn[S, T](fn: S => T) = {
+    new SMapFn[S, T] { def apply(s: S) = fn(s) }
+  }
+
   def filterFn[S](fn: S => Boolean) = {
     new SFilterFn[S] { def apply(x: S) = fn(x) }
   }
 
+  def flatMapWithCtxtFn[S, T](fn: (S, TIOC) => TraversableOnce[T]) = {
+    new SDoWithCtxtFn[S, T](fn)
+  }
+
+  def mapWithCtxtFn[S, T](fn: (S, TIOC) => T) = {
+    new SMapWithCtxtFn[S, T](fn)
+  }
+
+  def filterWithCtxtFn[S](fn: (S, TIOC) => Boolean) = {
+    new SFilterWithCtxtFn[S](fn)
+  }
+
   def mapKeyFn[S, K](fn: S => K) = {
     new SMapKeyFn[S, K] { def apply(x: S) = fn(x) }
   }
 
-  def mapFn[S, T](fn: S => T) = {
-    new SMapFn[S, T] { def apply(s: S) = fn(s) }
+  def pairMapFn[S, K, V](fn: S => (K, V)) = {
+    new SMapPairFn[S, K, V] { def apply(s: S) = fn(s) }
   }
 
-  def flatMapFn[S, T](fn: S => Traversable[T]) = {
-    new SDoFn[S, T] { def apply(s: S) = fn(s) }
+  def pairFlatMapFn[S, K, V](fn: S => TraversableOnce[(K, V)]) = {
+    new SDoPairFn[S, K, V] { def apply(s: S) = fn(s) }
   }
 }
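
A short sketch of the context-aware filter these classes enable; the collection and configuration key are assumed for illustration:

    import org.apache.crunch.scrunch.PCollection

    object FilterWithContextSketch {
      // Keep lines longer than a threshold read from the job configuration at task time.
      def keepLong(lines: PCollection[String]): PCollection[String] =
        lines.filterWithContext((line, ctxt) =>
          line.length > ctxt.getConfiguration.getInt("sketch.min.length", 0))
    }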

http://git-wip-us.apache.org/repos/asf/crunch/blob/2c2e5b0f/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
index c0162df..1e2e890 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
@@ -17,40 +17,238 @@
  */
 package org.apache.crunch.scrunch
 
-import org.apache.crunch.DoFn
-import org.apache.crunch.{PCollection => JCollection, Pair => JPair, Target}
+import org.apache.crunch.{PCollection => JCollection, Pair => JPair, _}
 import org.apache.crunch.types.{PType, PTableType}
 import org.apache.crunch.types.writable.WritableTypeFamily
-import scala.reflect.ClassTag
 
+/**
+ * Base trait for PCollection-like entities in Scrunch, including PTables and PGroupedTables.
+ *
+ * @tparam S the data type of the underlying object contained in this instance
+ * @tparam FullType the Scrunch PCollection type of this object
+ * @tparam NativeType the corresponding Crunch PCollection type of this object
+ */
 trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] {
+  type FunctionType[T]
+  type CtxtFunctionType[T]
+
+  protected def wrapFlatMapFn[T](fmt: FunctionType[TraversableOnce[T]]): DoFn[S, T]
+  protected def wrapMapFn[T](fmt: FunctionType[T]): MapFn[S, T]
+  protected def wrapFilterFn(fmt: FunctionType[Boolean]): FilterFn[S]
+  protected def wrapFlatMapWithCtxtFn[T](fmt: CtxtFunctionType[TraversableOnce[T]]): DoFn[S, T]
+  protected def wrapMapWithCtxtFn[T](fmt: CtxtFunctionType[T]): MapFn[S, T]
+  protected def wrapFilterWithCtxtFn(fmt: CtxtFunctionType[Boolean]): FilterFn[S]
+  protected def wrapPairFlatMapFn[K, V](fmt: FunctionType[TraversableOnce[(K, V)]]): DoFn[S, JPair[K, V]]
+  protected def wrapPairMapFn[K, V](fmt: FunctionType[(K, V)]): MapFn[S, JPair[K, V]]
+
+  /**
+   * Returns the underlying PCollection wrapped by this instance.
+   */
   val native: NativeType
 
-  def wrap(newNative: AnyRef): FullType
+  protected def wrap(newNative: JCollection[_]): FullType
 
+  /**
+   * Write the data in this instance to the given target.
+   */
   def write(target: Target): FullType = wrap(native.write(target))
 
+  /**
+   * Write the data in this instance to the given target with the given {@link Target.WriteMode}.
+   */
   def write(target: Target, writeMode: Target.WriteMode): FullType = {
     wrap(native.write(target, writeMode))
   }
 
+  /**
+   * Apply a flatMap operation to this instance, returning a {@code PTable} if the return
+   * type of the function is a {@code TraversableOnce[Tuple2]} and a {@code PCollection} otherwise.
+   */
+  def flatMap[T, To](f: FunctionType[TraversableOnce[T]])
+                    (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
+    b(this, wrapFlatMapFn(f), pt.get(getTypeFamily()))
+  }
+
+  /**
+   * Apply a map operation to this instance, returning a {@code PTable} if the return
+   * type of the function is a {@code Tuple2} and a {@code PCollection} otherwise.
+   */
+  def map[T, To](f: FunctionType[T])(implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
+    b(this, wrapMapFn(f), pt.get(getTypeFamily()))
+  }
+
+  /**
+   * Apply the given filter function to the elements of this instance and return a new
+   * instance that contains the items that pass the filter.
+   */
+  def filter(f: FunctionType[Boolean]): FullType = {
+    wrap(native.filter(wrapFilterFn(f)))
+  }
+
+  def filter(name: String, f: FunctionType[Boolean]): FullType = {
+    wrap(native.filter(name, wrapFilterFn(f)))
+  }
+
+  def flatMapWithContext[T, To](f: CtxtFunctionType[TraversableOnce[T]])
+                               (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
+    b(this, wrapFlatMapWithCtxtFn(f), pt.get(getTypeFamily()))
+  }
+
+  def mapWithContext[T, To](f: CtxtFunctionType[T])
+                           (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
+    b(this, wrapMapWithCtxtFn(f), pt.get(getTypeFamily()))
+  }
+
+  def filterWithContext(f: CtxtFunctionType[Boolean]): FullType = {
+    wrap(native.filter(wrapFilterWithCtxtFn(f)))
+  }
+
+  /**
+   * Applies the given doFn to the elements of this instance and
+   * returns a new {@code PCollection} that is the output of this processing.
+   */
   def parallelDo[T](fn: DoFn[S, T], ptype: PType[T]) = {
     new PCollection[T](native.parallelDo(fn, ptype))
   }
-
+  /**
+   * Applies the given doFn to the elements of this instance and
+   * returns a new {@code PCollection} that is the output of this processing.
+   */
   def parallelDo[T](name: String, fn: DoFn[S,T], ptype: PType[T]) = {
     new PCollection[T](native.parallelDo(name, fn, ptype))
   }
 
+  /**
+   * Applies the given doFn to the elements of this instance and
+   * returns a new {@code PCollection} that is the output of this processing.
+   */
+  def parallelDo[T](name: String, fn: DoFn[S,T], ptype: PType[T], opts: ParallelDoOptions) = {
+    new PCollection[T](native.parallelDo(name, fn, ptype, opts))
+  }
+
+  /**
+   * Applies the given doFn to the elements of this instance and
+   * returns a new {@code PTable} that is the output of this processing.
+   */
   def parallelDo[K, V](fn: DoFn[S, JPair[K, V]], ptype: PTableType[K, V]) = {
     new PTable[K, V](native.parallelDo(fn, ptype))
   }
 
+  /**
+   * Applies the given doFn to the elements of this instance and
+   * returns a new {@code PTable} that is the output of this processing.
+   */
  def parallelDo[K, V](name: String, fn: DoFn[S, JPair[K, V]], ptype: PTableType[K, V]) = {
     new PTable[K, V](native.parallelDo(name, fn, ptype))
   }
 
   /**
+   * Applies the given doFn to the elements of this instance and
+   * returns a new {@code PTable} that is the output of this processing.
+   */
+  def parallelDo[K, V](name: String, fn: DoFn[S, JPair[K, V]], ptype: PTableType[K, V], opts: ParallelDoOptions) = {
+    new PTable[K, V](native.parallelDo(name, fn, ptype, opts))
+  }
+
+  /**
+   * Applies the given flatMap function to the elements of this instance and
+   * returns a new {@code PCollection} that is the output of this processing.
+   */
+  def flatMap[T](f: FunctionType[TraversableOnce[T]], ptype: PType[T]) = {
+    parallelDo(wrapFlatMapFn(f), ptype)
+  }
+
+  /**
+   * Applies the given flatMap function to the elements of this instance and
+   * returns a new {@code PCollection} that is the output of this processing.
+   */
+  def flatMap[T](name: String, f: FunctionType[TraversableOnce[T]], ptype: PType[T]) = {
+    parallelDo(name, wrapFlatMapFn(f), ptype)
+  }
+
+  /**
+   * Applies the given flatMap function to the elements of this instance and
+   * returns a new {@code PCollection} that is the output of this processing.
+   */
+  def flatMap[T](name: String, f: FunctionType[TraversableOnce[T]], ptype: PType[T], opts: ParallelDoOptions) = {
+    parallelDo(name, wrapFlatMapFn(f), ptype, opts)
+  }
+
+  /**
+   * Applies the given flatMap function to the elements of this instance and
+   * returns a new {@code PTable} that is the output of this processing.
+   */
+  def flatMap[K, V](f: FunctionType[TraversableOnce[(K, V)]], ptype: PTableType[K, V]) = {
+    parallelDo(wrapPairFlatMapFn(f), ptype)
+  }
+
+  /**
+   * Applies the given flatMap function to the elements of this instance and
+   * returns a new {@code PTable} that is the output of this processing.
+   */
+  def flatMap[K, V](name: String, f: FunctionType[TraversableOnce[(K, V)]], ptype: PTableType[K, V]) = {
+    parallelDo(name, wrapPairFlatMapFn(f), ptype)
+  }
+
+  /**
+   * Applies the given flatMap function to the elements of this instance and
+   * returns a new {@code PTable} that is the output of this processing.
+   */
+  def flatMap[K, V](name: String, f: FunctionType[TraversableOnce[(K, V)]], ptype: PTableType[K, V],
+                    opts: ParallelDoOptions) = {
+    parallelDo(name, wrapPairFlatMapFn(f), ptype, opts)
+  }
+
+  /**
+   * Applies the given map function to the elements of this instance and
+   * returns a new {@code PCollection} that is the output of this processing.
+   */
+  def map[T](f: FunctionType[T], ptype: PType[T]) = {
+    parallelDo(wrapMapFn(f), ptype)
+  }
+
+  /**
+   * Applies the given map function to the elements of this instance and
+   * returns a new {@code PCollection} that is the output of this processing.
+   */
+  def map[T](name: String, f: FunctionType[T], ptype: PType[T]) = {
+    parallelDo(name, wrapMapFn(f), ptype)
+  }
+
+  /**
+   * Applies the given map function to the elements of this instance and
+   * returns a new {@code PCollection} that is the output of this processing.
+   */
+  def map[T](name: String, f: FunctionType[T], ptype: PType[T], opts: ParallelDoOptions) = {
+    parallelDo(name, wrapMapFn(f), ptype, opts)
+  }
+
+  /**
+   * Applies the given map function to the elements of this instance and
+   * returns a new {@code PTable} that is the output of this processing.
+   */
+  def map[K, V](f: FunctionType[(K, V)], ptype: PTableType[K, V]) = {
+    parallelDo(wrapPairMapFn(f), ptype)
+  }
+
+  /**
+   * Applies the given map function to the elements of this instance and
+   * returns a new {@code PTable} that is the output of this processing.
+   */
+  def map[K, V](name: String, f: FunctionType[(K, V)], ptype: PTableType[K, V]) = {
+    parallelDo(name, wrapPairMapFn(f), ptype)
+  }
+
+  /**
+   * Applies the given map function to the elements of this instance and
+   * returns a new {@code PTable} that is the output of this processing.
+   */
+  def map[K, V](name: String, f: FunctionType[(K, V)], ptype: PTableType[K, V],
+                opts: ParallelDoOptions) = {
+    parallelDo(name, wrapPairMapFn(f), ptype, opts)
+  }
+
+  /**
    * Gets the number of elements represented by this PCollection.
    *
    * @return The number of elements in this PCollection.
@@ -59,9 +257,22 @@ trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] {
     PObject(native.length())
   }
 
+  /**
+   * Returns a {@code PObject} containing the elements of this instance as a {@code Seq}.
+   */
   def asSeq(): PObject[Seq[S]] = {
     PObject(native.asCollection())
   }
 
-  def getTypeFamily() = Avros
+  /**
+   * Returns the {@code PTypeFamily} of this instance.
+   */
+  def getTypeFamily() = {
+    if (native.getTypeFamily == WritableTypeFamily.getInstance()) {
+      Writables
+    } else {
+      Avros
+    }
+  }
 }
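
Taken together, these overloads mean the function's return type picks the result: Tuple2 values dispatch to a PTable, anything else to a PCollection, and the explicit-PType variants bypass the implicit machinery. A hedged sketch; the words collection is assumed:

    import org.apache.crunch.scrunch.{Avros, PCollection, PTable}

    object DispatchSketch {
      def examples(words: PCollection[String]) {
        val lengths: PCollection[Int] = words.map(w => w.length)        // non-pair => PCollection
        val byWord: PTable[String, Int] = words.map(w => (w, w.length)) // Tuple2  => PTable
        // Explicit PType: no PTypeH/CanParallelTransform implicits needed.
        val upper = words.map("upper", (w: String) => w.toUpperCase, Avros.strings)
      }
    }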

http://git-wip-us.apache.org/repos/asf/crunch/blob/2c2e5b0f/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PGroupedTable.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PGroupedTable.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PGroupedTable.scala
index 7d890de..c86611a 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PGroupedTable.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PGroupedTable.scala
@@ -17,38 +17,43 @@
  */
 package org.apache.crunch.scrunch
 
-import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn}
-import org.apache.crunch.{CombineFn, PGroupedTable => JGroupedTable, PTable => JTable, Pair => CPair}
+import org.apache.crunch.{PCollection => JCollection, PGroupedTable => JGroupedTable, Pair => CPair, _}
 import java.lang.{Iterable => JIterable}
-import scala.collection.{Iterable, Iterator}
+import scala.collection.Iterable
 import scala.collection.JavaConversions._
-import Conversions._
+import org.apache.hadoop.mapreduce.TaskInputOutputContext
 
 class PGroupedTable[K, V](val native: JGroupedTable[K, V])
    extends PCollectionLike[CPair[K, JIterable[V]], PGroupedTable[K, V], JGroupedTable[K, V]] {
   import PGroupedTable._
 
-  def filter(f: (K, Iterable[V]) => Boolean) = {
-    parallelDo(filterFn[K, V](f), native.getPType())
-  }
-
-  def map[T, To](f: (K, Iterable[V]) => T)
-      (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
-    b(this, mapFn(f), pt.get(getTypeFamily()))
-  }
+  type FunctionType[T] = (K, Iterable[V]) => T
+  type CtxtFunctionType[T] = (K, Iterable[V], TIOC) => T
 
-  def flatMap[T, To](f: (K, Iterable[V]) => Traversable[T])
-      (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
-    b(this, flatMapFn(f), pt.get(getTypeFamily()))
+  protected def wrapFlatMapFn[T](fmt: (K, Iterable[V]) => TraversableOnce[T]) = flatMapFn(fmt)
+  protected def wrapMapFn[T](fmt: (K, Iterable[V]) => T) = mapFn(fmt)
+  protected def wrapFilterFn(fmt: (K, Iterable[V]) => Boolean) = filterFn(fmt)
+  protected def wrapFlatMapWithCtxtFn[T](fmt: (K, Iterable[V], TIOC) => TraversableOnce[T]) = {
+    flatMapWithCtxtFn(fmt)
   }
+  protected def wrapMapWithCtxtFn[T](fmt: (K, Iterable[V], TIOC) => T) = mapWithCtxtFn(fmt)
+  protected def wrapFilterWithCtxtFn(fmt: (K, Iterable[V], TIOC) => Boolean) = filterWithCtxtFn(fmt)
+  protected def wrapPairFlatMapFn[S, T](fmt: (K, Iterable[V]) => TraversableOnce[(S, T)]) = pairFlatMapFn(fmt)
+  protected def wrapPairMapFn[S, T](fmt: (K, Iterable[V]) => (S, T)) = pairMapFn(fmt)
 
   def combine(f: Iterable[V] => V) = combineValues(new IterableCombineFn[K, V](f))
 
+  def combineValues(agg: Aggregator[V]) = new PTable[K, V](native.combineValues(agg))
+
+  def combineValues(combineAgg: Aggregator[V], reduceAgg: Aggregator[V]) = {
+    new PTable[K, V](native.combineValues(combineAgg, reduceAgg))
+  }
+
   def combineValues(fn: CombineFn[K, V]) = new PTable[K, V](native.combineValues(fn))
 
   def ungroup() = new PTable[K, V](native.ungroup())
 
-  def wrap(newNative: AnyRef): PGroupedTable[K, V] = {
+  protected def wrap(newNative: JCollection[_]): PGroupedTable[K, V] = {
     new PGroupedTable[K, V](newNative.asInstanceOf[JGroupedTable[K, V]])
   }
 }
@@ -59,11 +64,7 @@ class IterableCombineFn[K, V](f: Iterable[V] => V) extends CombineFn[K, V] {
   }
 }
 
-trait SFilterGroupedFn[K, V] extends FilterFn[CPair[K, JIterable[V]]] with Function2[K, Iterable[V], Boolean] {
-  override def accept(input: CPair[K, JIterable[V]]) = apply(input.first(), iterableAsScalaIterable[V](input.second()))
-}
-
-trait SDoGroupedFn[K, V, T] extends DoFn[CPair[K, JIterable[V]], T] with Function2[K, Iterable[V], Traversable[T]] {
+trait SDoGroupedFn[K, V, T] extends DoFn[CPair[K, JIterable[V]], T] with Function2[K, Iterable[V], TraversableOnce[T]] {
   override def process(input: CPair[K, JIterable[V]], emitter: Emitter[T]) {
     for (v <- apply(input.first(), iterableAsScalaIterable[V](input.second()))) {
       emitter.emit(v)
@@ -77,16 +78,84 @@ trait SMapGroupedFn[K, V, T] extends MapFn[CPair[K, JIterable[V]], T] with Funct
   }
 }
 
+trait SFilterGroupedFn[K, V] extends FilterFn[CPair[K, JIterable[V]]] with Function2[K, Iterable[V], Boolean] {
+  override def accept(input: CPair[K, JIterable[V]]) = {
+    apply(input.first(), iterableAsScalaIterable[V](input.second()))
+  }
+}
+
+class SDoGroupedWithCtxtFn[K, V, T](val f: (K, Iterable[V], TaskInputOutputContext[_, _, _, _]) => TraversableOnce[T])
+  extends DoFn[CPair[K, JIterable[V]], T] {
+  override def process(input: CPair[K, JIterable[V]], emitter: Emitter[T]) {
+    for (v <- f(input.first(), iterableAsScalaIterable[V](input.second()), getContext)) {
+      emitter.emit(v)
+    }
+  }
+}
+
+class SMapGroupedWithCtxtFn[K, V, T](val f: (K, Iterable[V], TaskInputOutputContext[_, _, _, _]) => T)
+  extends MapFn[CPair[K, JIterable[V]], T] {
+  override def map(input: CPair[K, JIterable[V]]) = {
+    f(input.first(), iterableAsScalaIterable[V](input.second()), getContext)
+  }
+}
+
+class SFilterGroupedWithCtxtFn[K, V](val f: (K, Iterable[V], TaskInputOutputContext[_, _, _, _]) => Boolean)
+  extends FilterFn[CPair[K, JIterable[V]]] {
+  override def accept(input: CPair[K, JIterable[V]]) = {
+    f.apply(input.first(), iterableAsScalaIterable[V](input.second()), getContext)
+  }
+}
+
+trait SDoPairGroupedFn[K, V, S, T] extends DoFn[CPair[K, JIterable[V]], CPair[S, T]]
+    with Function2[K, Iterable[V], TraversableOnce[(S, T)]] {
+  override def process(input: CPair[K, JIterable[V]], emitter: Emitter[CPair[S, T]]) {
+    for (v <- apply(input.first(), iterableAsScalaIterable[V](input.second()))) {
+      emitter.emit(CPair.of(v._1, v._2))
+    }
+  }
+}
+
+trait SMapPairGroupedFn[K, V, S, T] extends MapFn[CPair[K, JIterable[V]], CPair[S, T]]
+    with Function2[K, Iterable[V], (S, T)] {
+  override def map(input: CPair[K, JIterable[V]]) = {
+    val t = apply(input.first(), iterableAsScalaIterable[V](input.second()))
+    CPair.of(t._1, t._2)
+  }
+}
+
 object PGroupedTable {
-  def filterFn[K, V](fn: (K, Iterable[V]) => Boolean) = {
-    new SFilterGroupedFn[K, V] { def apply(k: K, v: Iterable[V]) = fn(k, v) }
+  type TIOC = TaskInputOutputContext[_, _, _, _]
+
+  def flatMapFn[K, V, T](fn: (K, Iterable[V]) => TraversableOnce[T]) = {
+    new SDoGroupedFn[K, V, T] { def apply(k: K, v: Iterable[V]) = fn(k, v) }
   }
 
   def mapFn[K, V, T](fn: (K, Iterable[V]) => T) = {
     new SMapGroupedFn[K, V, T] { def apply(k: K, v: Iterable[V]) = fn(k, v) }
   }
 
-  def flatMapFn[K, V, T](fn: (K, Iterable[V]) => Traversable[T]) = {
-    new SDoGroupedFn[K, V, T] { def apply(k: K, v: Iterable[V]) = fn(k, v) }
+  def filterFn[K, V](fn: (K, Iterable[V]) => Boolean) = {
+    new SFilterGroupedFn[K, V] { def apply(k: K, v: Iterable[V]) = fn(k, v) }
+  }
+
+  def flatMapWithCtxtFn[K, V, T](fn: (K, Iterable[V], TIOC) => TraversableOnce[T]) = {
+    new SDoGroupedWithCtxtFn[K, V, T](fn)
+  }
+
+  def mapWithCtxtFn[K, V, T](fn: (K, Iterable[V], TIOC) => T) = {
+    new SMapGroupedWithCtxtFn[K, V, T](fn)
+  }
+
+  def filterWithCtxtFn[K, V](fn: (K, Iterable[V], TIOC) => Boolean) = {
+    new SFilterGroupedWithCtxtFn[K, V](fn)
+  }
+
+  def pairMapFn[K, V, S, T](fn: (K, Iterable[V]) => (S, T)) = {
+    new SMapPairGroupedFn[K, V, S, T] { def apply(k: K, v: Iterable[V]) = fn(k, v) }
+  }
+
+  def pairFlatMapFn[K, V, S, T](fn: (K, Iterable[V]) => TraversableOnce[(S, T)]) = {
+    new SDoPairGroupedFn[K, V, S, T] { def apply(k: K, v: Iterable[V]) = fn(k, v) }
   }
 }
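
The new combineValues overloads accept Crunch Aggregators directly, optionally with distinct combiner- and reducer-side aggregators. A hedged sketch; the grouped table is assumed:

    import org.apache.crunch.Aggregators
    import org.apache.crunch.scrunch.{PGroupedTable, PTable}

    object CombineSketch {
      // Sum the values for each key with a stock Crunch aggregator.
      def totals(grouped: PGroupedTable[String, java.lang.Long]): PTable[String, java.lang.Long] =
        grouped.combineValues(Aggregators.SUM_LONGS())
    }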

http://git-wip-us.apache.org/repos/asf/crunch/blob/2c2e5b0f/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
index 646aded..2f88b0c 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
@@ -21,22 +21,33 @@ import java.util.{Collection => JCollect}
 
 import scala.collection.JavaConversions._
 
-import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn}
-import org.apache.crunch.{GroupingOptions, PTable => JTable, Pair => CPair}
+import org.apache.crunch.{PCollection => JCollection, PTable => JTable, Pair => CPair, _}
 import org.apache.crunch.lib.{Cartesian, Aggregate, Cogroup, PTables}
 import org.apache.crunch.lib.join.{DefaultJoinStrategy, JoinType}
 import org.apache.crunch.scrunch.interpreter.InterpreterRunner
+import org.apache.crunch.types.{PTableType, PType}
+import scala.collection.Iterable
+import org.apache.hadoop.mapreduce.TaskInputOutputContext
+import org.apache.crunch.fn.IdentityFn
 
class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V], PTable[K, V], JTable[K, V]] {
   import PTable._
 
-  def filter(f: (K, V) => Boolean): PTable[K, V] = {
-    parallelDo(filterFn[K, V](f), native.getPTableType())
-  }
-
-  def map[T, To](f: (K, V) => T)
-      (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
-    b(this, mapFn(f), pt.get(getTypeFamily()))
+  type FunctionType[T] = (K, V) => T
+  type CtxtFunctionType[T] = (K, V, TIOC) => T
+
+  protected def wrapFlatMapFn[T](fmt: (K, V) => TraversableOnce[T]) = flatMapFn(fmt)
+  protected def wrapMapFn[T](fmt: (K, V) => T) = mapFn(fmt)
+  protected def wrapFilterFn(fmt: (K, V) => Boolean) = filterFn(fmt)
+  protected def wrapFlatMapWithCtxtFn[T](fmt: (K, V, TIOC) => TraversableOnce[T]) = flatMapWithCtxtFn(fmt)
+  protected def wrapMapWithCtxtFn[T](fmt: (K, V, TIOC) => T) = mapWithCtxtFn(fmt)
+  protected def wrapFilterWithCtxtFn(fmt: (K, V, TIOC) => Boolean) = filterWithCtxtFn(fmt)
+  protected def wrapPairFlatMapFn[S, T](fmt: (K, V) => TraversableOnce[(S, T)]) = pairFlatMapFn(fmt)
+  protected def wrapPairMapFn[S, T](fmt: (K, V) => (S, T)) = pairMapFn(fmt)
+
+  def withPType(pt: PTableType[K, V]): PTable[K, V] = {
+    val ident: MapFn[CPair[K, V], CPair[K, V]]  = IdentityFn.getInstance()
+    wrap(native.parallelDo("withPType", ident, pt))
   }
 
   def mapValues[T](f: V => T)(implicit pt: PTypeH[T]) = {
@@ -45,15 +56,20 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V]
     parallelDo(mapValuesFn[K, V, T](f), ptype)
   }
 
+  def mapValues[T](f: V => T, pt: PType[T]) = {
+    val ptype = pt.getFamily().tableOf(native.getKeyType(), pt)
+    parallelDo(mapValuesFn[K, V, T](f), ptype)
+  }
+
   def mapKeys[T](f: K => T)(implicit pt: PTypeH[T]) = {
     val ptf = getTypeFamily()
     val ptype = ptf.tableOf(pt.get(ptf), native.getValueType())
     parallelDo(mapKeysFn[K, V, T](f), ptype)
   }
 
-  def flatMap[T, To](f: (K, V) => Traversable[T])
-      (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
-    b(this, flatMapFn(f), pt.get(getTypeFamily()))
+  def mapKeys[T](f: K => T, pt: PType[T]) = {
+    val ptype = pt.getFamily.tableOf(pt, native.getValueType())
+    parallelDo(mapKeysFn[K, V, T](f), ptype)
   }
 
   def union(others: PTable[K, V]*) = {
@@ -123,12 +139,10 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V]
 
   def groupByKey(options: GroupingOptions) = new PGroupedTable(native.groupByKey(options))
 
-  def wrap(newNative: AnyRef) = {
+  protected def wrap(newNative: JCollection[_]) = {
     new PTable[K, V](newNative.asInstanceOf[JTable[K, V]])
   }
 
-  def unwrap(sc: PTable[K, V]): JTable[K, V] = sc.native
-
   def materialize(): Iterable[(K, V)] = {
     InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration())
     native.materialize.view.map(x => (x.first, x.second))
@@ -143,25 +157,61 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V]
     PObject(native.asMap())
   }
 
+  def pType() = native.getPTableType()
+
   def keyType() = native.getPTableType().getKeyType()
 
   def valueType() = native.getPTableType().getValueType()
 }
 
+trait SDoTableFn[K, V, T] extends DoFn[CPair[K, V], T] with Function2[K, V, TraversableOnce[T]] {
+  override def process(input: CPair[K, V], emitter: Emitter[T]) {
+    for (v <- apply(input.first(), input.second())) {
+      emitter.emit(v)
+    }
+  }
+}
+
+trait SMapTableFn[K, V, T] extends MapFn[CPair[K, V], T] with Function2[K, V, T] {
+  override def map(input: CPair[K, V]) = apply(input.first(), input.second())
+}
+
 trait SFilterTableFn[K, V] extends FilterFn[CPair[K, V]] with Function2[K, V, Boolean] {
   override def accept(input: CPair[K, V]) = apply(input.first(), input.second())
 }
 
-trait SDoTableFn[K, V, T] extends DoFn[CPair[K, V], T] with Function2[K, V, Traversable[T]] {
+class SDoTableWithCtxtFn[K, V, T](val f: (K, V, TaskInputOutputContext[_, _, _, _]) => TraversableOnce[T])
+  extends DoFn[CPair[K, V], T] {
   override def process(input: CPair[K, V], emitter: Emitter[T]) {
-    for (v <- apply(input.first(), input.second())) {
+    for (v <- f(input.first(), input.second(), getContext)) {
       emitter.emit(v)
     }
   }
 }
 
-trait SMapTableFn[K, V, T] extends MapFn[CPair[K, V], T] with Function2[K, V, T] {
-  override def map(input: CPair[K, V]) = apply(input.first(), input.second())
+class SMapTableWithCtxtFn[K, V, T](val f: (K, V, TaskInputOutputContext[_, _, _, _]) => T)
+  extends MapFn[CPair[K, V], T] {
+  override def map(input: CPair[K, V]) = f(input.first(), input.second(), getContext)
+}
+
+class SFilterTableWithCtxtFn[K, V](val f: (K, V, TaskInputOutputContext[_, _, _, _]) => Boolean)
+  extends FilterFn[CPair[K, V]] {
+  override def accept(input: CPair[K, V]) = f.apply(input.first(), input.second(), getContext)
+}
+
+trait SDoPairTableFn[K, V, S, T] extends DoFn[CPair[K, V], CPair[S, T]] with Function2[K, V, TraversableOnce[(S, T)]] {
+  override def process(input: CPair[K, V], emitter: Emitter[CPair[S, T]]) {
+    for (v <- apply(input.first(), input.second())) {
+      emitter.emit(CPair.of(v._1, v._2))
+    }
+  }
+}
+
+trait SMapPairTableFn[K, V, S, T] extends MapFn[CPair[K, V], CPair[S, T]] with Function2[K, V, (S, T)] {
+  override def map(input: CPair[K, V]) = {
+    val t = apply(input.first(), input.second())
+    CPair.of(t._1, t._2)
+  }
 }
 
trait SMapTableValuesFn[K, V, T] extends MapFn[CPair[K, V], CPair[K, T]] with Function1[V, T] {
@@ -173,10 +223,32 @@ trait SMapTableKeysFn[K, V, T] extends MapFn[CPair[K, V], CPair[T, V]] with Func
 }
 
 object PTable {
+  type TIOC = TaskInputOutputContext[_, _, _, _]
+
+  def flatMapFn[K, V, T](fn: (K, V) => TraversableOnce[T]) = {
+    new SDoTableFn[K, V, T] { def apply(k: K, v: V) = fn(k, v) }
+  }
+
+  def mapFn[K, V, T](fn: (K, V) => T) = {
+    new SMapTableFn[K, V, T] { def apply(k: K, v: V) = fn(k, v) }
+  }
+
   def filterFn[K, V](fn: (K, V) => Boolean) = {
     new SFilterTableFn[K, V] { def apply(k: K, v: V) = fn(k, v) }
   }
 
+  def flatMapWithCtxtFn[K, V, T](fn: (K, V, TIOC) => TraversableOnce[T]) = {
+    new SDoTableWithCtxtFn[K, V, T](fn)
+  }
+
+  def mapWithCtxtFn[K, V, T](fn: (K, V, TIOC) => T) = {
+    new SMapTableWithCtxtFn[K, V, T](fn)
+  }
+
+  def filterWithCtxtFn[K, V](fn: (K, V, TIOC) => Boolean) = {
+    new SFilterTableWithCtxtFn[K, V](fn)
+  }
+
   def mapValuesFn[K, V, T](fn: V => T) = {
     new SMapTableValuesFn[K, V, T] { def apply(v: V) = fn(v) }
   }
@@ -185,11 +257,12 @@ object PTable {
     new SMapTableKeysFn[K, V, T] { def apply(k: K) = fn(k) }
   }
 
-  def mapFn[K, V, T](fn: (K, V) => T) = {
-    new SMapTableFn[K, V, T] { def apply(k: K, v: V) = fn(k, v) }
+  def pairMapFn[K, V, S, T](fn: (K, V) => (S, T)) = {
+    new SMapPairTableFn[K, V, S, T] { def apply(k: K, v: V) = fn(k, v) }
   }
 
-  def flatMapFn[K, V, T](fn: (K, V) => Traversable[T]) = {
-    new SDoTableFn[K, V, T] { def apply(k: K, v: V) = fn(k, v) }
+  def pairFlatMapFn[K, V, S, T](fn: (K, V) => TraversableOnce[(S, T)]) = {
+    new SDoPairTableFn[K, V, S, T] { def apply(k: K, v: V) = fn(k, v) }
   }
+
 }
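
mapValues and mapKeys now also take an explicit PType, mirroring the collection-level overloads. A hedged sketch; the table is assumed:

    import org.apache.crunch.scrunch.{Avros, PTable}

    object TableSketch {
      // Uppercase the values without touching the keys; the explicit PType
      // avoids the implicit PTypeH lookup.
      def upperValues(t: PTable[String, String]): PTable[String, String] =
        t.mapValues((v: String) => v.toUpperCase, Avros.strings)
    }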

http://git-wip-us.apache.org/repos/asf/crunch/blob/2c2e5b0f/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala
index d250e9b..67c9b14 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala
@@ -28,6 +28,7 @@ import org.apache.crunch.impl.mr.MRPipeline
 import org.apache.crunch.util.DistCache
 import org.apache.crunch.scrunch.interpreter.InterpreterRunner
 import scala.reflect.ClassTag
+import org.apache.crunch.types.{PTableType, PType}
 
 /**
  * Manages the state of a pipeline execution.

http://git-wip-us.apache.org/repos/asf/crunch/blob/2c2e5b0f/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
index c183e5e..27c43a7 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
@@ -19,11 +19,9 @@ package org.apache.crunch.scrunch
 
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.crunch.{Pipeline => JPipeline}
-import org.apache.crunch.Source
-import org.apache.crunch.TableSource
-import org.apache.crunch.Target
+import org.apache.crunch.{Pipeline => JPipeline, _}
 import org.apache.crunch.scrunch.interpreter.InterpreterRunner
+import org.apache.crunch.types.{PTableType, PType}
 
 trait PipelineLike {
   def jpipeline: JPipeline
@@ -34,6 +32,18 @@ trait PipelineLike {
   def getConfiguration(): Configuration = jpipeline.getConfiguration()
 
   /**
+   * Sets the configuration object associated with this pipeline.
+   */
+  def setConfiguration(conf: Configuration) {
+    jpipeline.setConfiguration(conf)
+  }
+
+  /**
+   * Returns the name of this pipeline instance.
+   */
+  def getName() = jpipeline.getName()
+
+  /**
    * Reads a source into a [[org.apache.crunch.scrunch.PCollection]]
    *
    * @param source The source to read from.
@@ -91,10 +101,29 @@ trait PipelineLike {
   }
 
   /**
+   * Creates an empty PCollection of the given PType.
+   */
+  def emptyPCollection[T](pt: PType[T]) = new PCollection[T](jpipeline.emptyPCollection(pt))
+
+  /**
+   * Creates an empty PTable of the given PTableType.
+   */
+  def emptyPTable[K, V](pt: PTableType[K, V]) = new PTable[K, V](jpipeline.emptyPTable(pt))
+
+  /**
+   * Returns a handler for controlling the execution of the underlying MapReduce
+   * pipeline.
+   */
+  def runAsync(): PipelineExecution = {
+    InterpreterRunner.addReplJarsToJob(getConfiguration())
+    jpipeline.runAsync()
+  }
+
+  /**
    * Constructs and executes a series of MapReduce jobs in order
    * to write data to the output targets.
    */
-  def run(): Unit = {
+  def run(): PipelineResult = {
     InterpreterRunner.addReplJarsToJob(getConfiguration())
     jpipeline.run()
   }
@@ -104,12 +133,21 @@ trait PipelineLike {
    * clean up any intermediate data files that were created in
    * this run or previous calls to `run`.
    */
-  def done(): Unit =  {
+  def done(): PipelineResult =  {
     InterpreterRunner.addReplJarsToJob(getConfiguration())
     jpipeline.done()
   }
 
   /**
+   * Cleans up any artifacts created as a result of {@link #run() running} the pipeline.
+   *
+   * @param force forces the cleanup even if all targets of the pipeline have not been completed.
+   */
+  def cleanup(force: Boolean): Unit = {
+    jpipeline.cleanup(force)
+  }
+
+  /**
    * Turn on debug logging for jobs that are run from this pipeline.
    */
   def debug(): Unit = jpipeline.enableDebug()
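
With run() and done() now surfacing the PipelineResult, and runAsync() returning a PipelineExecution handle, callers can branch on success. A hedged sketch; the pipeline and its targets are assumed to be wired up already:

    import org.apache.crunch.scrunch.Pipeline

    object RunSketch {
      def execute(pipeline: Pipeline) {
        val result = pipeline.run()   // PipelineResult instead of Unit
        if (result.succeeded()) {
          pipeline.done()             // finish remaining work and clean up intermediate data
        } else {
          pipeline.cleanup(true)      // force cleanup even though targets are incomplete
        }
      }
    }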

