incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [5/19] CRUNCH-17: Move Scrunch tests to the integration tests directory.
Date Sat, 14 Jul 2012 17:28:53 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/59194a6f/scrunch/src/it/resources/urls.txt
----------------------------------------------------------------------
diff --git a/scrunch/src/it/resources/urls.txt b/scrunch/src/it/resources/urls.txt
new file mode 100644
index 0000000..827e711
--- /dev/null
+++ b/scrunch/src/it/resources/urls.txt
@@ -0,0 +1,11 @@
+www.A.com	www.B.com
+www.A.com	www.C.com
+www.A.com	www.D.com
+www.A.com	www.E.com
+www.B.com	www.D.com
+www.B.com	www.E.com
+www.C.com	www.D.com
+www.D.com	www.B.com
+www.E.com	www.A.com
+www.F.com	www.B.com
+www.F.com	www.C.com

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/59194a6f/scrunch/src/it/scala/org/apache/scrunch/CogroupTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/it/scala/org/apache/scrunch/CogroupTest.scala b/scrunch/src/it/scala/org/apache/scrunch/CogroupTest.scala
new file mode 100644
index 0000000..b0e94a1
--- /dev/null
+++ b/scrunch/src/it/scala/org/apache/scrunch/CogroupTest.scala
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+import org.apache.crunch.io.{From => from}
+import org.apache.crunch.test.FileHelper
+
+import org.scalatest.junit.JUnitSuite
+import _root_.org.junit.Test
+
+class CogroupTest extends JUnitSuite {
+  val pipeline = Pipeline.mapReduce[CogroupTest]
+
+  def wordCount(fileName: String) = {
+    pipeline.read(from.textFile(fileName))
+        .flatMap(_.toLowerCase.split("\\W+")).count
+  }
+
+  @Test def cogroup {
+    val shakespeare = FileHelper.createTempCopyOf("shakes.txt")
+    val maugham = FileHelper.createTempCopyOf("maugham.txt")
+    val diffs = wordCount(shakespeare).cogroup(wordCount(maugham))
+        .map((k, v) => (k, (v._1.sum - v._2.sum))).materialize
+    assert(diffs.exists(_ == ("the", -11390)))
+    pipeline.done
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/59194a6f/scrunch/src/it/scala/org/apache/scrunch/JoinTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/it/scala/org/apache/scrunch/JoinTest.scala b/scrunch/src/it/scala/org/apache/scrunch/JoinTest.scala
new file mode 100644
index 0000000..5303c03
--- /dev/null
+++ b/scrunch/src/it/scala/org/apache/scrunch/JoinTest.scala
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+import org.apache.crunch.io.{From => from, To => to}
+import org.apache.crunch.test.FileHelper
+
+import org.scalatest.junit.JUnitSuite
+import _root_.org.junit.Test
+
+class JoinTest extends JUnitSuite {
+  val pipeline = Pipeline.mapReduce[JoinTest] // parameterized with this suite's own class, like the sibling suites
+
+  def wordCount(fileName: String) = { // lower-cases each line, splits on non-word characters, then counts
+    pipeline.read(from.textFile(fileName))
+        .flatMap(_.toLowerCase.split("\\W+")).count
+  }
+
+  @Test def join {
+    val shakespeare = FileHelper.createTempCopyOf("shakes.txt")
+    val maugham = FileHelper.createTempCopyOf("maugham.txt")
+    val output = FileHelper.createOutputPath()
+    output.deleteOnExit()
+    val filtered = wordCount(shakespeare).join(wordCount(maugham))
+        .map((k, v) => (k, v._1 - v._2)) // per-word difference between the two joined counts
+        .write(to.textFile(output.getAbsolutePath()))
+        .filter((k, d) => d > 0).materialize // keep only the positive differences
+    assert(filtered.exists(_ == ("macbeth", 66)))
+    pipeline.done
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/59194a6f/scrunch/src/it/scala/org/apache/scrunch/PageRankClassTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/it/scala/org/apache/scrunch/PageRankClassTest.scala b/scrunch/src/it/scala/org/apache/scrunch/PageRankClassTest.scala
new file mode 100644
index 0000000..9ab7897
--- /dev/null
+++ b/scrunch/src/it/scala/org/apache/scrunch/PageRankClassTest.scala
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+import Avros._
+
+import org.apache.crunch.{DoFn, Emitter, Pair => P}
+import org.apache.crunch.io.{From => from}
+import org.apache.crunch.test.FileHelper
+
+import scala.collection.mutable.HashMap
+
+import org.scalatest.junit.JUnitSuite
+import _root_.org.junit.Assert._
+import _root_.org.junit.Test
+
+case class PageRankData(pr: Float, oldpr: Float, urls: Array[String]) {
+  def this() = this(0f, 0f, null)
+
+  def scaledPageRank = pr / urls.length
+
+  def next(newPageRank: Float) = new PageRankData(newPageRank, pr, urls)
+
+  def delta = math.abs(pr - oldpr)
+}
+
+class CachingPageRankClassFn extends DoFn[P[String, PageRankData], P[String, Float]] {
+  val cache = new HashMap[String, Float] {
+    override def default(key: String) = 0f
+  }
+
+  override def process(input: P[String, PageRankData], emitFn: Emitter[P[String, Float]]) {
+    val prd = input.second()
+    if (prd.urls.length > 0) {
+      val newpr = prd.pr / prd.urls.length
+      prd.urls.foreach(url => cache.put(url, cache(url) + newpr))
+      if (cache.size > 5000) {
+        cleanup(emitFn)
+      }
+    }
+  }
+
+  override def cleanup(emitFn: Emitter[P[String, Float]]) {
+    cache.foreach(kv => emitFn.emit(P.of(kv._1, kv._2)))
+    cache.clear
+  }
+}
+
+class PageRankClassTest extends JUnitSuite {
+  val pipeline = Pipeline.mapReduce[PageRankClassTest] // parameterized with this suite's own class, like the sibling suites
+
+  def initialInput(fileName: String) = { // one (url, PageRankData) entry per source URL, starting at rank 1
+    pipeline.read(from.textFile(fileName))
+      .map(line => { val urls = line.split("\\t"); (urls(0), urls(1)) })
+      .groupByKey
+      .map((url, links) => (url, PageRankData(1f, 0f, links.filter(x => x != null).toArray)))
+  }
+
+  def update(prev: PTable[String, PageRankData], d: Float) = {
+    val outbound = prev.flatMap((url, prd) => {
+      prd.urls.map(link => (link, prd.scaledPageRank))
+    })
+    cg(prev, outbound, d)
+  }
+
+  def cg(prev: PTable[String, PageRankData],
+         out: PTable[String, Float], d: Float) = {
+    prev.cogroup(out).map((url, v) => {
+      val (p, o) = v
+      val prd = p.head
+      (url, prd.next((1 - d) + d * o.sum)) // new rank = (1 - d) + d * sum of inbound contributions
+    })
+  }
+
+  def fastUpdate(prev: PTable[String, PageRankData], d: Float) = {
+    val outbound = prev.parallelDo(new CachingPageRankClassFn(), tableOf(strings, floats))
+    cg(prev, outbound, d)
+  }
+
+  @Test def testPageRank {
+    pipeline.getConfiguration.set("crunch.debug", "true")
+    var prev = initialInput(FileHelper.createTempCopyOf("urls.txt"))
+    var delta = 1.0f
+    while (delta > 0.01f) { // iterate until the largest per-page rank change is at most 0.01
+      prev = update(prev, 0.5f)
+      delta = prev.values.map(_.delta).max.materialize.head
+    }
+    assertEquals(0.0048, delta, 0.001)
+    pipeline.done
+  }
+
+  @Test def testFastPageRank {
+    pipeline.getConfiguration.set("crunch.debug", "true")
+    var prev = initialInput(FileHelper.createTempCopyOf("urls.txt"))
+    var delta = 1.0f
+    while (delta > 0.01f) { // same convergence loop, driven by the caching DoFn instead of flatMap
+      prev = fastUpdate(prev, 0.5f)
+      delta = prev.values.map(_.delta).max.materialize.head
+    }
+    assertEquals(0.0048, delta, 0.001)
+    pipeline.done
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/59194a6f/scrunch/src/it/scala/org/apache/scrunch/PageRankTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/it/scala/org/apache/scrunch/PageRankTest.scala b/scrunch/src/it/scala/org/apache/scrunch/PageRankTest.scala
new file mode 100644
index 0000000..cbf7ebf
--- /dev/null
+++ b/scrunch/src/it/scala/org/apache/scrunch/PageRankTest.scala
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+import Avros._
+
+import org.apache.crunch.{DoFn, Emitter, Pair => P}
+import org.apache.crunch.io.{From => from}
+import org.apache.crunch.test.FileHelper
+
+import scala.collection.mutable.HashMap
+
+import org.scalatest.junit.JUnitSuite
+import _root_.org.junit.Assert._
+import _root_.org.junit.Test
+
+class CachingPageRankFn extends DoFn[P[String, (Float, Float, List[String])], P[String, Float]] {
+  val cache = new HashMap[String, Float] {
+    override def default(key: String) = 0f
+  }
+
+  override def process(input: P[String, (Float, Float, List[String])], emitFn: Emitter[P[String, Float]]) {
+    val (pr, oldpr, urls) = input.second()
+    val newpr = pr / urls.size
+    urls.foreach(url => cache.put(url, cache(url) + newpr))
+    if (cache.size > 5000) {
+      cleanup(emitFn)
+    }
+  }
+
+  override def cleanup(emitFn: Emitter[P[String, Float]]) {
+    cache.foreach(kv => emitFn.emit(P.of(kv._1, kv._2)))
+    cache.clear
+  }
+}
+
+class PageRankTest extends JUnitSuite {
+  val pipeline = Pipeline.mapReduce[PageRankTest]
+
+  def initialInput(fileName: String) = {
+    pipeline.read(from.textFile(fileName))
+      .map(line => { val urls = line.split("\\t"); (urls(0), urls(1)) })
+      .groupByKey
+      .map((url, links) => (url, (1f, 0f, links.toList)))
+  }
+
+  def update(prev: PTable[String, (Float, Float, List[String])], d: Float) = {
+    val outbound = prev.flatMap((url, v) => {
+      val (pr, oldpr, links) = v
+      links.map(link => (link, pr / links.size))
+    })
+    cg(prev, outbound, d)
+  }
+
+  def cg(prev: PTable[String, (Float, Float, List[String])],
+         out: PTable[String, Float], d: Float) = {
+    prev.cogroup(out).map((url, v) => {
+      val (p, o) = v
+      val (pr, oldpr, links) = p.head
+      (url, ((1 - d) + d * o.sum, pr, links))
+    })
+  }
+
+  def fastUpdate(prev: PTable[String, (Float, Float, List[String])], d: Float) = {
+    val outbound = prev.parallelDo(new CachingPageRankFn(), tableOf(strings, floats))
+    cg(prev, outbound, d)
+  }
+
+  @Test def testPageRank {
+    var prev = initialInput(FileHelper.createTempCopyOf("urls.txt"))
+    var delta = 1.0f
+    while (delta > 0.01f) {
+      prev = update(prev, 0.5f)
+      delta = prev.map((k, v) => math.abs(v._1 - v._2)).max.materialize.head
+    }
+    assertEquals(0.0048, delta, 0.001)
+    pipeline.done
+  }
+
+  @Test def testFastPageRank {
+    var prev = initialInput(FileHelper.createTempCopyOf("urls.txt"))
+    var delta = 1.0f
+    while (delta > 0.01f) {
+      prev = fastUpdate(prev, 0.5f)
+      delta = prev.map((k, v) => math.abs(v._1 - v._2)).max.materialize.head
+    }
+    assertEquals(0.0048, delta, 0.001)
+    pipeline.done
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/59194a6f/scrunch/src/it/scala/org/apache/scrunch/PipelineAppTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/it/scala/org/apache/scrunch/PipelineAppTest.scala b/scrunch/src/it/scala/org/apache/scrunch/PipelineAppTest.scala
new file mode 100644
index 0000000..7111c7b
--- /dev/null
+++ b/scrunch/src/it/scala/org/apache/scrunch/PipelineAppTest.scala
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.crunch.test.FileHelper
+import org.apache.scrunch.PipelineApp
+
+import org.scalatest.junit.JUnitSuite
+import _root_.org.junit.Test
+
+object WordCount extends PipelineApp {
+
+  def wordSplit(line: String) = line.split("\\W+").filter(!_.isEmpty())
+
+  def countWords(filename: String) = {
+    val lines = read(from.textFile(filename))
+    val words = lines.flatMap(wordSplit)
+    words.count
+  }
+
+  val w1 = countWords(args(0))
+  val w2 = countWords(args(1))
+  cogroup(w1, w2).write(to.textFile(args(2)))
+}
+
+class PipelineAppTest extends JUnitSuite {
+  @Test def run {
+    val args = new Array[String](3)
+    args(0) = FileHelper.createTempCopyOf("shakes.txt")
+    args(1) = FileHelper.createTempCopyOf("maugham.txt")
+    args(2) = FileHelper.createOutputPath.getAbsolutePath
+    WordCount.main(args)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/59194a6f/scrunch/src/it/scala/org/apache/scrunch/TopTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/it/scala/org/apache/scrunch/TopTest.scala b/scrunch/src/it/scala/org/apache/scrunch/TopTest.scala
new file mode 100644
index 0000000..bac27bd
--- /dev/null
+++ b/scrunch/src/it/scala/org/apache/scrunch/TopTest.scala
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+import org.apache.crunch.io.{From => from, To => to}
+import org.apache.crunch.test.FileHelper
+
+import org.scalatest.junit.JUnitSuite
+import _root_.org.junit.Test
+
+class TopTest extends JUnitSuite {
+
+  @Test def topInMem {
+    val ptable = Mem.tableOf(("foo", 17), ("bar", 29), ("baz", 1729))
+    assert(ptable.top(1, true).materialize.head == ("baz", 1729))
+  }
+
+  @Test def top2 {
+    val pipeline = Pipeline.mapReduce[TopTest]
+    val input = FileHelper.createTempCopyOf("shakes.txt")
+
+    val wc = pipeline.read(from.textFile(input))
+        .flatMap(_.toLowerCase.split("\\s+"))
+        .filter(!_.isEmpty()).count
+    assert(wc.top(10, true).materialize.exists(_ == ("is", 205)))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/59194a6f/scrunch/src/it/scala/org/apache/scrunch/UnionTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/it/scala/org/apache/scrunch/UnionTest.scala b/scrunch/src/it/scala/org/apache/scrunch/UnionTest.scala
new file mode 100644
index 0000000..63fecdb
--- /dev/null
+++ b/scrunch/src/it/scala/org/apache/scrunch/UnionTest.scala
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+import org.apache.crunch.io.{From => from}
+import org.apache.crunch.test.FileHelper
+
+import org.scalatest.junit.JUnitSuite
+import _root_.org.junit.Test
+
+class UnionTest extends JUnitSuite {
+  val pipeline = Pipeline.mapReduce[UnionTest]
+  val shakespeare = FileHelper.createTempCopyOf("shakes.txt")
+  val maugham = FileHelper.createTempCopyOf("maugham.txt")
+
+  def wordCount(col: PCollection[String]) = {
+    col.flatMap(_.toLowerCase.split("\\W+")).count
+  }
+
+  @Test def testUnionCollection {
+    val union = pipeline.read(from.textFile(shakespeare)).union(
+        pipeline.read(from.textFile(maugham)))
+    val wc = wordCount(union).materialize
+    assert(wc.exists(_ == ("you", 3691)))
+    pipeline.done
+  }
+
+  @Test def testUnionTable {
+    val wcs = wordCount(pipeline.read(from.textFile(shakespeare)))
+    val wcm = wordCount(pipeline.read(from.textFile(maugham)))
+    val wc = wcs.union(wcm).groupByKey.combine(v => v.sum).materialize
+    assert(wc.exists(_ == ("you", 3691)))
+    pipeline.done
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/59194a6f/scrunch/src/it/scala/org/apache/scrunch/WordCountTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/it/scala/org/apache/scrunch/WordCountTest.scala b/scrunch/src/it/scala/org/apache/scrunch/WordCountTest.scala
new file mode 100644
index 0000000..e97a1fd
--- /dev/null
+++ b/scrunch/src/it/scala/org/apache/scrunch/WordCountTest.scala
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+import org.apache.crunch.io.{From => from, To => to}
+import org.apache.crunch.test.FileHelper
+
+import java.io.File
+
+import org.scalatest.junit.JUnitSuite
+import _root_.org.junit.Test
+
+class WordCountTest extends JUnitSuite {
+  @Test def wordCount {
+    val pipeline = Pipeline.mapReduce[WordCountTest]
+    val input = FileHelper.createTempCopyOf("shakes.txt")
+    val wordCountOut = FileHelper.createOutputPath
+
+    val fcc = pipeline.read(from.textFile(input))
+        .flatMap(_.toLowerCase.split("\\s+"))
+        .filter(!_.isEmpty()).count
+        .write(to.textFile(wordCountOut.getAbsolutePath)) // Word counts
+        .map((w, c) => (w.slice(0, 1), c))
+        .groupByKey.combine(v => v.sum).materialize
+    assert(fcc.exists(_ == ("w", 1404)))
+
+    pipeline.done
+    wordCountOut.delete()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/59194a6f/scrunch/src/it/scala/org/apache/scrunch/interpreter/InterpreterJarTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/it/scala/org/apache/scrunch/interpreter/InterpreterJarTest.scala b/scrunch/src/it/scala/org/apache/scrunch/interpreter/InterpreterJarTest.scala
new file mode 100644
index 0000000..5d38027
--- /dev/null
+++ b/scrunch/src/it/scala/org/apache/scrunch/interpreter/InterpreterJarTest.scala
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch.interpreter
+
+import java.io.File
+import java.io.FileOutputStream
+import java.util.jar.JarFile
+import java.util.jar.JarOutputStream
+
+import scala.tools.nsc.io.VirtualDirectory
+
+import com.google.common.io.Files
+import org.junit.Assert.assertNotNull
+import org.junit.Test
+import org.scalatest.junit.JUnitSuite
+
+/**
+ * Tests creating jars from a {@link scala.tools.nsc.io.VirtualDirectory}.
+ */
+class InterpreterJarTest extends JUnitSuite {
+
+  /**
+   * Tests transforming a virtual directory into a temporary jar file.
+   */
+  @Test def virtualDirToJar: Unit = {
+    // Create a virtual directory and populate with some mock content.
+    val root = new VirtualDirectory("testDir", None)
+    // Add some subdirectories to the root.
+    (1 to 10).foreach { i =>
+      val subdir = root.subdirectoryNamed("subdir" + i).asInstanceOf[VirtualDirectory]
+      // Add some classfiles to each sub directory.
+      (1 to 10).foreach { j =>
+        subdir.fileNamed("MyClass" + j + ".class")
+      }
+    }
+
+    // Now generate a jar file from the virtual directory.
+    val tempDir = Files.createTempDir()
+    tempDir.deleteOnExit()
+    val tempJar = new File(tempDir, "replJar.jar")
+    val jarStream = new JarOutputStream(new FileOutputStream(tempJar))
+    InterpreterRunner.addVirtualDirectoryToJar(root, "top/pack/name/", jarStream)
+    jarStream.close()
+
+    // Verify the contents of the jar.
+    val jarFile = new JarFile(tempJar)
+    (1 to 10).foreach { i =>
+      (1 to 10).foreach { j =>
+        val entryName = "top/pack/name/subdir" + i + "/MyClass" + j + ".class"
+        val entry = jarFile.getEntry(entryName)
+        assertNotNull("Jar entry " + entryName + " not found in generated jar.", entry)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/59194a6f/scrunch/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/scrunch/src/test/resources/log4j.properties b/scrunch/src/test/resources/log4j.properties
deleted file mode 100644
index 985a2eb..0000000
--- a/scrunch/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,13 +0,0 @@
-# ***** Set root logger level to INFO and its only appender to A.
-log4j.logger.org.apache.crunch=info, A
-log4j.logger.org.apache.scrunch=info, A
-
-# Log warnings on Hadoop for the local runner when testing
-log4j.logger.org.apache.hadoop=warn, A
-
-# ***** A is set to be a ConsoleAppender.
-log4j.appender.A=org.apache.log4j.ConsoleAppender
-# ***** A uses PatternLayout.
-log4j.appender.A.layout=org.apache.log4j.PatternLayout
-log4j.appender.A.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-


Mime
View raw message