incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [2/2] git commit: CRUNCH-73. PipelineApp does not extend DelayedInit so that Scala closures are properly serialized.
Date Sat, 22 Sep 2012 00:31:40 GMT
CRUNCH-73. PipelineApp does not extend DelayedInit so that Scala closures are properly serialized.

Signed-off-by: Josh Wills <jwills@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/4e0ead87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/4e0ead87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/4e0ead87

Branch: refs/heads/master
Commit: 4e0ead87d77be4991bc776b3938f27ed26bbf5e3
Parents: a65feb5
Author: Kiyan Ahmadizadeh <kiyan@wibidata.com>
Authored: Thu Sep 20 17:00:37 2012 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Fri Sep 21 15:45:12 2012 -0700

----------------------------------------------------------------------
 crunch-scrunch/src/it/resources/tens.txt           |  100 +++++++++++++++
 .../crunch/scrunch/PipelineAppClosureTest.scala    |   58 +++++++++
 .../apache/crunch/scrunch/PipelineAppTest.scala    |    8 +-
 .../src/main/examples/ClassyPageRank.scala         |   30 +++--
 crunch-scrunch/src/main/examples/PageRank.scala    |   26 ++--
 crunch-scrunch/src/main/examples/WordCount.scala   |    6 +-
 .../org/apache/crunch/scrunch/PipelineApp.scala    |   24 ++--
 7 files changed, 207 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4e0ead87/crunch-scrunch/src/it/resources/tens.txt
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/resources/tens.txt b/crunch-scrunch/src/it/resources/tens.txt
new file mode 100644
index 0000000..d3bfd02
--- /dev/null
+++ b/crunch-scrunch/src/it/resources/tens.txt
@@ -0,0 +1,100 @@
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10
+10

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4e0ead87/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PipelineAppClosureTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PipelineAppClosureTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PipelineAppClosureTest.scala
new file mode 100644
index 0000000..e0702e4
--- /dev/null
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PipelineAppClosureTest.scala
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.scrunch
+
+import org.apache.crunch.test.CrunchTestSupport
+
+import org.scalatest.junit.JUnitSuite
+import _root_.org.junit.Assert._
+import _root_.org.junit.Test
+
+/**
+ * Test that verifies that a Scala PipelineApp can properly send some side data as part of a
+ * function closure.
+ */
+class PipelineClosureAppTest extends CrunchTestSupport with JUnitSuite {
+
+  /**
+   * A simple pipeline application that divides each element of a PCollection of numbers by
+   * 10. The PCollection of numbers used as input is just the number 10 repeated 100 times.
+   * Thus the resulting PCollection should be the number 1 repeated 100 times.
+   */
+  object Divider extends PipelineApp {
+
+    /**
+     * Runs the Pipeline for this test and verifies it has the desired effect of transforming a
+     * PCollection of 10s into a PCollection of 1s.
+     */
+    override def run(args: Array[String]) {
+      val divisor = 10
+      val tens = read(from.textFile(args(0)))
+      val ones = tens.map { n => Integer.valueOf(n) / divisor }
+      val countOfOnes = ones.count().materializeToMap()
+      assertEquals(100, countOfOnes(1))
+    }
+  }
+
+  @Test def run {
+    val args = new Array[String](1)
+    args(0) = tempDir.copyResourceFileName("tens.txt")
+    tempDir.overridePathProperties(Divider.configuration)
+    Divider.main(args)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4e0ead87/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PipelineAppTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PipelineAppTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PipelineAppTest.scala
index 4947b40..db49c92 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PipelineAppTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PipelineAppTest.scala
@@ -32,9 +32,11 @@ object WordCount extends PipelineApp {
     words.count
   }
 
-  val w1 = countWords(args(0))
-  val w2 = countWords(args(1))
-  cogroup(w1, w2).write(to.textFile(args(2)))
+  override def run(args: Array[String]) {
+    val w1 = countWords(args(0))
+    val w2 = countWords(args(1))
+    cogroup(w1, w2).write(to.textFile(args(2)))
+  }
 }
 
 class PipelineAppTest extends CrunchTestSupport with JUnitSuite {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4e0ead87/crunch-scrunch/src/main/examples/ClassyPageRank.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/examples/ClassyPageRank.scala b/crunch-scrunch/src/main/examples/ClassyPageRank.scala
index 540aaca..1bc705d 100644
--- a/crunch-scrunch/src/main/examples/ClassyPageRank.scala
+++ b/crunch-scrunch/src/main/examples/ClassyPageRank.scala
@@ -22,9 +22,9 @@ case class UrlData(pageRank: Float, oldPageRank: Float, links: List[String]) {
   def this() = this(1.0f, 0.0f, Nil)
 
   def this(links: String*) = this(1.0f, 0.0f, List(links:_*))
- 
+
   def this(links: Iterable[String]) = this(1.0f, 0.0f, links.toList)
-  
+
   def delta = math.abs(pageRank - oldPageRank)
 
   def next(newPageRank: Float) = new UrlData(newPageRank, pageRank, links)
@@ -55,17 +55,19 @@ object ClassyPageRank extends PipelineApp {
     })
   }
 
-  var index = 0
-  var delta = 10.0f
-  fs.mkdirs("prank/")
-  var curr = initialize(args(0))
-  while (delta > 1.0f) {
-    index = index + 1
-    curr = update(curr, 0.5f)
-    write(curr, to.avroFile("prank/" + index))
-    delta = curr.values.map(_.delta).max.materialize.head
-    println("Current delta = " + delta)
+  override def run(args: Array[String]) {
+    var index = 0
+    var delta = 10.0f
+    fs.mkdirs("prank/")
+    var curr = initialize(args(0))
+    while (delta > 1.0f) {
+      index = index + 1
+      curr = update(curr, 0.5f)
+      write(curr, to.avroFile("prank/" + index))
+      delta = curr.values.map(_.delta).max.value()
+      println("Current delta = " + delta)
+    }
+    fs.rename("prank/" + index, args(1))
+    fs.delete("prank/", true)
   }
-  fs.rename("prank/" + index, args(1))
-  fs.delete("prank/", true)
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4e0ead87/crunch-scrunch/src/main/examples/PageRank.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/examples/PageRank.scala b/crunch-scrunch/src/main/examples/PageRank.scala
index e0ed3bc..5e08169 100644
--- a/crunch-scrunch/src/main/examples/PageRank.scala
+++ b/crunch-scrunch/src/main/examples/PageRank.scala
@@ -45,17 +45,19 @@ object PageRank extends PipelineApp {
     })
   }
 
-  var index = 0
-  var delta = 10.0f
-  fs.mkdirs("prank/")
-  var curr = initialize(args(0))
-  while (delta > 1.0f) {
-    index = index + 1
-    curr = update(curr, 0.5f)
-    write(curr, to.avroFile("prank/" + index))
-    delta = curr.values.map(v => math.abs(v._1 - v._2)).max.materialize.head
-    println("Current delta = " + delta)
+  override def run(args: Array[String]) {
+    var index = 0
+    var delta = 10.0f
+    fs.mkdirs("prank/")
+    var curr = initialize(args(0))
+    while (delta > 1.0f) {
+      index = index + 1
+      curr = update(curr, 0.5f)
+      write(curr, to.avroFile("prank/" + index))
+      delta = curr.values.map(v => math.abs(v._1 - v._2)).max.value()
+      println("Current delta = " + delta)
+    }
+    fs.rename("prank/" + index, args(1))
+    fs.delete("prank/", true)
   }
-  fs.rename("prank/" + index, args(1))
-  fs.delete("prank/", true)
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4e0ead87/crunch-scrunch/src/main/examples/WordCount.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/examples/WordCount.scala b/crunch-scrunch/src/main/examples/WordCount.scala
index 59cd1c5..18ee9d6 100644
--- a/crunch-scrunch/src/main/examples/WordCount.scala
+++ b/crunch-scrunch/src/main/examples/WordCount.scala
@@ -25,6 +25,8 @@ object WordCount extends PipelineApp {
       .count
   }
 
-  val counts = join(countWords(args(0)), countWords(args(1)))
-  write(counts, to.textFile(args(2)))
+  override def run(args: Array[String]) {
+    val counts = join(countWords(args(0)), countWords(args(1)))
+    write(counts, to.textFile(args(2)))
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4e0ead87/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineApp.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineApp.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineApp.scala
index 3a1a2bd..11395d3 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineApp.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineApp.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.util.GenericOptionsParser
 
 import org.apache.crunch.{Source, TableSource, Target}
 
-trait PipelineApp extends MREmbeddedPipeline with PipelineHelper with DelayedInit {
+trait PipelineApp extends MREmbeddedPipeline with PipelineHelper {
   implicit def _string2path(str: String) = new Path(str)
 
   /** Contains factory methods used to create `Source`s. */
@@ -39,26 +39,22 @@ trait PipelineApp extends MREmbeddedPipeline with PipelineHelper with DelayedIni
   /** Contains factory methods used to create `SourceTarget`s. */
   val at = At
 
-  private val initCode = new ListBuffer[() => Unit]
-
-  private var _args: Array[String] = _
-
-  /** Command-line arguments passed to this application. */
-  protected def args: Array[String] = _args
-
   def configuration: Configuration = pipeline.getConfiguration
 
   /** Gets the distributed filesystem associated with this application's configuration. */
   def fs: FileSystem = FileSystem.get(configuration)
 
-  override def delayedInit(body: => Unit) {
-    initCode += (() => body)
-  }
+  /**
+   * The entry-point for pipeline applications.  Clients should override this method to implement
+   * the logic of their pipeline application.
+   *
+   * @param args The command-line arguments passed to the pipeline application.
+   */
+  def run(args: Array[String]): Unit
 
-  def main(args: Array[String]) = {
+  final def main(args: Array[String]) = {
     val parser = new GenericOptionsParser(configuration, args)
-    _args = parser.getRemainingArgs()
-    for (proc <- initCode) proc()
+    run(parser.getRemainingArgs())
     done
   }
 }


Mime
View raw message