incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [9/10] CRUNCH-32: Clean up namespaces.
Date Wed, 08 Aug 2012 00:47:06 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/it/resources/urls.txt
----------------------------------------------------------------------
diff --git a/scrunch/src/it/resources/urls.txt b/scrunch/src/it/resources/urls.txt
deleted file mode 100644
index 827e711..0000000
--- a/scrunch/src/it/resources/urls.txt
+++ /dev/null
@@ -1,11 +0,0 @@
-www.A.com	www.B.com
-www.A.com	www.C.com
-www.A.com	www.D.com
-www.A.com	www.E.com
-www.B.com	www.D.com
-www.B.com	www.E.com
-www.C.com	www.D.com
-www.D.com	www.B.com
-www.E.com	www.A.com
-www.F.com	www.B.com
-www.F.com	www.C.com

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/it/scala/org/apache/scrunch/CogroupTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/it/scala/org/apache/scrunch/CogroupTest.scala b/scrunch/src/it/scala/org/apache/scrunch/CogroupTest.scala
deleted file mode 100644
index de20cd9..0000000
--- a/scrunch/src/it/scala/org/apache/scrunch/CogroupTest.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import org.apache.crunch.io.{From => from}
-import org.apache.crunch.test.CrunchTestSupport
-
-import org.scalatest.junit.JUnitSuite
-import _root_.org.junit.Test
-
-class CogroupTest extends CrunchTestSupport with JUnitSuite {
-  lazy val pipeline = Pipeline.mapReduce[CogroupTest](tempDir.getDefaultConfiguration)
-
-  def wordCount(fileName: String) = {
-    pipeline.read(from.textFile(fileName))
-        .flatMap(_.toLowerCase.split("\\W+")).count
-  }
-
-  @Test def cogroup {
-    val shakespeare = tempDir.copyResourceFileName("shakes.txt")
-    val maugham = tempDir.copyResourceFileName("maugham.txt")
-    val diffs = wordCount(shakespeare).cogroup(wordCount(maugham))
-        .map((k, v) => (k, (v._1.sum - v._2.sum))).materialize
-    assert(diffs.exists(_ == ("the", -11390)))
-    pipeline.done
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/it/scala/org/apache/scrunch/JoinTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/it/scala/org/apache/scrunch/JoinTest.scala b/scrunch/src/it/scala/org/apache/scrunch/JoinTest.scala
deleted file mode 100644
index 397ca65..0000000
--- a/scrunch/src/it/scala/org/apache/scrunch/JoinTest.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import org.apache.crunch.io.{From => from, To => to}
-import org.apache.crunch.test.CrunchTestSupport
-
-import org.scalatest.junit.JUnitSuite
-import _root_.org.junit.Test
-
-class JoinTest extends CrunchTestSupport with JUnitSuite {
-  lazy val pipeline = Pipeline.mapReduce[CogroupTest](tempDir.getDefaultConfiguration)
-
-  def wordCount(fileName: String) = {
-    pipeline.read(from.textFile(fileName))
-        .flatMap(_.toLowerCase.split("\\W+")).count
-  }
-
-  @Test def join {
-    val shakespeare = tempDir.copyResourceFileName("shakes.txt")
-    val maugham = tempDir.copyResourceFileName("maugham.txt")
-    val output = tempDir.getFile("output")
-    val filtered = wordCount(shakespeare).join(wordCount(maugham))
-        .map((k, v) => (k, v._1 - v._2))
-        .write(to.textFile(output.getAbsolutePath()))
-        .filter((k, d) => d > 0).materialize
-    assert(filtered.exists(_ == ("macbeth", 66)))
-    pipeline.done
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/it/scala/org/apache/scrunch/PageRankClassTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/it/scala/org/apache/scrunch/PageRankClassTest.scala b/scrunch/src/it/scala/org/apache/scrunch/PageRankClassTest.scala
deleted file mode 100644
index d2822db..0000000
--- a/scrunch/src/it/scala/org/apache/scrunch/PageRankClassTest.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import Avros._
-
-import org.apache.crunch.{DoFn, Emitter, Pair => P}
-import org.apache.crunch.io.{From => from}
-import org.apache.crunch.test.CrunchTestSupport
-
-import scala.collection.mutable.HashMap
-
-import org.scalatest.junit.JUnitSuite
-import _root_.org.junit.Assert._
-import _root_.org.junit.Test
-
-case class PageRankData(pr: Float, oldpr: Float, urls: Array[String]) {
-  def this() = this(0f, 0f, null)
-
-  def scaledPageRank = pr / urls.length
-
-  def next(newPageRank: Float) = new PageRankData(newPageRank, pr, urls)
-
-  def delta = math.abs(pr - oldpr)
-}
-
-class CachingPageRankClassFn extends DoFn[P[String, PageRankData], P[String, Float]] {
-  val cache = new HashMap[String, Float] {
-    override def default(key: String) = 0f
-  }
-
-  override def process(input: P[String, PageRankData], emitFn: Emitter[P[String, Float]]) {
-    val prd = input.second()
-    if (prd.urls.length > 0) {
-      val newpr = prd.pr / prd.urls.length
-      prd.urls.foreach(url => cache.put(url, cache(url) + newpr))
-      if (cache.size > 5000) {
-        cleanup(emitFn)
-      }
-    }
-  }
-
-  override def cleanup(emitFn: Emitter[P[String, Float]]) {
-    cache.foreach(kv => emitFn.emit(P.of(kv._1, kv._2)))
-    cache.clear
-  }
-}
-
-class PageRankClassTest extends CrunchTestSupport with JUnitSuite {
-
-  lazy val pipeline = Pipeline.mapReduce[PageRankTest](tempDir.getDefaultConfiguration)
-
-  def initialInput(fileName: String) = {
-    pipeline.read(from.textFile(fileName))
-      .map(line => { val urls = line.split("\\t"); (urls(0), urls(1)) })
-      .groupByKey
-      .map((url, links) => (url, PageRankData(1f, 0f, links.filter(x => x != null).toArray)))
-  }
-
-  def update(prev: PTable[String, PageRankData], d: Float) = {
-    val outbound = prev.flatMap((url, prd) => {
-      prd.urls.map(link => (link, prd.scaledPageRank))
-    })
-    cg(prev, outbound, d)
-  }
-
-  def cg(prev: PTable[String, PageRankData],
-         out: PTable[String, Float], d: Float) = {
-    prev.cogroup(out).map((url, v) => {
-      val (p, o) = v
-      val prd = p.head
-      (url, prd.next((1 - d) + d * o.sum))
-    })
-  }
-
-  def fastUpdate(prev: PTable[String, PageRankData], d: Float) = {
-    val outbound = prev.parallelDo(new CachingPageRankClassFn(), tableOf(strings, floats))
-    cg(prev, outbound, d)
-  }
-
-  @Test def testPageRank {
-    pipeline.getConfiguration.set("crunch.debug", "true")
-    var prev = initialInput(tempDir.copyResourceFileName("urls.txt"))
-    var delta = 1.0f
-    while (delta > 0.01f) {
-      prev = update(prev, 0.5f)
-      delta = prev.values.map(_.delta).max.materialize.head
-    }
-    assertEquals(0.0048, delta, 0.001)
-    pipeline.done
-  }
-
-  def testFastPageRank {
-    pipeline.getConfiguration.set("crunch.debug", "true")
-    var prev = initialInput(tempDir.copyResourceFileName("urls.txt"))
-    var delta = 1.0f
-    while (delta > 0.01f) {
-      prev = fastUpdate(prev, 0.5f)
-      delta = prev.values.map(_.delta).max.materialize.head
-    }
-    assertEquals(0.0048, delta, 0.001)
-    pipeline.done
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/it/scala/org/apache/scrunch/PageRankTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/it/scala/org/apache/scrunch/PageRankTest.scala b/scrunch/src/it/scala/org/apache/scrunch/PageRankTest.scala
deleted file mode 100644
index 9cf05e1..0000000
--- a/scrunch/src/it/scala/org/apache/scrunch/PageRankTest.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import Avros._
-
-import org.apache.crunch.{DoFn, Emitter, Pair => P}
-import org.apache.crunch.io.{From => from}
-import org.apache.crunch.test.CrunchTestSupport
-
-import scala.collection.mutable.HashMap
-
-import org.scalatest.junit.JUnitSuite
-import _root_.org.junit.Assert._
-import _root_.org.junit.Test
-
-class CachingPageRankFn extends DoFn[P[String, (Float, Float, List[String])], P[String, Float]] {
-  val cache = new HashMap[String, Float] {
-    override def default(key: String) = 0f
-  }
-
-  override def process(input: P[String, (Float, Float, List[String])], emitFn: Emitter[P[String, Float]]) {
-    val (pr, oldpr, urls) = input.second()
-    val newpr = pr / urls.size
-    urls.foreach(url => cache.put(url, cache(url) + newpr))
-    if (cache.size > 5000) {
-      cleanup(emitFn)
-    }
-  }
-
-  override def cleanup(emitFn: Emitter[P[String, Float]]) {
-    cache.foreach(kv => emitFn.emit(P.of(kv._1, kv._2)))
-    cache.clear
-  }
-}
-
-class PageRankTest extends CrunchTestSupport with JUnitSuite {
-  lazy val pipeline = Pipeline.mapReduce[PageRankTest](tempDir.getDefaultConfiguration)
-
-  def initialInput(fileName: String) = {
-    pipeline.read(from.textFile(fileName))
-      .map(line => { val urls = line.split("\\t"); (urls(0), urls(1)) })
-      .groupByKey
-      .map((url, links) => (url, (1f, 0f, links.toList)))
-  }
-
-  def update(prev: PTable[String, (Float, Float, List[String])], d: Float) = {
-    val outbound = prev.flatMap((url, v) => {
-      val (pr, oldpr, links) = v
-      links.map(link => (link, pr / links.size))
-    })
-    cg(prev, outbound, d)
-  }
-
-  def cg(prev: PTable[String, (Float, Float, List[String])],
-         out: PTable[String, Float], d: Float) = {
-    prev.cogroup(out).map((url, v) => {
-      val (p, o) = v
-      val (pr, oldpr, links) = p.head
-      (url, ((1 - d) + d * o.sum, pr, links))
-    })
-  }
-
-  def fastUpdate(prev: PTable[String, (Float, Float, List[String])], d: Float) = {
-    val outbound = prev.parallelDo(new CachingPageRankFn(), tableOf(strings, floats))
-    cg(prev, outbound, d)
-  }
-
-  @Test def testPageRank {
-    var prev = initialInput(tempDir.copyResourceFileName("urls.txt"))
-    var delta = 1.0f
-    while (delta > 0.01f) {
-      prev = update(prev, 0.5f)
-      delta = prev.map((k, v) => math.abs(v._1 - v._2)).max.materialize.head
-    }
-    assertEquals(0.0048, delta, 0.001)
-    pipeline.done
-  }
-
-  @Test def testFastPageRank {
-    var prev = initialInput(tempDir.copyResourceFileName("urls.txt"))
-    var delta = 1.0f
-    while (delta > 0.01f) {
-      prev = fastUpdate(prev, 0.5f)
-      delta = prev.map((k, v) => math.abs(v._1 - v._2)).max.materialize.head
-    }
-    assertEquals(0.0048, delta, 0.001)
-    pipeline.done
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/it/scala/org/apache/scrunch/PipelineAppTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/it/scala/org/apache/scrunch/PipelineAppTest.scala b/scrunch/src/it/scala/org/apache/scrunch/PipelineAppTest.scala
deleted file mode 100644
index 50d6fc1..0000000
--- a/scrunch/src/it/scala/org/apache/scrunch/PipelineAppTest.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import org.apache.crunch.test.CrunchTestSupport
-
-import org.scalatest.junit.JUnitSuite
-import _root_.org.junit.Test
-
-object WordCount extends PipelineApp {
-
-  def wordSplit(line: String) = line.split("\\W+").filter(!_.isEmpty())
-
-  def countWords(filename: String) = {
-    val lines = read(from.textFile(filename))
-    val words = lines.flatMap(wordSplit)
-    words.count
-  }
-
-  val w1 = countWords(args(0))
-  val w2 = countWords(args(1))
-  cogroup(w1, w2).write(to.textFile(args(2)))
-}
-
-class PipelineAppTest extends CrunchTestSupport with JUnitSuite {
-  @Test def run {
-    val args = new Array[String](3)
-    args(0) = tempDir.copyResourceFileName("shakes.txt")
-    args(1) = tempDir.copyResourceFileName("maugham.txt")
-    args(2) = tempDir.getFileName("output")
-    tempDir.overridePathProperties(WordCount.configuration)
-    WordCount.main(args)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/it/scala/org/apache/scrunch/TopTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/it/scala/org/apache/scrunch/TopTest.scala b/scrunch/src/it/scala/org/apache/scrunch/TopTest.scala
deleted file mode 100644
index f9db0e5..0000000
--- a/scrunch/src/it/scala/org/apache/scrunch/TopTest.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import org.apache.crunch.io.{From => from, To => to}
-import org.apache.crunch.test.CrunchTestSupport
-
-import org.scalatest.junit.JUnitSuite
-import _root_.org.junit.Test
-
-class TopTest extends CrunchTestSupport with JUnitSuite {
-
-  @Test def topInMem {
-    val ptable = Mem.tableOf(("foo", 17), ("bar", 29), ("baz", 1729))
-    assert(ptable.top(1, true).materialize.head == ("baz", 1729))
-  }
-
-  @Test def top2 {
-    val pipeline = Pipeline.mapReduce[TopTest](tempDir.getDefaultConfiguration)
-    val input = tempDir.copyResourceFileName("shakes.txt")
-
-    val wc = pipeline.read(from.textFile(input))
-        .flatMap(_.toLowerCase.split("\\s+"))
-        .filter(!_.isEmpty()).count
-    assert(wc.top(10, true).materialize.exists(_ == ("is", 205)))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/it/scala/org/apache/scrunch/UnionTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/it/scala/org/apache/scrunch/UnionTest.scala b/scrunch/src/it/scala/org/apache/scrunch/UnionTest.scala
deleted file mode 100644
index dd0a651..0000000
--- a/scrunch/src/it/scala/org/apache/scrunch/UnionTest.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import org.apache.crunch.io.{From => from}
-import org.apache.crunch.test.CrunchTestSupport
-
-import org.scalatest.junit.JUnitSuite
-import _root_.org.junit.Test
-
-class UnionTest extends CrunchTestSupport with JUnitSuite {
-  lazy val pipeline = Pipeline.mapReduce[UnionTest](tempDir.getDefaultConfiguration)
-
-  def wordCount(col: PCollection[String]) = {
-    col.flatMap(_.toLowerCase.split("\\W+")).count
-  }
-
-  @Test def testUnionCollection {
-    val shakespeare = tempDir.copyResourceFileName("shakes.txt")
-    val maugham = tempDir.copyResourceFileName("maugham.txt")
-    val union = pipeline.read(from.textFile(shakespeare)).union(
-        pipeline.read(from.textFile(maugham)))
-    val wc = wordCount(union).materialize
-    assert(wc.exists(_ == ("you", 3691)))
-    pipeline.done
-  }
-
-  @Test def testUnionTable {
-    val shakespeare = tempDir.copyResourceFileName("shakes.txt")
-    val maugham = tempDir.copyResourceFileName("maugham.txt")
-    val wcs = wordCount(pipeline.read(from.textFile(shakespeare)))
-    val wcm = wordCount(pipeline.read(from.textFile(maugham)))
-    val wc = wcs.union(wcm).groupByKey.combine(v => v.sum).materialize
-    assert(wc.exists(_ == ("you", 3691)))
-    pipeline.done
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/it/scala/org/apache/scrunch/WordCountTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/it/scala/org/apache/scrunch/WordCountTest.scala b/scrunch/src/it/scala/org/apache/scrunch/WordCountTest.scala
deleted file mode 100644
index 3edb08b..0000000
--- a/scrunch/src/it/scala/org/apache/scrunch/WordCountTest.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import org.apache.crunch.io.{From => from, To => to}
-import org.apache.crunch.test.CrunchTestSupport
-
-import org.scalatest.junit.JUnitSuite
-import _root_.org.junit.Test
-
-class WordCountTest extends CrunchTestSupport with JUnitSuite {
-  @Test def wordCount {
-    val pipeline = Pipeline.mapReduce[WordCountTest](tempDir.getDefaultConfiguration)
-    val input = tempDir.copyResourceFileName("shakes.txt")
-    val wordCountOut = tempDir.getFileName("output")
-
-    val fcc = pipeline.read(from.textFile(input))
-        .flatMap(_.toLowerCase.split("\\s+"))
-        .filter(!_.isEmpty()).count
-        .write(to.textFile(wordCountOut)) // Word counts
-        .map((w, c) => (w.slice(0, 1), c))
-        .groupByKey.combine(v => v.sum).materialize
-    assert(fcc.exists(_ == ("w", 1404)))
-
-    pipeline.done
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/it/scala/org/apache/scrunch/interpreter/InterpreterJarTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/it/scala/org/apache/scrunch/interpreter/InterpreterJarTest.scala b/scrunch/src/it/scala/org/apache/scrunch/interpreter/InterpreterJarTest.scala
deleted file mode 100644
index 519dae4..0000000
--- a/scrunch/src/it/scala/org/apache/scrunch/interpreter/InterpreterJarTest.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch.interpreter
-
-import java.io.File
-import java.io.FileOutputStream
-import java.util.jar.JarFile
-import java.util.jar.JarOutputStream
-
-import scala.tools.nsc.io.VirtualDirectory
-
-import com.google.common.io.Files
-import org.junit.Assert.assertNotNull
-import org.junit.Test
-import org.apache.crunch.test.CrunchTestSupport
-import org.scalatest.junit.JUnitSuite
-
-/**
- * Tests creating jars from a {@link scala.tools.nsc.io.VirtualDirectory}.
- */
-class InterpreterJarTest extends CrunchTestSupport with JUnitSuite {
-
-  /**
-   * Tests transforming a virtual directory into a temporary jar file.
-   */
-  @Test def virtualDirToJar: Unit = {
-    // Create a virtual directory and populate with some mock content.
-    val root = new VirtualDirectory("testDir", None)
-    // Add some subdirectories to the root.
-    (1 to 10).foreach { i =>
-      val subdir = root.subdirectoryNamed("subdir" + i).asInstanceOf[VirtualDirectory]
-      // Add some classfiles to each sub directory.
-      (1 to 10).foreach { j =>
-        subdir.fileNamed("MyClass" + j + ".class")
-      }
-    }
-
-    // Now generate a jar file from the virtual directory.
-    val tempJar = new File(tempDir.getRootFile(), "replJar.jar")
-    val jarStream = new JarOutputStream(new FileOutputStream(tempJar))
-    InterpreterRunner.addVirtualDirectoryToJar(root, "top/pack/name/", jarStream)
-    jarStream.close()
-
-    // Verify the contents of the jar.
-    val jarFile = new JarFile(tempJar)
-    (1 to 10).foreach { i =>
-      (1 to 10).foreach { j =>
-        val entryName = "top/pack/name/subdir" + i + "/MyClass" + j + ".class"
-        val entry = jarFile.getEntry(entryName)
-        assertNotNull("Jar entry " + entryName + " not found in generated jar.", entry)
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/assembly/release.xml
----------------------------------------------------------------------
diff --git a/scrunch/src/main/assembly/release.xml b/scrunch/src/main/assembly/release.xml
deleted file mode 100644
index e740f32..0000000
--- a/scrunch/src/main/assembly/release.xml
+++ /dev/null
@@ -1,93 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<!--
-   Assembly configuration for the release bundle.
--->
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
-          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
-  <id>release</id>
-  <formats>
-    <format>dir</format>
-    <format>tar.gz</format>
-  </formats>
-  <includeBaseDirectory>true</includeBaseDirectory>
-  <fileSets>
-    <fileSet>
-      <!-- readme -->
-      <useDefaultExcludes>false</useDefaultExcludes>
-      <outputDirectory>/</outputDirectory>
-      <fileMode>0644</fileMode>
-      <includes>
-        <include>README.md</include>
-      </includes>
-      <filtered>true</filtered>
-    </fileSet>
-    <fileSet>
-      <!-- scripts -->
-      <useDefaultExcludes>false</useDefaultExcludes>
-      <outputDirectory>bin</outputDirectory>
-      <directory>src/main/scripts</directory>
-      <fileMode>0755</fileMode>
-      <excludes>
-        <exclude>*~</exclude>
-        <exclude>*.swp</exclude>
-      </excludes>
-      <filtered>true</filtered>
-    </fileSet>
-    <fileSet>
-      <!-- conf dir -->
-      <useDefaultExcludes>false</useDefaultExcludes>
-      <outputDirectory>conf</outputDirectory>
-      <directory>src/main/conf</directory>
-      <fileMode>0644</fileMode>
-      <excludes>
-        <exclude>*~</exclude>
-        <exclude>*.swp</exclude>
-      </excludes>
-      <filtered>true</filtered>
-    </fileSet>
-    <fileSet>
-      <!-- examples dir -->
-      <useDefaultExcludes>false</useDefaultExcludes>
-      <outputDirectory>examples</outputDirectory>
-      <directory>src/main/examples</directory>
-      <fileMode>0644</fileMode>
-      <excludes>
-        <exclude>*~</exclude>
-        <exclude>*.swp</exclude>
-      </excludes>
-      <filtered>true</filtered>
-    </fileSet>
-  </fileSets>
-  <dependencySets>
-    <dependencySet>
-      <outputDirectory>lib</outputDirectory>
-      <scope>runtime</scope>
-      <useTransitiveFiltering>true</useTransitiveFiltering>
-      <fileMode>0644</fileMode>
-      <!--
-      <excludes>
-        <exclude>org.apache.hadoop:hadoop-core</exclude>
-        <exclude>org.apache.hbase:hbase</exclude>
-        </excludes>
-      -->
-    </dependencySet>
-  </dependencySets>
-</assembly>

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/scrunch/src/main/conf/log4j.properties b/scrunch/src/main/conf/log4j.properties
deleted file mode 100644
index 448bb77..0000000
--- a/scrunch/src/main/conf/log4j.properties
+++ /dev/null
@@ -1,24 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# ***** Set root logger level to INFO and its only appender to A.
-log4j.logger.org.apache.scrunch=info, A
-
-# ***** A is set to be a ConsoleAppender.
-log4j.appender.A=org.apache.log4j.ConsoleAppender
-# ***** A uses PatternLayout.
-log4j.appender.A.layout=org.apache.log4j.PatternLayout
-log4j.appender.A.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/examples/ClassyPageRank.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/examples/ClassyPageRank.scala b/scrunch/src/main/examples/ClassyPageRank.scala
deleted file mode 100644
index 6c819a5..0000000
--- a/scrunch/src/main/examples/ClassyPageRank.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.apache.scrunch._
-import org.apache.scrunch.Mem._
-
-case class UrlData(pageRank: Float, oldPageRank: Float, links: List[String]) {
-  def this() = this(1.0f, 0.0f, Nil)
-
-  def this(links: String*) = this(1.0f, 0.0f, List(links:_*))
- 
-  def this(links: Iterable[String]) = this(1.0f, 0.0f, links.toList)
-  
-  def delta = math.abs(pageRank - oldPageRank)
-
-  def next(newPageRank: Float) = new UrlData(newPageRank, pageRank, links)
-
-  def outboundScores = links.map(link => (link, pageRank / links.size))
-}
-
-object ClassyPageRank extends PipelineApp {
-
-  def initialize(file: String) = {
-    read(from.textFile(file))
-      .map(line => { val urls = line.split("\\s+"); (urls(0), urls(2)) })
-      .groupByKey
-      .map((url, links) => (url, new UrlData(links)))
-  }
-
-  def update(prev: PTable[String, UrlData], d: Float) = {
-    val outbound = prev.values.flatMap(_.outboundScores)
-
-    cogroup(prev, outbound).mapValues(data => {
-      val (prd, outboundScores) = data
-      val newPageRank = (1 - d) + d * outboundScores.sum
-      if (!prd.isEmpty) {
-        prd.head.next(newPageRank)
-      } else {
-        new UrlData(newPageRank, 0, Nil)
-      }
-    })
-  }
-
-  var index = 0
-  var delta = 10.0f
-  fs.mkdirs("prank/")
-  var curr = initialize(args(0))
-  while (delta > 1.0f) {
-    index = index + 1
-    curr = update(curr, 0.5f)
-    write(curr, to.avroFile("prank/" + index))
-    delta = curr.values.map(_.delta).max.materialize.head
-    println("Current delta = " + delta)
-  }
-  fs.rename("prank/" + index, args(1))
-  fs.delete("prank/", true)
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/examples/PageRank.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/examples/PageRank.scala b/scrunch/src/main/examples/PageRank.scala
deleted file mode 100644
index 7de26e6..0000000
--- a/scrunch/src/main/examples/PageRank.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.apache.scrunch._
-
-object PageRank extends PipelineApp {
-  def initialize(file: String) = {
-    read(from.textFile(file))
-      .map(line => { val urls = line.split("\\s+"); (urls(0), urls(2)) })
-      .groupByKey
-      .map((url, links) => (url, (1f, 0f, links.toList)))
-  }
-
-  def update(prev: PTable[String, (Float, Float, List[String])], d: Float) = {
-    val outbound = prev.flatMap((url, data) => {
-      val (pagerank, old_pagerank, links) = data
-      links.map(link => (link, pagerank / links.size))
-    })
-
-    cogroup(prev, outbound).mapValues(data => {
-      val (prev_data, outbound_data) = data
-      val new_pagerank = (1 - d) + d * outbound_data.sum
-      var cur_pagerank = 0f
-      var links: List[String] = Nil
-      if (!prev_data.isEmpty) {
-        val (cur_pr, old_pr, l) = prev_data.head
-        cur_pagerank = cur_pr
-        links = l
-      }
-      (new_pagerank, cur_pagerank, links)
-    })
-  }
-
-  var index = 0
-  var delta = 10.0f
-  fs.mkdirs("prank/")
-  var curr = initialize(args(0))
-  while (delta > 1.0f) {
-    index = index + 1
-    curr = update(curr, 0.5f)
-    write(curr, to.avroFile("prank/" + index))
-    delta = curr.values.map(v => math.abs(v._1 - v._2)).max.materialize.head
-    println("Current delta = " + delta)
-  }
-  fs.rename("prank/" + index, args(1))
-  fs.delete("prank/", true)
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/examples/WordCount.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/examples/WordCount.scala b/scrunch/src/main/examples/WordCount.scala
deleted file mode 100644
index 10780e8..0000000
--- a/scrunch/src/main/examples/WordCount.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-import org.apache.scrunch.PipelineApp
-
-object WordCount extends PipelineApp {
-
-  def countWords(file: String) = {
-    read(from.textFile(file))
-      .flatMap(_.split("\\W+").filter(!_.isEmpty()))
-      .count
-  }
-
-  val counts = join(countWords(args(0)), countWords(args(1)))
-  write(counts, to.textFile(args(2)))
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/java/org/apache/scrunch/ScalaReflectDataFactory.java
----------------------------------------------------------------------
diff --git a/scrunch/src/main/java/org/apache/scrunch/ScalaReflectDataFactory.java b/scrunch/src/main/java/org/apache/scrunch/ScalaReflectDataFactory.java
deleted file mode 100644
index 0180585..0000000
--- a/scrunch/src/main/java/org/apache/scrunch/ScalaReflectDataFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch;
-
-import org.apache.avro.Schema;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-
-import org.apache.crunch.types.avro.ReflectDataFactory;
-
-/**
- * An implementation of the {@code ReflectDataFactory} class to work with Scala classes.
- */
-public class ScalaReflectDataFactory extends ReflectDataFactory {
-
-  public ReflectData getReflectData() { return ScalaSafeReflectData.get(); }
-  
-  public <T> ReflectDatumReader<T> getReader(Schema schema) {
-    return new ScalaSafeReflectDatumReader<T>(schema);
-  }
-  
-  public <T> ReflectDatumWriter<T> getWriter() {
-    return new ScalaSafeReflectDatumWriter<T>();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/java/org/apache/scrunch/ScalaSafeReflectData.java
----------------------------------------------------------------------
diff --git a/scrunch/src/main/java/org/apache/scrunch/ScalaSafeReflectData.java b/scrunch/src/main/java/org/apache/scrunch/ScalaSafeReflectData.java
deleted file mode 100644
index 55bacda..0000000
--- a/scrunch/src/main/java/org/apache/scrunch/ScalaSafeReflectData.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.GenericArrayType;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.avro.AvroRuntimeException;
-import org.apache.avro.AvroTypeException;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericFixed;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.Stringable;
-import org.apache.avro.reflect.Union;
-import org.apache.avro.specific.FixedSize;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.node.NullNode;
-
-/**
- * Scala-oriented support class for serialization via reflection.
- */
-public class ScalaSafeReflectData extends ReflectData.AllowNull {
-
-  private static final ScalaSafeReflectData INSTANCE = new ScalaSafeReflectData();
-  
-  public static ScalaSafeReflectData get() { return INSTANCE; }
-  
-  static final String CLASS_PROP = "java-class";
-  static final String ELEMENT_PROP = "java-element-class";
-  
-  static Class getClassProp(Schema schema, String prop) {
-    String name = schema.getProp(prop);
-    if (name == null) return null;
-    try {
-      return Class.forName(name);
-    } catch (ClassNotFoundException e) {
-      throw new AvroRuntimeException(e);
-    }
-  }
-  
-  /**
-   * This method is the whole reason for this class to exist, so that I can
-   * hack around a problem where calling getSimpleName on a class that is
-   * defined inside of the Scala REPL can cause an internal language error,
-   * which I'm not a huge fan of.
-   * 
-   * @param clazz
-   * @return
-   */
-  private String getSimpleName(Class clazz) {
-    try {
-      return clean(clazz.getSimpleName());
-    } catch (InternalError ie) {
-      // This can happen in Scala when we're using the Console. Crazy, right?
-      String fullName = clazz.getName();
-      String[] pieces = fullName.split("\\.");
-      return clean(pieces[pieces.length - 1]);
-    }
-  }
-  
-  @Override
-  @SuppressWarnings(value="unchecked")
-  protected Schema createSchema(Type type, Map<String,Schema> names) {
-    if (type instanceof GenericArrayType) {                  // generic array
-      Type component = ((GenericArrayType)type).getGenericComponentType();
-      if (component == Byte.TYPE)                            // byte array
-        return Schema.create(Schema.Type.BYTES);           
-      Schema result = Schema.createArray(createSchema(component, names));
-      setElement(result, component);
-      return result;
-    } else if (type instanceof ParameterizedType) {
-      ParameterizedType ptype = (ParameterizedType)type;
-      Class raw = (Class)ptype.getRawType();
-      Type[] params = ptype.getActualTypeArguments();
-      if (java.util.Map.class.isAssignableFrom(raw) ||
-          scala.collection.Map.class.isAssignableFrom(raw)) {
-        Type key = params[0];
-        Type value = params[1];
-        if (!(key == String.class))
-          throw new AvroTypeException("Map key class not String: "+key);
-        Schema schema = Schema.createMap(createSchema(value, names));
-        schema.addProp(CLASS_PROP, raw.getName());
-        return schema;
-      } else if (Collection.class.isAssignableFrom(raw) ||
-          scala.collection.Iterable.class.isAssignableFrom(raw)) {   // Collection
-        if (params.length != 1)
-          throw new AvroTypeException("No array type specified.");
-        Schema schema = Schema.createArray(createSchema(params[0], names));
-        schema.addProp(CLASS_PROP, raw.getName());
-        return schema;
-      } else {
-        throw new AvroTypeException("Could not convert type: " + type);
-      }
-    } else if ((type == Short.class) || (type == Short.TYPE)) {
-      Schema result = Schema.create(Schema.Type.INT);
-      result.addProp(CLASS_PROP, Short.class.getName());
-      return result;
-    } else if (type instanceof Class) {                      // Class
-      Class<?> c = (Class<?>)type;
-      if (c.isPrimitive() || Number.class.isAssignableFrom(c)
-          || c == Void.class || c == Boolean.class)          // primitive
-        return super.createSchema(type, names);
-      if (c.isArray()) {                                     // array
-        Class component = c.getComponentType();
-        if (component == Byte.TYPE)                          // byte array
-          return Schema.create(Schema.Type.BYTES);
-        Schema result = Schema.createArray(createSchema(component, names));
-        setElement(result, component);
-        return result;
-      }
-      if (CharSequence.class.isAssignableFrom(c))            // String
-        return Schema.create(Schema.Type.STRING);
-      String fullName = c.getName();
-      Schema schema = names.get(fullName);
-      if (schema == null) {
-        String name = getSimpleName(c);
-        String space = c.getPackage() == null ? "" : c.getPackage().getName();
-        if (c.getEnclosingClass() != null)                   // nested class
-          space = c.getEnclosingClass().getName() + "$";
-        Union union = c.getAnnotation(Union.class);
-        if (union != null) {                                 // union annotated
-          return getAnnotatedUnion(union, names);
-        } else if (c.isAnnotationPresent(Stringable.class)){ // Stringable
-          Schema result = Schema.create(Schema.Type.STRING);
-          result.addProp(CLASS_PROP, c.getName());
-          return result;
-        } else if (c.isEnum()) {                             // Enum
-          List<String> symbols = new ArrayList<String>();
-          Enum[] constants = (Enum[])c.getEnumConstants();
-          for (int i = 0; i < constants.length; i++)
-            symbols.add(constants[i].name());
-          schema = Schema.createEnum(name, null /* doc */, space, symbols);
-        } else if (GenericFixed.class.isAssignableFrom(c)) { // fixed
-          int size = c.getAnnotation(FixedSize.class).value();
-          schema = Schema.createFixed(name, null /* doc */, space, size);
-        } else if (IndexedRecord.class.isAssignableFrom(c)) { // specific
-          return super.createSchema(type, names);
-        } else {                                             // record
-          List<Schema.Field> fields = new ArrayList<Schema.Field>();
-          boolean error = Throwable.class.isAssignableFrom(c);
-          schema = Schema.createRecord(name, null /* doc */, space, error);
-          names.put(c.getName(), schema);
-          for (Field field : getFields(c))
-            if ((field.getModifiers()&(Modifier.TRANSIENT|Modifier.STATIC))==0){
-              Schema fieldSchema = createFieldSchema(field, names);
-              JsonNode defaultValue = null;
-              if (fieldSchema.getType() == Schema.Type.UNION) {
-                Schema defaultType = fieldSchema.getTypes().get(0);
-                if (defaultType.getType() == Schema.Type.NULL) {
-                  defaultValue = NullNode.getInstance();
-                }
-              }
-              fields.add(new Schema.Field(clean(field.getName()),
-                  fieldSchema, null /* doc */, defaultValue));
-            }
-          if (error)                              // add Throwable message
-            fields.add(new Schema.Field("detailMessage", THROWABLE_MESSAGE,
-                                        null, null));
-          schema.setFields(fields);
-        }
-        names.put(fullName, schema);
-      }
-      return schema;
-    }
-    return super.createSchema(type, names);
-  }
-  
-  private static final Schema THROWABLE_MESSAGE =
-      makeNullable(Schema.create(Schema.Type.STRING));
-  
-
-  @Override
-  public Object getField(Object record, String name, int position) {
-    if (record instanceof IndexedRecord)
-      return super.getField(record, name, position);
-    try {
-      return getField(record.getClass(), name).get(record);
-    } catch (IllegalAccessException e) {
-      throw new AvroRuntimeException(e);
-    }
-  }
-  
-  private static final Map<Class,Map<String,Field>> FIELD_CACHE =
-      new ConcurrentHashMap<Class,Map<String,Field>>();
-  
-  private static Field getField(Class c, String name) {
-    Map<String,Field> fields = FIELD_CACHE.get(c);
-    if (fields == null) {
-      fields = new ConcurrentHashMap<String,Field>();
-      FIELD_CACHE.put(c, fields);
-    }
-    Field f = fields.get(name);
-    if (f == null) {
-      f = findField(c, name);
-      fields.put(name, f);
-    }
-    return f;
-  }
-
-  private static Field findField(Class original, String name) {
-    Class c = original;
-    do {
-      try {
-        Field f = c.getDeclaredField(dirty(name));
-        f.setAccessible(true);
-        return f;
-      } catch (NoSuchFieldException e) {}
-      c = c.getSuperclass();
-    } while (c != null);
-    throw new AvroRuntimeException("No field named "+name+" in: "+original);
-  }
-  
-  private static String clean(String dirty) {
-    return dirty.replace('$', '_');
-  }
-  
-  private static String dirty(String clean) {
-    return clean.replace('_', '$');
-  }
-  
-  // Return of this class and its superclasses to serialize.
-  // Not cached, since this is only used to create schemas, which are cached.
-  private Collection<Field> getFields(Class recordClass) {
-    Map<String,Field> fields = new LinkedHashMap<String,Field>();
-    Class c = recordClass;
-    do {
-      if (c.getPackage() != null
-          && c.getPackage().getName().startsWith("java."))
-        break;                                    // skip java built-in classes
-      for (Field field : c.getDeclaredFields())
-        if ((field.getModifiers() & (Modifier.TRANSIENT|Modifier.STATIC)) == 0)
-          if (fields.put(field.getName(), field) != null)
-            throw new AvroTypeException(c+" contains two fields named: "+field);
-      c = c.getSuperclass();
-    } while (c != null);
-    return fields.values();
-  }
-  
-  @SuppressWarnings(value="unchecked")
-  private void setElement(Schema schema, Type element) {
-    if (!(element instanceof Class)) return;
-    Class<?> c = (Class<?>)element;
-    Union union = c.getAnnotation(Union.class);
-    if (union != null)                          // element is annotated union
-      schema.addProp(ELEMENT_PROP, c.getName());
-  }
-  
-  // construct a schema from a union annotation
-  private Schema getAnnotatedUnion(Union union, Map<String,Schema> names) {
-    List<Schema> branches = new ArrayList<Schema>();
-    for (Class branch : union.value())
-      branches.add(createSchema(branch, names));
-    return Schema.createUnion(branches);
-  }
-  
-  @Override
-  protected boolean isArray(Object datum) {
-    if (datum == null) return false;
-    return (datum instanceof Collection) || datum.getClass().isArray() ||
-        (datum instanceof scala.collection.Iterable);
-  }
-  
-  @Override
-  protected boolean isMap(Object datum) {
-    return (datum instanceof java.util.Map) || (datum instanceof scala.collection.Map);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/java/org/apache/scrunch/ScalaSafeReflectDatumReader.java
----------------------------------------------------------------------
diff --git a/scrunch/src/main/java/org/apache/scrunch/ScalaSafeReflectDatumReader.java b/scrunch/src/main/java/org/apache/scrunch/ScalaSafeReflectDatumReader.java
deleted file mode 100644
index 7e9f6bc..0000000
--- a/scrunch/src/main/java/org/apache/scrunch/ScalaSafeReflectDatumReader.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch;
-
-import java.io.IOException;
-import java.lang.reflect.Array;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.avro.Schema;
-import org.apache.avro.io.ResolvingDecoder;
-import org.apache.avro.reflect.ReflectDatumReader;
-
-import scala.collection.JavaConversions;
-
-/**
- *
- */
-public class ScalaSafeReflectDatumReader<T> extends ReflectDatumReader<T> {
-  
-  public ScalaSafeReflectDatumReader(Schema schema) {
-    super(schema, schema, ScalaSafeReflectData.get());
-  }
-  
-  @Override
-  protected Object readArray(Object old, Schema expected,
-      ResolvingDecoder in) throws IOException {
-    Schema expectedType = expected.getElementType();
-    long l = in.readArrayStart();
-    long base = 0;
-    if (l > 0) {
-      Object array = newArray(old, (int) l, expected);
-      do {
-        for (long i = 0; i < l; i++) {
-          addToArray(array, base + i, read(peekArray(array), expectedType, in));
-        }
-        base += l;
-      } while ((l = in.arrayNext()) > 0);
-      return scalaIterableCheck(array, expected);
-    } else {
-      return scalaIterableCheck(newArray(old, 0, expected), expected);
-    }
-  }
-  
-  @Override
-  protected Object readMap(Object old, Schema expected,
-      ResolvingDecoder in) throws IOException {
-    return scalaMapCheck(super.readMap(old, expected, in), expected);
-  }
-  
-  public static Object scalaMapCheck(Object map, Schema schema) {
-    Class mapClass = ScalaSafeReflectData.getClassProp(schema,
-        ScalaSafeReflectData.CLASS_PROP);
-    if (mapClass != null && mapClass.isAssignableFrom(scala.collection.Map.class)) {
-      return JavaConversions.mapAsScalaMap((Map) map);
-    }
-    return map;
-  }
-  
-  public static Object scalaIterableCheck(Object array, Schema schema) {
-    Class collectionClass = ScalaSafeReflectData.getClassProp(schema,
-        ScalaSafeReflectData.CLASS_PROP);
-    if (collectionClass != null) {
-      if (scala.collection.Iterable.class.isAssignableFrom(collectionClass)) {
-        scala.collection.Iterable it = toIter(array);
-        if (scala.collection.immutable.List.class.isAssignableFrom(collectionClass)) {
-          return it.toList();
-        } else if (scala.collection.mutable.Buffer.class.isAssignableFrom(collectionClass)) {
-          return it.toBuffer();
-        } else if (scala.collection.immutable.Set.class.isAssignableFrom(collectionClass)) {
-          return it.toSet();
-        }
-        return it;
-      }
-    }
-    return array;
-  }
-  
-  private static scala.collection.Iterable toIter(Object array) {
-    return JavaConversions.collectionAsScalaIterable((Collection) array);
-  }
-  
-  @Override
-  @SuppressWarnings(value="unchecked")
-  protected Object newArray(Object old, int size, Schema schema) {
-    ScalaSafeReflectData data = ScalaSafeReflectData.get();
-    Class collectionClass = ScalaSafeReflectData.getClassProp(schema,
-        ScalaSafeReflectData.CLASS_PROP);
-    if (collectionClass != null) {
-      if (old instanceof Collection) {
-        ((Collection)old).clear();
-        return old;
-      }
-      if (scala.collection.Iterable.class.isAssignableFrom(collectionClass) ||
-          collectionClass.isAssignableFrom(ArrayList.class)) {
-        return new ArrayList();
-      }
-      return data.newInstance(collectionClass, schema);
-    }
-    Class elementClass = ScalaSafeReflectData.getClassProp(schema,
-        ScalaSafeReflectData.ELEMENT_PROP);
-    if (elementClass == null)
-      elementClass = data.getClass(schema.getElementType());
-    return Array.newInstance(elementClass, size);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/java/org/apache/scrunch/ScalaSafeReflectDatumWriter.java
----------------------------------------------------------------------
diff --git a/scrunch/src/main/java/org/apache/scrunch/ScalaSafeReflectDatumWriter.java b/scrunch/src/main/java/org/apache/scrunch/ScalaSafeReflectDatumWriter.java
deleted file mode 100644
index 6903f9a..0000000
--- a/scrunch/src/main/java/org/apache/scrunch/ScalaSafeReflectDatumWriter.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch;
-
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.avro.reflect.ReflectDatumWriter;
-
-import scala.collection.JavaConversions;
-
-/**
- *
- */
-public class ScalaSafeReflectDatumWriter<T> extends ReflectDatumWriter<T> {
-  public ScalaSafeReflectDatumWriter() {
-    super(ScalaSafeReflectData.get());
-  }
-  
-  @Override
-  protected long getArraySize(Object array) {
-    if (array instanceof scala.collection.Iterable) {
-      return ((scala.collection.Iterable) array).size();
-    }
-    return super.getArraySize(array);
-  }
-
-  @Override
-  protected Iterator<Object> getArrayElements(Object array) {
-    if (array instanceof scala.collection.Iterable) {
-      return JavaConversions.asJavaIterable((scala.collection.Iterable) array).iterator(); 
-    }
-    return super.getArrayElements(array);
-  }
-
-  @Override
-  protected int getMapSize(Object map) {
-    if (map instanceof scala.collection.Map) {
-      return ((scala.collection.Map) map).size();
-    }
-    return super.getMapSize(map);
-  }
-
-  /** Called by the default implementation of {@link #writeMap} to enumerate
-   * map elements.  The default implementation is for {@link Map}.*/
-  @SuppressWarnings("unchecked")
-  protected Iterable<Map.Entry<Object,Object>> getMapEntries(Object map) {
-    if (map instanceof scala.collection.Map) {
-      return JavaConversions.mapAsJavaMap((scala.collection.Map) map).entrySet();
-    }
-    return super.getMapEntries(map);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/Conversions.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/Conversions.scala b/scrunch/src/main/scala/org/apache/scrunch/Conversions.scala
deleted file mode 100644
index a704b80..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/Conversions.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import org.apache.crunch.{PCollection => JCollection, PGroupedTable => JGroupedTable, PTable => JTable, DoFn, Emitter}
-import org.apache.crunch.{Pair => CPair}
-import org.apache.crunch.types.PType
-import java.nio.ByteBuffer
-import scala.collection.Iterable
-
-/**
- * Type class deciding what a parallel transform yields for output element type `El`.
- *
- * The resolved instance chooses the result type `To`: a plain PCollection by default,
- * or a PTable when `El` is a pair.
- */
-trait CanParallelTransform[El, To] {
-  def apply[A](
-      c: PCollectionLike[A, _, JCollection[A]],
-      fn: DoFn[A, El],
-      ptype: PType[El]): To
-}
-
-/**
- * Fallback CanParallelTransform instances. Because the CanParallelTransform companion
- * extends this trait, the instances declared directly in the companion win when both apply.
- */
-trait LowPriorityParallelTransforms {
-  /** Any element type yields a plain PCollection of that type. */
-  implicit def single[B] = new CanParallelTransform[B, PCollection[B]] {
-    def apply[A](c: PCollectionLike[A, _, JCollection[A]], fn: DoFn[A, B], ptype: PType[B]) =
-      c.parallelDo(fn, ptype)
-  }
-}
-
-object CanParallelTransform extends LowPriorityParallelTransforms {
-  /** Derives a table type from a tuple type by reusing its key and value subtypes. */
-  def tableType[K, V](ptype: PType[(K, V)]) = {
-    val subTypes = ptype.getSubTypes()
-    val keyType = subTypes.get(0).asInstanceOf[PType[K]]
-    val valueType = subTypes.get(1).asInstanceOf[PType[V]]
-    ptype.getFamily().tableOf(keyType, valueType)
-  }
-
-  /** Tuple-producing transforms yield a PTable; each tuple is converted to a Crunch Pair. */
-  implicit def keyvalue[K, V] = new CanParallelTransform[(K, V), PTable[K,V]] {
-    def apply[A](c: PCollectionLike[A, _, JCollection[A]], fn: DoFn[A, (K, V)], ptype: PType[(K, V)]) =
-      c.parallelDo(kvWrapFn(fn), tableType(ptype))
-  }
-
-  /** Adapts a tuple-emitting DoFn into one that emits Crunch Pairs; flushes pass straight through. */
-  def kvWrapFn[A, K, V](fn: DoFn[A, (K, V)]) = new DoFn[A, CPair[K, V]] {
-    override def process(input: A, emitFn: Emitter[CPair[K, V]]) {
-      val tupleEmitter = new Emitter[(K, V)] {
-        override def emit(kv: (K, V)) {
-          emitFn.emit(CPair.of(kv._1, kv._2))
-        }
-        override def flush() {
-          emitFn.flush()
-        }
-      }
-      fn.process(input, tupleEmitter)
-    }
-  }
-}
-
-/** Type class that produces the PType for `T` within a given PTypeFamily. */
-trait PTypeH[T] {
-  /** Returns the PType for `T` as defined by the family `ptf`. */
-  def get(ptf: PTypeFamily): PType[T]
-}
-
-object PTypeH {
-
-  // Instances for primitive and basic types, taken directly from the type family.
-  implicit val longs = new PTypeH[Long] { def get(ptf: PTypeFamily) = ptf.longs }
-  implicit val ints = new PTypeH[Int] { def get(ptf: PTypeFamily) = ptf.ints }
-  implicit val floats = new PTypeH[Float] { def get(ptf: PTypeFamily) = ptf.floats }
-  implicit val doubles = new PTypeH[Double] { def get(ptf: PTypeFamily) = ptf.doubles }
-  implicit val strings = new PTypeH[String] { def get(ptf: PTypeFamily) = ptf.strings }
-  implicit val booleans = new PTypeH[Boolean] { def get(ptf: PTypeFamily) = ptf.booleans }
-  implicit val bytes = new PTypeH[ByteBuffer] { def get(ptf: PTypeFamily) = ptf.bytes }
-
-  /** Iterables whose element type has a PTypeH. */
-  implicit def collections[T: PTypeH] = new PTypeH[Iterable[T]] {
-    def get(ptf: PTypeFamily) = ptf.collections(implicitly[PTypeH[T]].get(ptf))
-  }
-
-  /** Lists whose element type has a PTypeH. */
-  implicit def lists[T: PTypeH] = new PTypeH[List[T]] {
-    def get(ptf: PTypeFamily) = ptf.lists(implicitly[PTypeH[T]].get(ptf))
-  }
-
-  /** Sets whose element type has a PTypeH. */
-  implicit def sets[T: PTypeH] = new PTypeH[Set[T]] {
-    def get(ptf: PTypeFamily) = ptf.sets(implicitly[PTypeH[T]].get(ptf))
-  }
-
-  /** Pairs, built from the component PTypes in positional order. */
-  implicit def pairs[A: PTypeH, B: PTypeH] = new PTypeH[(A, B)] {
-    def get(ptf: PTypeFamily) = {
-      val first = implicitly[PTypeH[A]].get(ptf)
-      val second = implicitly[PTypeH[B]].get(ptf)
-      ptf.tuple2(first, second)
-    }
-  }
-
-  /** Triples, built from the component PTypes in positional order. */
-  implicit def trips[A: PTypeH, B: PTypeH, C: PTypeH] = new PTypeH[(A, B, C)] {
-    def get(ptf: PTypeFamily) = {
-      val first = implicitly[PTypeH[A]].get(ptf)
-      val second = implicitly[PTypeH[B]].get(ptf)
-      val third = implicitly[PTypeH[C]].get(ptf)
-      ptf.tuple3(first, second, third)
-    }
-  }
-
-  /** Quadruples, built from the component PTypes in positional order. */
-  implicit def quads[A: PTypeH, B: PTypeH, C: PTypeH, D: PTypeH] = new PTypeH[(A, B, C, D)] {
-    def get(ptf: PTypeFamily) = {
-      val first = implicitly[PTypeH[A]].get(ptf)
-      val second = implicitly[PTypeH[B]].get(ptf)
-      val third = implicitly[PTypeH[C]].get(ptf)
-      val fourth = implicitly[PTypeH[D]].get(ptf)
-      ptf.tuple4(first, second, third, fourth)
-    }
-  }
-
-  /** Any reference type with a ClassManifest, mapped through the family's record support. */
-  implicit def records[T <: AnyRef : ClassManifest] = new PTypeH[T] {
-    def get(ptf: PTypeFamily) = ptf.records(classManifest[T]).asInstanceOf[PType[T]]
-  }
-}
-
-/** Implicit conversions between Crunch's Java types and the Scrunch wrappers. */
-object Conversions {
-  /** Wraps a Java PTable in a Scrunch PTable. */
-  implicit def jtable2ptable[K, V](jtable: JTable[K, V]) = new PTable[K, V](jtable)
-
-  /** Wraps a Java PCollection in a Scrunch PCollection. */
-  implicit def jcollect2pcollect[S](jcollect: JCollection[S]) = new PCollection[S](jcollect)
-
-  /** Wraps a Java PGroupedTable in a Scrunch PGroupedTable. */
-  implicit def jgrouped2pgrouped[K, V](jgrouped: JGroupedTable[K, V]) =
-    new PGroupedTable[K, V](jgrouped)
-
-  /** Converts a Crunch Pair into a Scala tuple. */
-  implicit def pair2tuple[K, V](p: CPair[K, V]) = (p.first(), p.second())
-
-  /** Converts a Scala tuple into a Crunch Pair. */
-  implicit def tuple2pair[K, V](t: (K, V)) = CPair.of(t._1, t._2)
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/EmbeddedPipeline.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/EmbeddedPipeline.scala b/scrunch/src/main/scala/org/apache/scrunch/EmbeddedPipeline.scala
deleted file mode 100644
index 8d69701..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/EmbeddedPipeline.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import org.apache.hadoop.conf.Configuration
-
-/**
- * Mixin that makes a [[org.apache.scrunch.Pipeline]] available to the class it is mixed into.
- */
-trait EmbeddedPipeline {
-  /** The pipeline supplied by the concrete mixin. */
-  protected def pipeline: Pipeline
-}
-
-/**
- * Mixin that supplies a MapReduce pipeline built with a fresh Hadoop Configuration.
- *
- * The runtime class of the mixing object is passed to `Pipeline.mapReduce`
- * (presumably to locate the job jar; TODO confirm).
- */
-trait MREmbeddedPipeline extends EmbeddedPipeline with EmbeddedPipelineLike {
-  protected val pipeline: Pipeline =
-    Pipeline.mapReduce(ClassManifest.fromClass(getClass()).erasure, new Configuration())
-}
-
-/**
- * Mixin that supplies the in-memory pipeline from `Pipeline.inMemory`.
- */
-trait MemEmbeddedPipeline extends EmbeddedPipeline with EmbeddedPipelineLike {
-  protected val pipeline: Pipeline = Pipeline.inMemory
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/EmbeddedPipelineLike.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/EmbeddedPipelineLike.scala b/scrunch/src/main/scala/org/apache/scrunch/EmbeddedPipelineLike.scala
deleted file mode 100644
index 0fbd0ea..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/EmbeddedPipelineLike.scala
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import org.apache.crunch.Source
-import org.apache.crunch.TableSource
-import org.apache.crunch.Target
-
-/**
- * Convenience methods that forward reads, writes and execution to the `pipeline`
- * provided by the [[org.apache.scrunch.EmbeddedPipeline]] this trait is mixed with.
- * `load` is an alias for `read`, and `store` is an alias for `write`.
- */
-trait EmbeddedPipelineLike { self: EmbeddedPipeline =>
-  /**
-   * Reads a source into a [[org.apache.scrunch.PCollection]].
-   *
-   * @param source The source to read from.
-   * @tparam T The type of the values being read.
-   * @return A PCollection containing data read from the specified source.
-   */
-  def read[T](source: Source[T]): PCollection[T] = pipeline.read(source)
-
-  /**
-   * Reads a table source into a [[org.apache.scrunch.PTable]].
-   *
-   * @param source The source to read from.
-   * @tparam K The type of the keys being read.
-   * @tparam V The type of the values being read.
-   * @return A PTable containing data read from the specified source.
-   */
-  def read[K, V](source: TableSource[K, V]): PTable[K, V] = pipeline.read(source)
-
-  /**
-   * Reads a source into a [[org.apache.scrunch.PCollection]]; alias for `read`.
-   *
-   * @param source The source to read from.
-   * @tparam T The type of the values being read.
-   * @return A PCollection containing data read from the specified source.
-   */
-  def load[T](source: Source[T]): PCollection[T] = read(source)
-
-  /**
-   * Reads a table source into a [[org.apache.scrunch.PTable]]; alias for `read`.
-   *
-   * @param source The source to read from.
-   * @tparam K The type of the keys being read.
-   * @tparam V The type of the values being read.
-   * @return A PTable containing data read from the specified source.
-   */
-  def load[K, V](source: TableSource[K, V]): PTable[K, V] = read(source)
-
-  /**
-   * Writes a parallel collection to a target.
-   *
-   * @param collection The collection to write.
-   * @param target The destination target for this write.
-   */
-  def write(collection: PCollection[_], target: Target) {
-    pipeline.write(collection, target)
-  }
-
-  /**
-   * Writes a parallel table to a target.
-   *
-   * @param table The table to write.
-   * @param target The destination target for this write.
-   */
-  def write(table: PTable[_, _], target: Target) {
-    pipeline.write(table, target)
-  }
-
-  /**
-   * Writes a parallel collection to a target; alias for `write`.
-   *
-   * @param collection The collection to write.
-   * @param target The destination target for this write.
-   */
-  def store(collection: PCollection[_], target: Target) {
-    write(collection, target)
-  }
-
-  /**
-   * Writes a parallel table to a target; alias for `write`.
-   *
-   * @param table The table to write.
-   * @param target The destination target for this write.
-   */
-  def store(table: PTable[_, _], target: Target) {
-    write(table, target)
-  }
-
-  /**
-   * Runs the underlying pipeline so that pending data is written to the
-   * output targets (a series of MapReduce jobs for a MapReduce pipeline).
-   */
-  def run() {
-    pipeline.run
-  }
-
-  /**
-   * Run any remaining jobs required to generate outputs and then
-   * clean up any intermediate data files that were created in
-   * this run or previous calls to `run`.
-   */
-  def done() {
-    pipeline.done
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/IO.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/IO.scala b/scrunch/src/main/scala/org/apache/scrunch/IO.scala
deleted file mode 100644
index 9081d3f..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/IO.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import org.apache.crunch.io.{From => from, To => to, At => at}
-import org.apache.crunch.types.avro.AvroType
-import org.apache.hadoop.fs.Path;
-
-/** Factory methods for Crunch `Source`s, forwarding to `org.apache.crunch.io.From`. */
-trait From {
-  /** Avro file source at a string path. */
-  def avroFile[T](path: String, atype: AvroType[T]) = from.avroFile(path, atype)
-  /** Avro file source at a Hadoop path. */
-  def avroFile[T](path: Path, atype: AvroType[T]) = from.avroFile(path, atype)
-  /** Text file source at a string path. */
-  def textFile(path: String) = from.textFile(path)
-  /** Text file source at a Hadoop path. */
-  def textFile(path: Path) = from.textFile(path)
-}
-
-object From extends From
-
-/**
- * Factory methods for Crunch `Target`s, forwarding to `org.apache.crunch.io.To`.
- * The type parameter on `avroFile` is not used by the forwarded call.
- */
-trait To {
-  /** Avro file target at a string path. */
-  def avroFile[T](path: String) = to.avroFile(path)
-  /** Avro file target at a Hadoop path. */
-  def avroFile[T](path: Path) = to.avroFile(path)
-  /** Text file target at a string path. */
-  def textFile(path: String) = to.textFile(path)
-  /** Text file target at a Hadoop path. */
-  def textFile(path: Path) = to.textFile(path)
-}
-
-object To extends To
-
-/** Factory methods for Crunch `SourceTarget`s, forwarding to `org.apache.crunch.io.At`. */
-trait At {
-  /** Avro file source/target at a string path. */
-  def avroFile[T](path: String, atype: AvroType[T]) = at.avroFile(path, atype)
-  /** Avro file source/target at a Hadoop path. */
-  def avroFile[T](path: Path, atype: AvroType[T]) = at.avroFile(path, atype)
-  /** Text file source/target at a string path. */
-  def textFile(path: String) = at.textFile(path)
-  /** Text file source/target at a Hadoop path. */
-  def textFile(path: Path) = at.textFile(path)
-}
-
-object At extends At
-

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/Mem.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/Mem.scala b/scrunch/src/main/scala/org/apache/scrunch/Mem.scala
deleted file mode 100644
index 1c4f233..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/Mem.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import java.lang.{Iterable => JIterable}
-
-import scala.collection.JavaConversions._
-
-import org.apache.hadoop.conf.Configuration
-
-import org.apache.crunch.{Pair => P}
-import org.apache.crunch.{Source, TableSource, Target}
-import org.apache.crunch.impl.mem.MemPipeline
-import org.apache.scrunch.Conversions._
-
-/**
- * Object for working with in-memory PCollection and PTable instances.
- *
- * All collections built here are typed with the Avro type family (`Avros`).
- */
-object Mem extends MemEmbeddedPipeline with PipelineHelper {
-  private val ptf = Avros
-
-  /**
-   * Constructs a PCollection from the given elements.
-   *
-   * @param ts The elements to load.
-   * @param pt Supplies the PType of the elements.
-   * @return A PCollection containing the specified data.
-   */
-  def collectionOf[T](ts: T*)(implicit pt: PTypeH[T]): PCollection[T] = {
-    collectionOf(List(ts:_*))
-  }
-
-  /**
-   * Constructs a PCollection from an in-memory iterable.
-   *
-   * @param collect The data to load.
-   * @param pt Supplies the PType of the elements.
-   * @return A PCollection containing the specified data.
-   */
-  def collectionOf[T](collect: Iterable[T])(implicit pt: PTypeH[T]): PCollection[T] = {
-    val native = MemPipeline.typedCollectionOf(pt.get(ptf), asJavaIterable(collect))
-    new PCollection[T](native)
-  }
-
-  /**
-   * Constructs a PTable from the given key/value pairs.
-   *
-   * @param pairs The data to load.
-   * @param pk Supplies the PType of the keys.
-   * @param pv Supplies the PType of the values.
-   * @return A PTable containing the specified data.
-   */
-  def tableOf[K, V](pairs: (K, V)*)(implicit pk: PTypeH[K], pv: PTypeH[V]): PTable[K, V] = {
-    tableOf(List(pairs:_*))
-  }
-
-  /**
-   * Constructs a PTable from an in-memory iterable of key/value pairs.
-   *
-   * @param pairs The data to load; each tuple becomes a Crunch Pair.
-   * @param pk Supplies the PType of the keys.
-   * @param pv Supplies the PType of the values.
-   * @return A PTable containing the specified data.
-   */
-  def tableOf[K, V](pairs: Iterable[(K, V)])(implicit pk: PTypeH[K], pv: PTypeH[V]): PTable[K, V] = {
-    val cpairs = pairs.map(kv => P.of(kv._1, kv._2))
-    val ptype = ptf.tableOf(pk.get(ptf), pv.get(ptf))
-    new PTable[K, V](MemPipeline.typedTableOf(ptype, asJavaIterable(cpairs)))
-  }
-
-  /** Contains factory methods used to create `Source`s. */
-  val from = From
-
-  /** Contains factory methods used to create `Target`s. */
-  val to = To
-
-  /** Contains factory methods used to create `SourceTarget`s. */
-  val at = At
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/PCollection.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PCollection.scala b/scrunch/src/main/scala/org/apache/scrunch/PCollection.scala
deleted file mode 100644
index 0924587..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/PCollection.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import scala.collection.JavaConversions
-
-import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn}
-import org.apache.crunch.{PCollection => JCollection, PTable => JTable, Pair => CPair, Target}
-import org.apache.crunch.lib.{Aggregate, Cartesian}
-import org.apache.scrunch.Conversions._
-import org.apache.scrunch.interpreter.InterpreterRunner
-
-/**
- * Scala wrapper around a Crunch Java PCollection.
- *
- * @param native The wrapped Java collection.
- * @tparam S The element type.
- */
-class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCollection[S], JCollection[S]] {
-  import PCollection._
-
-  /** Keeps the elements for which `f` is true, preserving the element PType. */
-  def filter(f: S => Boolean): PCollection[S] =
-    parallelDo(filterFn[S](f), native.getPType())
-
-  /** Maps each element; the implicit CanParallelTransform chooses the result type. */
-  def map[T, To](f: S => T)(implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
-    val fn = mapFn(f)
-    b(this, fn, pt.get(getTypeFamily()))
-  }
-
-  /** Maps each element to zero or more outputs; the result type is chosen as in `map`. */
-  def flatMap[T, To](f: S => Traversable[T])
-      (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
-    val fn = flatMapFn(f)
-    b(this, fn, pt.get(getTypeFamily()))
-  }
-
-  /** Unions this collection with `others`. */
-  def union(others: PCollection[S]*) = {
-    val nativeOthers = others.map(_.native)
-    new PCollection[S](native.union(nativeOthers : _*))
-  }
-
-  /** Keys every element by `f`, producing a table of (key, element). */
-  def by[K: PTypeH](f: S => K): PTable[K, S] = {
-    val family = getTypeFamily()
-    val keyType = implicitly[PTypeH[K]].get(family)
-    val ptype = family.tableOf(keyType, native.getPType())
-    parallelDo(mapKeyFn[S, K](f), ptype)
-  }
-
-  /** Keys every element by `f` and groups by that key. */
-  def groupBy[K: PTypeH](f: S => K): PGroupedTable[K, S] = by(f).groupByKey
-
-  /** Cartesian product of this collection with `other`, as Scala tuples. */
-  def cross[S2](other: PCollection[S2]): PCollection[(S, S2)] = {
-    val crossed = Cartesian.cross(this.native, other.native)
-    val toTuple = (pair: CPair[S, S2]) => (pair.first(), pair.second())
-    crossed.parallelDo(mapFn(toTuple), getTypeFamily().tuple2(pType, other.pType))
-  }
-
-  /** Registers REPL jars with the job configuration, then materializes as a Scala Iterable. */
-  def materialize() = {
-    InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration())
-    JavaConversions.iterableAsScalaIterable[S](native.materialize)
-  }
-
-  def wrap(newNative: AnyRef) = new PCollection[S](newNative.asInstanceOf[JCollection[S]])
-
-  /** Counts the occurrences of each distinct element, with Scala Long counts. */
-  def count() = {
-    val counts = new PTable[S, java.lang.Long](Aggregate.count(native))
-    counts.mapValues(_.longValue())
-  }
-
-  /** The maximum element, as computed by Crunch's Aggregate.max. */
-  def max() = wrap(Aggregate.max(native))
-
-  /** The PType of the wrapped collection. */
-  def pType = native.getPType()
-}
-
-/** DoFn backed by a Scala function; every value it returns for an input is emitted. */
-trait SDoFn[S, T] extends DoFn[S, T] with Function1[S, Traversable[T]] {
-  override def process(input: S, emitter: Emitter[T]) {
-    val outputs = apply(input)
-    outputs.foreach(v => emitter.emit(v))
-  }
-}
-
-/** FilterFn whose accept decision is this Scala predicate. */
-trait SFilterFn[T] extends FilterFn[T] with Function1[T, Boolean] {
-  override def accept(input: T) = this(input)
-}
-
-/** MapFn whose mapping is this Scala function. */
-trait SMapFn[S, T] extends MapFn[S, T] with Function1[S, T] {
-  override def map(input: S) = this(input)
-}
-
-/** MapFn pairing each input with the key this Scala function computes for it. */
-trait SMapKeyFn[S, K] extends MapFn[S, CPair[K, S]] with Function1[S, K] {
-  override def map(input: S): CPair[K, S] = CPair.of(this(input), input)
-}
-
-/** Factories that adapt plain Scala functions into Crunch function objects. */
-object PCollection {
-  def filterFn[S](fn: S => Boolean) = new SFilterFn[S] {
-    def apply(elem: S) = fn(elem)
-  }
-
-  def mapKeyFn[S, K](fn: S => K) = new SMapKeyFn[S, K] {
-    def apply(elem: S) = fn(elem)
-  }
-
-  def mapFn[S, T](fn: S => T) = new SMapFn[S, T] {
-    def apply(elem: S) = fn(elem)
-  }
-
-  def flatMapFn[S, T](fn: S => Traversable[T]) = new SDoFn[S, T] {
-    def apply(elem: S) = fn(elem)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/PCollectionLike.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PCollectionLike.scala b/scrunch/src/main/scala/org/apache/scrunch/PCollectionLike.scala
deleted file mode 100644
index e912e60..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/PCollectionLike.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import org.apache.crunch.DoFn
-import org.apache.crunch.{PCollection => JCollection, Pair => JPair, Target}
-import org.apache.crunch.types.{PType, PTableType}
-
-/**
- * Operations shared by the Scrunch wrappers around Crunch collection types.
- *
- * @tparam S Element type of the wrapped collection.
- * @tparam FullType The Scrunch wrapper type produced by `wrap`.
- * @tparam NativeType The wrapped Crunch Java type.
- */
-trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] {
-  /** The wrapped Crunch collection. */
-  val native: NativeType
-
-  /** Rewraps a native object in this wrapper's type (implementations cast it). */
-  def wrap(newNative: AnyRef): FullType
-
-  /** Writes to `target` and wraps whatever the native write returns. */
-  def write(target: Target): FullType = {
-    val written = native.write(target)
-    wrap(written)
-  }
-
-  def parallelDo[T](fn: DoFn[S, T], ptype: PType[T]) =
-    new PCollection[T](native.parallelDo(fn, ptype))
-
-  def parallelDo[T](name: String, fn: DoFn[S,T], ptype: PType[T]) =
-    new PCollection[T](native.parallelDo(name, fn, ptype))
-
-  def parallelDo[K, V](fn: DoFn[S, JPair[K, V]], ptype: PTableType[K, V]) =
-    new PTable[K, V](native.parallelDo(fn, ptype))
-
-  def parallelDo[K, V](name: String, fn: DoFn[S, JPair[K, V]], ptype: PTableType[K, V]) =
-    new PTable[K, V](native.parallelDo(name, fn, ptype))
-
-  /** The type family used by Scrunch wrappers: always `Avros`. */
-  def getTypeFamily() = Avros
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/PGroupedTable.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PGroupedTable.scala b/scrunch/src/main/scala/org/apache/scrunch/PGroupedTable.scala
deleted file mode 100644
index f4500a5..0000000
--- a/scrunch/src/main/scala/org/apache/scrunch/PGroupedTable.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.scrunch
-
-import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn}
-import org.apache.crunch.{CombineFn, PGroupedTable => JGroupedTable, PTable => JTable, Pair => CPair}
-import java.lang.{Iterable => JIterable}
-import scala.collection.{Iterable, Iterator}
-import scala.collection.JavaConversions._
-import Conversions._
-
-/**
- * Scala wrapper around a Crunch Java PGroupedTable, whose elements pair a key with
- * the values grouped under it.
- */
-class PGroupedTable[K, V](val native: JGroupedTable[K, V])
-    extends PCollectionLike[CPair[K, JIterable[V]], PGroupedTable[K, V], JGroupedTable[K, V]] {
-  import PGroupedTable._
-
-  /** Keeps the groups for which `f(key, values)` is true. */
-  def filter(f: (K, Iterable[V]) => Boolean) =
-    parallelDo(filterFn[K, V](f), native.getPType())
-
-  /** Maps each group; the implicit CanParallelTransform chooses the result type. */
-  def map[T, To](f: (K, Iterable[V]) => T)
-      (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
-    val fn = mapFn(f)
-    b(this, fn, pt.get(getTypeFamily()))
-  }
-
-  /** Maps each group to zero or more outputs; the result type is chosen as in `map`. */
-  def flatMap[T, To](f: (K, Iterable[V]) => Traversable[T])
-      (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
-    val fn = flatMapFn(f)
-    b(this, fn, pt.get(getTypeFamily()))
-  }
-
-  /** Combines each group's values with `f`. */
-  def combine(f: Iterable[V] => V) = {
-    val combiner = new IterableCombineFn[K, V](f)
-    combineValues(combiner)
-  }
-
-  def combineValues(fn: CombineFn[K, V]) = new PTable[K, V](native.combineValues(fn))
-
-  def ungroup() = new PTable[K, V](native.ungroup())
-
-  def wrap(newNative: AnyRef): PGroupedTable[K, V] =
-    new PGroupedTable[K, V](newNative.asInstanceOf[JGroupedTable[K, V]])
-}
-
-/** CombineFn that emits (key, f(values)) for each grouped input pair. */
-class IterableCombineFn[K, V](f: Iterable[V] => V) extends CombineFn[K, V] {
-  override def process(input: CPair[K, JIterable[V]], emitfn: Emitter[CPair[K, V]]) = {
-    val key = input.first()
-    val combined = f(iterableAsScalaIterable[V](input.second()))
-    emitfn.emit(CPair.of(key, combined))
-  }
-}
-
-/** FilterFn over grouped pairs that delegates to this Scala (key, values) predicate. */
-trait SFilterGroupedFn[K, V] extends FilterFn[CPair[K, JIterable[V]]] with Function2[K, Iterable[V], Boolean] {
-  override def accept(input: CPair[K, JIterable[V]]) = {
-    val key = input.first()
-    this(key, iterableAsScalaIterable[V](input.second()))
-  }
-}
-
-/** DoFn over grouped pairs; every value this Scala function returns is emitted. */
-trait SDoGroupedFn[K, V, T] extends DoFn[CPair[K, JIterable[V]], T] with Function2[K, Iterable[V], Traversable[T]] {
-  override def process(input: CPair[K, JIterable[V]], emitter: Emitter[T]) {
-    val outputs = apply(input.first(), iterableAsScalaIterable[V](input.second()))
-    outputs.foreach(v => emitter.emit(v))
-  }
-}
-
-/** MapFn over grouped pairs whose mapping is this Scala (key, values) function. */
-trait SMapGroupedFn[K, V, T] extends MapFn[CPair[K, JIterable[V]], T] with Function2[K, Iterable[V], T] {
-  override def map(input: CPair[K, JIterable[V]]) =
-    this(input.first(), iterableAsScalaIterable[V](input.second()))
-}
-
-/** Factories adapting Scala (key, values) functions into Crunch function objects. */
-object PGroupedTable {
-  def filterFn[K, V](fn: (K, Iterable[V]) => Boolean) = new SFilterGroupedFn[K, V] {
-    def apply(key: K, values: Iterable[V]) = fn(key, values)
-  }
-
-  def mapFn[K, V, T](fn: (K, Iterable[V]) => T) = new SMapGroupedFn[K, V, T] {
-    def apply(key: K, values: Iterable[V]) = fn(key, values)
-  }
-
-  def flatMapFn[K, V, T](fn: (K, Iterable[V]) => Traversable[T]) = new SDoGroupedFn[K, V, T] {
-    def apply(key: K, values: Iterable[V]) = fn(key, values)
-  }
-}


Mime
View raw message