incubator-crunch-commits mailing list archives

From jwi...@apache.org
Subject [4/4] git commit: Slightly revised patch for CRUNCH-9, submitted by Kiyan Ahmadizadeh
Date Tue, 10 Jul 2012 22:21:22 GMT
Slightly revised patch for CRUNCH-9, submitted by Kiyan Ahmadizadeh


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/1ed57904
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/1ed57904
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/1ed57904

Branch: refs/heads/master
Commit: 1ed57904e095620aaee95fc9cb96773446f473e8
Parents: a3f8e4a
Author: jwills <jwills@apache.org>
Authored: Mon Jul 9 21:13:02 2012 -0700
Committer: jwills <jwills@apache.org>
Committed: Mon Jul 9 21:13:02 2012 -0700

----------------------------------------------------------------------
 .gitignore                                         |    3 +
 scrunch/examples/ClassyPageRank.scala              |   71 -----
 scrunch/examples/PageRank.scala                    |   61 -----
 scrunch/examples/WordCount.scala                   |   30 ---
 scrunch/pom.xml                                    |   47 +++-
 scrunch/scripts/scrunch.py                         |  118 ---------
 scrunch/src/main/assembly/release.xml              |   75 ++++++
 scrunch/src/main/conf/log4j.properties             |    8 +
 scrunch/src/main/examples/ClassyPageRank.scala     |   71 +++++
 scrunch/src/main/examples/PageRank.scala           |   61 +++++
 scrunch/src/main/examples/WordCount.scala          |   27 ++
 scrunch/src/main/scala/org/apache/scrunch/IO.scala |   13 +-
 .../scala/org/apache/scrunch/PCollection.scala     |   17 +-
 .../src/main/scala/org/apache/scrunch/PTable.scala |   12 +-
 .../main/scala/org/apache/scrunch/Pipeline.scala   |   28 ++-
 .../scala/org/apache/scrunch/PipelineLike.scala    |    6 +-
 .../scrunch/interpreter/InterpreterRunner.scala    |  203 +++++++++++++++
 scrunch/src/main/scripts/imports.scala             |   19 ++
 scrunch/src/main/scripts/scrunch                   |  163 ++++++++++++
 scrunch/src/main/scripts/scrunch-job.py            |  118 +++++++++
 scrunch/src/test/resources/log4j.properties        |   13 +
 .../scrunch/interpreter/InterpreterJarTest.scala   |   70 +++++
 .../apache/crunch/impl/mr/plan/JobPrototype.java   |   10 +-
 .../java/org/apache/crunch/util/DistCache.java     |   37 +++
 src/test/resources/log4j.properties                |    1 +
 25 files changed, 975 insertions(+), 307 deletions(-)
----------------------------------------------------------------------
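
For context: CRUNCH-9 adds an interactive Scala shell ("scrunch") that can submit MapReduce jobs. Classes compiled by the REPL live only in an in-memory virtual directory, so before a job runs they must be packed into a jar and shipped through the distributed cache. The core mechanism in miniature (names taken from the diff below; an illustrative sketch, not the committed code):

    // Pack the REPL's in-memory classes into a temporary jar, then register
    // that jar with the distributed cache of jobs run under `configuration`.
    val replJar: java.io.File = InterpreterRunner.createReplCodeJar()
    DistCache.addJarToDistributedCache(configuration, replJar)

The rest of the patch moves the Scrunch examples and launch scripts into the src/main layout so the assembly plugin can build a runnable release bundle.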


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1ed57904/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index d9d491e..d5149ce 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,6 @@
 .settings
 .cache
 target
+*.iml
+.idea
+

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1ed57904/scrunch/examples/ClassyPageRank.scala
----------------------------------------------------------------------
diff --git a/scrunch/examples/ClassyPageRank.scala b/scrunch/examples/ClassyPageRank.scala
deleted file mode 100644
index 6c819a5..0000000
--- a/scrunch/examples/ClassyPageRank.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.apache.scrunch._
-import org.apache.scrunch.Mem._
-
-case class UrlData(pageRank: Float, oldPageRank: Float, links: List[String]) {
-  def this() = this(1.0f, 0.0f, Nil)
-
-  def this(links: String*) = this(1.0f, 0.0f, List(links:_*))
- 
-  def this(links: Iterable[String]) = this(1.0f, 0.0f, links.toList)
-  
-  def delta = math.abs(pageRank - oldPageRank)
-
-  def next(newPageRank: Float) = new UrlData(newPageRank, pageRank, links)
-
-  def outboundScores = links.map(link => (link, pageRank / links.size))
-}
-
-object ClassyPageRank extends PipelineApp {
-
-  def initialize(file: String) = {
-    read(from.textFile(file))
-      .map(line => { val urls = line.split("\\s+"); (urls(0), urls(2)) })
-      .groupByKey
-      .map((url, links) => (url, new UrlData(links)))
-  }
-
-  def update(prev: PTable[String, UrlData], d: Float) = {
-    val outbound = prev.values.flatMap(_.outboundScores)
-
-    cogroup(prev, outbound).mapValues(data => {
-      val (prd, outboundScores) = data
-      val newPageRank = (1 - d) + d * outboundScores.sum
-      if (!prd.isEmpty) {
-        prd.head.next(newPageRank)
-      } else {
-        new UrlData(newPageRank, 0, Nil)
-      }
-    })
-  }
-
-  var index = 0
-  var delta = 10.0f
-  fs.mkdirs("prank/")
-  var curr = initialize(args(0))
-  while (delta > 1.0f) {
-    index = index + 1
-    curr = update(curr, 0.5f)
-    write(curr, to.avroFile("prank/" + index))
-    delta = curr.values.map(_.delta).max.materialize.head
-    println("Current delta = " + delta)
-  }
-  fs.rename("prank/" + index, args(1))
-  fs.delete("prank/", true)
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1ed57904/scrunch/examples/PageRank.scala
----------------------------------------------------------------------
diff --git a/scrunch/examples/PageRank.scala b/scrunch/examples/PageRank.scala
deleted file mode 100644
index 7de26e6..0000000
--- a/scrunch/examples/PageRank.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.apache.scrunch._
-
-object PageRank extends PipelineApp {
-  def initialize(file: String) = {
-    read(from.textFile(file))
-      .map(line => { val urls = line.split("\\s+"); (urls(0), urls(2)) })
-      .groupByKey
-      .map((url, links) => (url, (1f, 0f, links.toList)))
-  }
-
-  def update(prev: PTable[String, (Float, Float, List[String])], d: Float) = {
-    val outbound = prev.flatMap((url, data) => {
-      val (pagerank, old_pagerank, links) = data
-      links.map(link => (link, pagerank / links.size))
-    })
-
-    cogroup(prev, outbound).mapValues(data => {
-      val (prev_data, outbound_data) = data
-      val new_pagerank = (1 - d) + d * outbound_data.sum
-      var cur_pagerank = 0f
-      var links: List[String] = Nil
-      if (!prev_data.isEmpty) {
-        val (cur_pr, old_pr, l) = prev_data.head
-        cur_pagerank = cur_pr
-        links = l
-      }
-      (new_pagerank, cur_pagerank, links)
-    })
-  }
-
-  var index = 0
-  var delta = 10.0f
-  fs.mkdirs("prank/")
-  var curr = initialize(args(0))
-  while (delta > 1.0f) {
-    index = index + 1
-    curr = update(curr, 0.5f)
-    write(curr, to.avroFile("prank/" + index))
-    delta = curr.values.map(v => math.abs(v._1 - v._2)).max.materialize.head
-    println("Current delta = " + delta)
-  }
-  fs.rename("prank/" + index, args(1))
-  fs.delete("prank/", true)
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1ed57904/scrunch/examples/WordCount.scala
----------------------------------------------------------------------
diff --git a/scrunch/examples/WordCount.scala b/scrunch/examples/WordCount.scala
deleted file mode 100644
index 4c6055f..0000000
--- a/scrunch/examples/WordCount.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.apache.scrunch.PipelineApp
-
-object WordCount extends PipelineApp {
-
-  def countWords(file: String) = {
-    read(from.textFile(file))
-      .flatMap(_.split("\\W+").filter(!_.isEmpty()))
-      .count
-  }
-
-  val counts = join(countWords(args(0)), countWords(args(1)))
-  write(counts, to.textFile(args(2)))
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1ed57904/scrunch/pom.xml
----------------------------------------------------------------------
diff --git a/scrunch/pom.xml b/scrunch/pom.xml
index 430737e..2c70f73 100644
--- a/scrunch/pom.xml
+++ b/scrunch/pom.xml
@@ -89,6 +89,16 @@ under the License.
       <version>${scala.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-compiler</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>jline</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.crunch</groupId>
       <artifactId>crunch</artifactId>
       <version>0.3.0</version>
@@ -132,6 +142,11 @@ under the License.
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>1.6.1</version>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <version>4.10</version>
@@ -157,11 +172,33 @@ under the License.
     <plugins>
       <plugin>
         <artifactId>maven-assembly-plugin</artifactId>
-        <configuration>
-          <descriptorRefs>
-            <descriptorRef>jar-with-dependencies</descriptorRef>
-          </descriptorRefs>
-        </configuration>
+        <version>2.2.1</version>
+        <executions>
+          <execution>
+            <id>jar-with-dependencies</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+            <configuration>
+              <descriptorRefs>
+                <descriptorRef>jar-with-dependencies</descriptorRef>
+              </descriptorRefs>
+            </configuration>
+          </execution>
+          <execution>
+            <id>make-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+            <configuration>
+              <descriptors>
+                <descriptor>${basedir}/src/main/assembly/release.xml</descriptor>
+              </descriptors>
+            </configuration>
+          </execution>
+        </executions>
       </plugin>
       <plugin>
         <groupId>org.scala-tools</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1ed57904/scrunch/scripts/scrunch.py
----------------------------------------------------------------------
diff --git a/scrunch/scripts/scrunch.py b/scrunch/scripts/scrunch.py
deleted file mode 100755
index 92cf612..0000000
--- a/scrunch/scripts/scrunch.py
+++ /dev/null
@@ -1,118 +0,0 @@
-#!/usr/bin/python
-
-import glob
-import os
-import re
-import shutil
-import subprocess
-import sys
-
-# Configuration in script
-##############################################################
-if not "SCALA_HOME" in os.environ:
-  sys.stderr.write("Environment variable SCALA_HOME must be set\n")
-  sys.exit(1)
-SCALA_LIB = os.path.join(os.environ["SCALA_HOME"], "lib")
-
-if not "HADOOP_HOME" in os.environ:
-  sys.stderr.write("Environment variable HADOOP_HOME must be set\n")
-  sys.exit(1)
-HADOOP_HOME = os.environ["HADOOP_HOME"]
-HADOOP_JARS = ":".join(glob.glob(os.path.join(HADOOP_HOME, "*.jar")))
-
-#Get the absolute path of the original (non-symlink) file.
-if os.path.islink(__file__):
-  ORIGINAL_FILE = os.readlink(__file__)
-else:
-  ORIGINAL_FILE = __file__
-
-SCIENCE_ROOT = os.path.abspath(os.path.dirname(ORIGINAL_FILE)+"/../")
-JARFILE = SCIENCE_ROOT + "/target/scrunch-0.2.0-jar-with-dependencies.jar" #what jar has all the dependencies for this job
-TMPDIR = "/tmp"
-BUILDDIR = TMPDIR + "/script-build"
-COMPILE_CMD = "java -cp %s/scala-library.jar:%s/scala-compiler.jar -Dscala.home=%s scala.tools.nsc.Main" % (SCALA_LIB, SCALA_LIB, SCALA_LIB)
-##############################################################
-
-argv = sys.argv[1:]
-if len(argv) < 1:
-  sys.stderr.write("ERROR: insufficient args.\n")
-  sys.exit(1)
-
-JOBFILE = argv.pop(0)
-
-def file_type():
-  m = re.search(r'\.(scala|java)$', JOBFILE)
-  if m:
-    return m.group(1)
-  return None
-
-def is_file():
-  return file_type() is not None
-
-PACK_RE = r'package ([^;]+)'
-OBJECT_RE = r'object\s+([^\s(]+).*(extends|with)\s+PipelineApp.*'
-EXTENSION_RE = r'(.*)\.(scala|java)$'
-
-#Get the name of the job from the file.
-#the rule is: last class in the file, or the one that matches the filename
-def get_job_name(file):
-  package = ""
-  job = None
-  default = None
-  match = re.search(EXTENSION_RE, file)
-  if match:
-    default = match.group(1)
-    for s in open(file, "r"):
-      mp = re.search(PACK_RE, s)
-      mo = re.search(OBJECT_RE, s)
-      if mp:
-        package = mp.group(1).trim() + "."
-      elif mo:
-        if not job or not default or not job.tolower() == default.tolower():
-          #use either the last class, or the one with the same name as the file
-          job = mo.group(1)
-    if not job:
-      raise "Could not find job name"
-    return "%s%s" % (package, job)
-  else:
-    return file
-
-JARPATH = os.path.abspath(JARFILE)
-if not os.path.exists(JARPATH):
-  sys.stderr.write("Scrunch assembly jar not found; run mvn assembly:assembly to construct it.\n")
-  sys.exit(1)
-  
-JARBASE = os.path.basename(JARFILE)
-JOBPATH = os.path.abspath(JOBFILE)
-JOB = get_job_name(JOBFILE)
-JOBJAR = JOB + ".jar"
-JOBJARPATH = os.path.join(TMPDIR, JOBJAR)
-
-def needs_rebuild():
-  return not os.path.exists(JOBJARPATH) or os.stat(JOBJARPATH).st_mtime < os.stat(JOBPATH).st_mtime
-
-def build_job_jar():
-  sys.stderr.write("compiling " + JOBFILE + "\n")
-  if os.path.exists(BUILDDIR):
-    shutil.rmtree(BUILDDIR)
-  os.makedirs(BUILDDIR)
-  cmd = "%s -classpath %s:%s -d %s %s" % (COMPILE_CMD, JARPATH, HADOOP_JARS, BUILDDIR, JOBFILE)
-  print cmd
-  if subprocess.call(cmd, shell=True):
-    shutil.rmtree(BUILDDIR)
-    sys.exit(1)
-
-  shutil.copy(JARPATH, JOBJARPATH)
-  jar_cmd = "jar uf %s -C %s ." % (JOBJARPATH, BUILDDIR)
-  subprocess.call(jar_cmd, shell=True)
-  shutil.rmtree(BUILDDIR)
-
-def hadoop_command():
-  return "%s/bin/hadoop jar %s %s %s" % (HADOOP_HOME, JOBJARPATH, JOB, " ".join(argv))
-
-if is_file() and needs_rebuild():
-  build_job_jar()
-
-SHELL_COMMAND = hadoop_command()
-print SHELL_COMMAND
-os.system(SHELL_COMMAND)

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1ed57904/scrunch/src/main/assembly/release.xml
----------------------------------------------------------------------
diff --git a/scrunch/src/main/assembly/release.xml b/scrunch/src/main/assembly/release.xml
new file mode 100644
index 0000000..6d305de
--- /dev/null
+++ b/scrunch/src/main/assembly/release.xml
@@ -0,0 +1,75 @@
+<!--
+   Assembly configuration for the release bundle.
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>release</id>
+  <formats>
+    <format>dir</format>
+    <format>tar.gz</format>
+  </formats>
+  <includeBaseDirectory>true</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <!-- readme -->
+      <useDefaultExcludes>false</useDefaultExcludes>
+      <outputDirectory>/</outputDirectory>
+      <fileMode>0644</fileMode>
+      <includes>
+        <include>README.md</include>
+      </includes>
+      <filtered>true</filtered>
+    </fileSet>
+    <fileSet>
+      <!-- scripts -->
+      <useDefaultExcludes>false</useDefaultExcludes>
+      <outputDirectory>bin</outputDirectory>
+      <directory>src/main/scripts</directory>
+      <fileMode>0755</fileMode>
+      <excludes>
+        <exclude>*~</exclude>
+        <exclude>*.swp</exclude>
+      </excludes>
+      <filtered>true</filtered>
+    </fileSet>
+    <fileSet>
+      <!-- conf dir -->
+      <useDefaultExcludes>false</useDefaultExcludes>
+      <outputDirectory>conf</outputDirectory>
+      <directory>src/main/conf</directory>
+      <fileMode>0644</fileMode>
+      <excludes>
+        <exclude>*~</exclude>
+        <exclude>*.swp</exclude>
+      </excludes>
+      <filtered>true</filtered>
+    </fileSet>
+    <fileSet>
+      <!-- examples dir -->
+      <useDefaultExcludes>false</useDefaultExcludes>
+      <outputDirectory>examples</outputDirectory>
+      <directory>src/main/examples</directory>
+      <fileMode>0644</fileMode>
+      <excludes>
+        <exclude>*~</exclude>
+        <exclude>*.swp</exclude>
+      </excludes>
+      <filtered>true</filtered>
+    </fileSet>
+  </fileSets>
+  <dependencySets>
+    <dependencySet>
+      <outputDirectory>lib</outputDirectory>
+      <scope>runtime</scope>
+      <useTransitiveFiltering>true</useTransitiveFiltering>
+      <fileMode>0644</fileMode>
+      <!--
+      <excludes>
+        <exclude>org.apache.hadoop:hadoop-core</exclude>
+        <exclude>org.apache.hbase:hbase</exclude>
+        </excludes>
+      -->
+    </dependencySet>
+  </dependencySets>
+</assembly>
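
The descriptor above yields a release bundle with the following layout (read directly from its fileSets and dependencySet):

    scrunch-<version>-release/
      README.md
      bin/        <- src/main/scripts (mode 0755)
      conf/       <- src/main/conf
      examples/   <- src/main/examples
      lib/        <- runtime dependency jars, transitively filtered

The commented-out excludes hint that hadoop-core and hbase may later be dropped from lib/ in favor of the jars already present on the cluster.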

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1ed57904/scrunch/src/main/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/scrunch/src/main/conf/log4j.properties b/scrunch/src/main/conf/log4j.properties
new file mode 100644
index 0000000..164ed56
--- /dev/null
+++ b/scrunch/src/main/conf/log4j.properties
@@ -0,0 +1,8 @@
+# ***** Set the org.apache.scrunch logger level to INFO and its only appender to A.
+log4j.logger.org.apache.scrunch=info, A
+
+# ***** A is set to be a ConsoleAppender.
+log4j.appender.A=org.apache.log4j.ConsoleAppender
+# ***** A uses PatternLayout.
+log4j.appender.A.layout=org.apache.log4j.PatternLayout
+log4j.appender.A.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1ed57904/scrunch/src/main/examples/ClassyPageRank.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/examples/ClassyPageRank.scala b/scrunch/src/main/examples/ClassyPageRank.scala
new file mode 100644
index 0000000..6c819a5
--- /dev/null
+++ b/scrunch/src/main/examples/ClassyPageRank.scala
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.scrunch._
+import org.apache.scrunch.Mem._
+
+case class UrlData(pageRank: Float, oldPageRank: Float, links: List[String]) {
+  def this() = this(1.0f, 0.0f, Nil)
+
+  def this(links: String*) = this(1.0f, 0.0f, List(links:_*))
+ 
+  def this(links: Iterable[String]) = this(1.0f, 0.0f, links.toList)
+  
+  def delta = math.abs(pageRank - oldPageRank)
+
+  def next(newPageRank: Float) = new UrlData(newPageRank, pageRank, links)
+
+  def outboundScores = links.map(link => (link, pageRank / links.size))
+}
+
+object ClassyPageRank extends PipelineApp {
+
+  def initialize(file: String) = {
+    read(from.textFile(file))
+      .map(line => { val urls = line.split("\\s+"); (urls(0), urls(2)) })
+      .groupByKey
+      .map((url, links) => (url, new UrlData(links)))
+  }
+
+  def update(prev: PTable[String, UrlData], d: Float) = {
+    val outbound = prev.values.flatMap(_.outboundScores)
+
+    cogroup(prev, outbound).mapValues(data => {
+      val (prd, outboundScores) = data
+      val newPageRank = (1 - d) + d * outboundScores.sum
+      if (!prd.isEmpty) {
+        prd.head.next(newPageRank)
+      } else {
+        new UrlData(newPageRank, 0, Nil)
+      }
+    })
+  }
+
+  var index = 0
+  var delta = 10.0f
+  fs.mkdirs("prank/")
+  var curr = initialize(args(0))
+  while (delta > 1.0f) {
+    index = index + 1
+    curr = update(curr, 0.5f)
+    write(curr, to.avroFile("prank/" + index))
+    delta = curr.values.map(_.delta).max.materialize.head
+    println("Current delta = " + delta)
+  }
+  fs.rename("prank/" + index, args(1))
+  fs.delete("prank/", true)
+}
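
The update step above is damped PageRank: a page's new score is (1 - d) plus d times the sum of the contributions pageRank / links.size it receives from pages linking to it. A minimal in-memory sketch of one iteration (plain Scala, no Crunch; the graph is made up):

    val links = Map("a" -> List("b", "c"), "b" -> List("c"), "c" -> List("a"))
    val rank  = Map("a" -> 1.0f, "b" -> 1.0f, "c" -> 1.0f)
    val d = 0.5f
    // Equivalent of UrlData.outboundScores: (target, share of source's rank).
    val contributions = links.toList.flatMap { case (url, out) =>
      out.map(link => (link, rank(url) / out.size))
    }
    // Equivalent of the cogroup + mapValues above.
    val newRank = contributions.groupBy(_._1).map { case (url, cs) =>
      (url, (1 - d) + d * cs.map(_._2).sum)
    }
    // newRank: Map(b -> 0.75, c -> 1.25, a -> 1.0)

The driver loop then repeats this until the largest per-page delta falls below 1.0f.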

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1ed57904/scrunch/src/main/examples/PageRank.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/examples/PageRank.scala b/scrunch/src/main/examples/PageRank.scala
new file mode 100644
index 0000000..7de26e6
--- /dev/null
+++ b/scrunch/src/main/examples/PageRank.scala
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.scrunch._
+
+object PageRank extends PipelineApp {
+  def initialize(file: String) = {
+    read(from.textFile(file))
+      .map(line => { val urls = line.split("\\s+"); (urls(0), urls(2)) })
+      .groupByKey
+      .map((url, links) => (url, (1f, 0f, links.toList)))
+  }
+
+  def update(prev: PTable[String, (Float, Float, List[String])], d: Float) = {
+    val outbound = prev.flatMap((url, data) => {
+      val (pagerank, old_pagerank, links) = data
+      links.map(link => (link, pagerank / links.size))
+    })
+
+    cogroup(prev, outbound).mapValues(data => {
+      val (prev_data, outbound_data) = data
+      val new_pagerank = (1 - d) + d * outbound_data.sum
+      var cur_pagerank = 0f
+      var links: List[String] = Nil
+      if (!prev_data.isEmpty) {
+        val (cur_pr, old_pr, l) = prev_data.head
+        cur_pagerank = cur_pr
+        links = l
+      }
+      (new_pagerank, cur_pagerank, links)
+    })
+  }
+
+  var index = 0
+  var delta = 10.0f
+  fs.mkdirs("prank/")
+  var curr = initialize(args(0))
+  while (delta > 1.0f) {
+    index = index + 1
+    curr = update(curr, 0.5f)
+    write(curr, to.avroFile("prank/" + index))
+    delta = curr.values.map(v => math.abs(v._1 - v._2)).max.materialize.head
+    println("Current delta = " + delta)
+  }
+  fs.rename("prank/" + index, args(1))
+  fs.delete("prank/", true)
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1ed57904/scrunch/src/main/examples/WordCount.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/examples/WordCount.scala b/scrunch/src/main/examples/WordCount.scala
new file mode 100644
index 0000000..10780e8
--- /dev/null
+++ b/scrunch/src/main/examples/WordCount.scala
@@ -0,0 +1,27 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+import org.apache.scrunch.PipelineApp
+
+object WordCount extends PipelineApp {
+
+  def countWords(file: String) = {
+    read(from.textFile(file))
+      .flatMap(_.split("\\W+").filter(!_.isEmpty()))
+      .count
+  }
+
+  val counts = join(countWords(args(0)), countWords(args(1)))
+  write(counts, to.textFile(args(2)))
+}
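
countWords splits each line on non-word characters and counts occurrences per word; the app joins the counts from two input files and writes the result. An in-memory analogue of the counting step (plain Scala):

    val lines = Seq("the quick brown fox", "the lazy dog")
    val counts = lines.flatMap(_.split("\\W+")).filter(_.nonEmpty)
      .groupBy(identity).map { case (w, ws) => (w, ws.size.toLong) }
    // counts("the") == 2L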

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1ed57904/scrunch/src/main/scala/org/apache/scrunch/IO.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/IO.scala b/scrunch/src/main/scala/org/apache/scrunch/IO.scala
index 4886e6a..9081d3f 100644
--- a/scrunch/src/main/scala/org/apache/scrunch/IO.scala
+++ b/scrunch/src/main/scala/org/apache/scrunch/IO.scala
@@ -21,23 +21,30 @@ import org.apache.crunch.io.{From => from, To => to, At => at}
 import org.apache.crunch.types.avro.AvroType
 import org.apache.hadoop.fs.Path;
 
-object From {
+trait From {
   def avroFile[T](path: String, atype: AvroType[T]) = from.avroFile(path, atype)
   def avroFile[T](path: Path, atype: AvroType[T]) = from.avroFile(path, atype)
   def textFile(path: String) = from.textFile(path)
   def textFile(path: Path) = from.textFile(path)
 }
 
-object To {
+object From extends From
+
+trait To {
   def avroFile[T](path: String) = to.avroFile(path)
   def avroFile[T](path: Path) = to.avroFile(path)
   def textFile(path: String) = to.textFile(path)
   def textFile(path: Path) = to.textFile(path)
 }
 
-object At {
+object To extends To
+
+trait At {
   def avroFile[T](path: String, atype: AvroType[T]) = at.avroFile(path, atype)
   def avroFile[T](path: Path, atype: AvroType[T]) = at.avroFile(path, atype)
   def textFile(path: String) = at.textFile(path)
   def textFile(path: Path) = at.textFile(path)
 }
+
+object At extends At
+
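
Turning From, To, and At from objects into traits, each with a companion object extending it, keeps existing call sites such as From.textFile(...) working while letting other code inherit or re-expose the same factory methods. A sketch of the pattern this enables (hypothetical names, not part of this patch):

    // Any object can now inherit the source/target factories, e.g. to
    // expose them as lowercase values inside an app-style trait.
    object from extends From
    object to   extends To
    object at   extends At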

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1ed57904/scrunch/src/main/scala/org/apache/scrunch/PCollection.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PCollection.scala b/scrunch/src/main/scala/org/apache/scrunch/PCollection.scala
index 04a2e91..d7ebee5 100644
--- a/scrunch/src/main/scala/org/apache/scrunch/PCollection.scala
+++ b/scrunch/src/main/scala/org/apache/scrunch/PCollection.scala
@@ -17,11 +17,13 @@
  */
 package org.apache.scrunch
 
+import scala.collection.JavaConversions
+
 import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn}
 import org.apache.crunch.{PCollection => JCollection, PTable => JTable, Pair => CPair, Target}
 import org.apache.crunch.lib.Aggregate
 import org.apache.scrunch.Conversions._
-import scala.collection.JavaConversions
+import org.apache.scrunch.interpreter.InterpreterRunner
 
 class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCollection[S], JCollection[S]] {
   import PCollection._
@@ -45,20 +47,23 @@ class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCol
 
   def by[K: PTypeH](f: S => K): PTable[K, S] = {
     val ptype = getTypeFamily().tableOf(implicitly[PTypeH[K]].get(getTypeFamily()), native.getPType())
-    parallelDo(mapKeyFn[S, K](f), ptype) 
+    parallelDo(mapKeyFn[S, K](f), ptype)
   }
 
   def groupBy[K: PTypeH](f: S => K): PGroupedTable[K, S] = {
     by(f).groupByKey
   }
-  
-  def materialize() = JavaConversions.iterableAsScalaIterable[S](native.materialize)
+
+  def materialize() = {
+    InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration())
+    JavaConversions.iterableAsScalaIterable[S](native.materialize)
+  }
 
   def wrap(newNative: AnyRef) = new PCollection[S](newNative.asInstanceOf[JCollection[S]])
-  
+
   def count() = {
     val count = new PTable[S, java.lang.Long](Aggregate.count(native))
-    count.mapValues(_.longValue()) 
+    count.mapValues(_.longValue())
   }
 
   def max() = wrap(Aggregate.max(native))
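
materialize() forces pipeline execution, so it is one of the points where REPL-compiled classes must already be attached to the job; the same hook appears in PTable.materialize() and PipelineLike.run() below. The pattern in isolation (a sketch built from names in this diff):

    def materializeWithReplJars[S](col: JCollection[S]): Iterable[S] = {
      // A no-op unless a REPL is running; otherwise ships REPL classes first.
      InterpreterRunner.addReplJarsToJob(col.getPipeline().getConfiguration())
      JavaConversions.iterableAsScalaIterable[S](col.materialize())
    }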

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1ed57904/scrunch/src/main/scala/org/apache/scrunch/PTable.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PTable.scala b/scrunch/src/main/scala/org/apache/scrunch/PTable.scala
index 5337929..984d9dc 100644
--- a/scrunch/src/main/scala/org/apache/scrunch/PTable.scala
+++ b/scrunch/src/main/scala/org/apache/scrunch/PTable.scala
@@ -17,11 +17,14 @@
  */
 package org.apache.scrunch
 
+import java.util.{Collection => JCollect}
+
+import scala.collection.JavaConversions._
+
 import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn}
 import org.apache.crunch.{GroupingOptions, PTable => JTable, Pair => CPair}
 import org.apache.crunch.lib.{Join, Aggregate, Cogroup, PTables}
-import java.util.{Collection => JCollect}
-import scala.collection.JavaConversions._
+import org.apache.scrunch.interpreter.InterpreterRunner
 
 class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V], PTable[K, V], JTable[K, V]] {
   import PTable._
@@ -110,10 +113,11 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V]
   def wrap(newNative: AnyRef) = {
     new PTable[K, V](newNative.asInstanceOf[JTable[K, V]])
   }
- 
+
   def unwrap(sc: PTable[K, V]): JTable[K, V] = sc.native
- 
+
   def materialize(): Iterable[(K, V)] = {
+    InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration())
     native.materialize.view.map(x => (x.first, x.second))
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1ed57904/scrunch/src/main/scala/org/apache/scrunch/Pipeline.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/Pipeline.scala b/scrunch/src/main/scala/org/apache/scrunch/Pipeline.scala
index a642afc..fa13d3a 100644
--- a/scrunch/src/main/scala/org/apache/scrunch/Pipeline.scala
+++ b/scrunch/src/main/scala/org/apache/scrunch/Pipeline.scala
@@ -17,13 +17,16 @@
  */
 package org.apache.scrunch
 
-import java.lang.Class
+import java.io.File
 
 import org.apache.hadoop.conf.Configuration
+import org.slf4j.LoggerFactory
 
 import org.apache.crunch.{Pipeline => JPipeline}
 import org.apache.crunch.impl.mem.MemPipeline
 import org.apache.crunch.impl.mr.MRPipeline
+import org.apache.crunch.util.DistCache
+import org.apache.scrunch.interpreter.InterpreterRunner
 
 /**
  * Manages the state of a pipeline execution.
@@ -73,14 +76,33 @@ class Pipeline(val jpipeline: JPipeline) extends PipelineLike {
  * Companion object. Contains subclasses of Pipeline.
  */
 object Pipeline {
+  val log = LoggerFactory.getLogger(classOf[Pipeline])
+
   /**
    * Pipeline for running jobs on a hadoop cluster.
    *
    * @param clazz Type of the class using the pipeline.
    * @param configuration Hadoop configuration to use.
    */
-  class MapReducePipeline (clazz: Class[_], configuration: Configuration)
-    extends Pipeline(new MRPipeline(clazz, configuration))
+  class MapReducePipeline (clazz: Class[_], configuration: Configuration) extends Pipeline(
+      {
+        // Attempt to add all jars in the Scrunch distribution lib directory to the job that will
+        // be run.
+        val jarPath = DistCache.findContainingJar(classOf[org.apache.scrunch.Pipeline])
+        if (jarPath != null) {
+          val scrunchJarFile = new File(jarPath)
+          DistCache.addJarDirToDistributedCache(configuration, scrunchJarFile.getParent())
+        } else {
+          log.warn("Could not locate Scrunch jar file, so could not add Scrunch jars to the " +
+              "job(s) about to be run.")
+        }
+        if (InterpreterRunner.repl == null) {
+          new MRPipeline(clazz, configuration)
+        } else {
+          // We're running in the REPL, so we'll use the crunch jar as the job jar.
+          new MRPipeline(classOf[org.apache.scrunch.Pipeline], configuration)
+        }
+      })
 
   /**
    * Pipeline for running jobs in memory.
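
DistCache.findContainingJar locates the jar that holds the Scrunch Pipeline class so that every jar sitting next to it in the distribution's lib/ directory can be pushed to the distributed cache; when running under the REPL, the Scrunch jar itself doubles as the job jar. The method's body is not shown in this diff; a plausible sketch of such a lookup (assumed behavior, not the committed implementation):

    // Resolve the class to a classpath resource and, if it was loaded from
    // a jar, strip the URL down to the jar's filesystem path.
    def findContainingJar(clazz: Class[_]): String = {
      val resource = clazz.getName.replace('.', '/') + ".class"
      val url = clazz.getClassLoader.getResource(resource)
      if (url != null && url.getProtocol == "jar")
        url.getPath.replaceFirst("^file:", "").replaceAll("!.*$", "")
      else
        null
    }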

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1ed57904/scrunch/src/main/scala/org/apache/scrunch/PipelineLike.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PipelineLike.scala b/scrunch/src/main/scala/org/apache/scrunch/PipelineLike.scala
index 4062fdc..92dbaf3 100644
--- a/scrunch/src/main/scala/org/apache/scrunch/PipelineLike.scala
+++ b/scrunch/src/main/scala/org/apache/scrunch/PipelineLike.scala
@@ -23,6 +23,7 @@ import org.apache.crunch.{Pipeline => JPipeline}
 import org.apache.crunch.Source
 import org.apache.crunch.TableSource
 import org.apache.crunch.Target
+import org.apache.scrunch.interpreter.InterpreterRunner
 
 trait PipelineLike {
   def jpipeline: JPipeline
@@ -71,7 +72,10 @@ trait PipelineLike {
    * Constructs and executes a series of MapReduce jobs in order
    * to write data to the output targets.
    */
-  def run(): Unit = jpipeline.run()
+  def run(): Unit = {
+    InterpreterRunner.addReplJarsToJob(getConfiguration())
+    jpipeline.run()
+  }
 
   /**
    * Run any remaining jobs required to generate outputs and then

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1ed57904/scrunch/src/main/scala/org/apache/scrunch/interpreter/InterpreterRunner.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/interpreter/InterpreterRunner.scala b/scrunch/src/main/scala/org/apache/scrunch/interpreter/InterpreterRunner.scala
new file mode 100644
index 0000000..3da73d6
--- /dev/null
+++ b/scrunch/src/main/scala/org/apache/scrunch/interpreter/InterpreterRunner.scala
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch.interpreter
+
+import java.io.File
+import java.io.FileOutputStream
+import java.util.jar.JarEntry
+import java.util.jar.JarOutputStream
+
+import scala.tools.nsc.GenericRunnerCommand
+import scala.tools.nsc.Global
+import scala.tools.nsc.MainGenericRunner
+import scala.tools.nsc.ObjectRunner
+import scala.tools.nsc.Properties
+import scala.tools.nsc.ScriptRunner
+import scala.tools.nsc.interpreter.ILoop
+import scala.tools.nsc.io.Jar
+import scala.tools.nsc.io.VirtualDirectory
+
+import com.google.common.io.Files
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.crunch.util.DistCache
+
+/**
+ * An object used to run a Scala REPL with modifications to facilitate Scrunch jobs running
+ * within the REPL.
+ */
+object InterpreterRunner extends MainGenericRunner {
+
+  // The actual Scala repl.
+  var repl: ILoop = null
+
+  /**
+   * Checks whether or not the Scala repl has been started.
+   *
+   * @return <code>true</code> if the repl is running, <code>false</code> otherwise.
+   */
+  def isReplRunning() = repl != null
+
+  /**
+   * The main entry point for the REPL.  This method is lifted from
+   * {@link scala.tools.nsc.MainGenericRunner} and modified to facilitate testing whether or not
+   * the REPL is actually running.
+   *
+   * @param args Arguments used on the command line to start the REPL.
+   * @return <code>true</code> if execution was successful, <code>false</code> otherwise.
+   */
+  override def process(args: Array[String]): Boolean = {
+    val command = new GenericRunnerCommand(args.toList, (x: String) => errorFn(x))
+    import command.{settings, howToRun, thingToRun}
+    // Defines a nested function to retrieve a sample compiler if necessary.
+    def sampleCompiler = new Global(settings)
+
+    import Properties.{versionString, copyrightString}
+    if (!command.ok) {
+      return errorFn("\n" + command.shortUsageMsg)
+    } else if (settings.version.value) {
+      return errorFn("Scala code runner %s -- %s".format(versionString, copyrightString))
+    } else if (command.shouldStopWithInfo) {
+      return errorFn(command getInfoMessage sampleCompiler)
+    }
+
+    // Functions to retrieve settings values that were passed to REPL invocation.
+    // The -e argument provides a Scala statement to execute.
+    // The -i option requests that a file be preloaded into the interactive shell.
+    def isE = !settings.execute.isDefault
+    def dashe = settings.execute.value
+    def isI = !settings.loadfiles.isDefault
+    def dashi = settings.loadfiles.value
+
+    // Function to retrieve code passed by -e and -i options to REPL.
+    def combinedCode = {
+      val files = if (isI) dashi map (file => scala.tools.nsc.io.File(file).slurp()) else Nil
+      val str = if (isE) List(dashe) else Nil
+      files ++ str mkString "\n\n"
+    }
+
+    import GenericRunnerCommand._
+
+    // Function for running the target command. It can run an object with main, a script, or
+    // an interactive REPL.
+    def runTarget(): Either[Throwable, Boolean] = howToRun match {
+      case AsObject =>
+        ObjectRunner.runAndCatch(settings.classpathURLs, thingToRun, command.arguments)
+      case AsScript =>
+        ScriptRunner.runScriptAndCatch(settings, thingToRun, command.arguments)
+      case AsJar =>
+        ObjectRunner.runAndCatch(
+          scala.tools.nsc.io.File(thingToRun).toURL +: settings.classpathURLs,
+          new Jar(thingToRun).mainClass getOrElse sys.error("Cannot find main class for jar: " +
+            thingToRun),
+          command.arguments
+        )
+      case Error =>
+        Right(false)
+      case _ =>
+        // We start the shell when no arguments are given.
+        repl = new ILoop
+        Right(repl.process(settings))
+    }
+
+    /**If -e and -i were both given, we want to execute the -e code after the
+     *  -i files have been included, so they are read into strings and prepended to
+     *  the code given in -e.  The -i option is documented to only make sense
+     *  interactively so this is a pretty reasonable assumption.
+     *
+     *  This all needs a rewrite though.
+     */
+    if (isE) {
+      ScriptRunner.runCommand(settings, combinedCode, thingToRun +: command.arguments)
+    }
+    else runTarget() match {
+      case Left(ex) => errorFn(ex)
+      case Right(b) => b
+    }
+  }
+
+  def main(args: Array[String]) {
+    val retVal = process(args)
+    if (!retVal)
+      sys.exit(1)
+  }
+
+  /**
+   * Creates a jar file containing the code thus far compiled by the REPL in a temporary directory.
+   *
+   * @return A file object representing the jar file created.
+   */
+  def createReplCodeJar(): File = {
+    var jarStream: JarOutputStream = null
+    try {
+      val virtualDirectory = repl.virtualDirectory
+      val tempDir = Files.createTempDir()
+      val tempJar = new File(tempDir, "replJar.jar")
+      jarStream = new JarOutputStream(new FileOutputStream(tempJar))
+      addVirtualDirectoryToJar(virtualDirectory, "", jarStream)
+      return tempJar
+    } finally {
+      IOUtils.closeQuietly(jarStream)
+    }
+  }
+
+  /**
+   * Add the contents of the specified virtual directory to a jar. This method will recursively
+   * descend into subdirectories to add their contents.
+   *
+   * @param dir The virtual directory whose contents should be added.
+   * @param entryPath The entry path for classes found in the virtual directory.
+   * @param jarStream An output stream for writing the jar file.
+   */
+  def addVirtualDirectoryToJar(dir: VirtualDirectory, entryPath: String, jarStream:
+      JarOutputStream): Unit = {
+    dir.foreach { file =>
+      if (file.isDirectory) {
+        // Recursively descend into subdirectories, adjusting the package name as we do.
+        addVirtualDirectoryToJar(file.asInstanceOf[VirtualDirectory],
+            entryPath + file.name + "/", jarStream)
+      } else if (file.hasExtension("class")) {
+        // Add class files as an entry in the jar file and write the class to the jar.
+        val entry: JarEntry = new JarEntry(entryPath + file.name)
+        jarStream.putNextEntry(entry)
+        jarStream.write(file.toByteArray)
+      }
+    }
+  }
+
+  /**
+   * Generates a jar containing the code thus far compiled by the REPL,
+   * and adds that jar file to the distributed cache of jobs using the specified configuration.
+   * Also adds any jars added with the :cp command to the user's job.
+   *
+   * @param configuration The configuration of jobs that should use the REPL code jar.
+   */
+  def addReplJarsToJob(configuration: Configuration): Unit = {
+    if (repl != null) {
+      // Generate a jar of REPL code and add to the distributed cache.
+      val replJarFile = createReplCodeJar()
+      DistCache.addJarToDistributedCache(configuration, replJarFile)
+      // Get the paths to jars added with the :cp command.
+      val addedJarPaths = repl.addedClasspath.split(':')
+      addedJarPaths.foreach {
+        path => if (path.endsWith(".jar")) DistCache.addJarToDistributedCache(configuration, path)
+      }
+    }
+  }
+}
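
Putting the pieces together: the scrunch shell script (below) launches this runner; with no explicit target, process() falls through to the default case and records the ILoop in repl; every subsequent materialize() or run() call then funnels through addReplJarsToJob. From user code the whole mechanism reduces to one call (a sketch; it is a no-op outside the REPL):

    val conf = new org.apache.hadoop.conf.Configuration()
    // Packs repl.virtualDirectory into a jar and registers it, plus any
    // jars added with :cp, with the distributed cache for jobs using conf.
    InterpreterRunner.addReplJarsToJob(conf)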

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1ed57904/scrunch/src/main/scripts/imports.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scripts/imports.scala b/scrunch/src/main/scripts/imports.scala
new file mode 100644
index 0000000..64d7149
--- /dev/null
+++ b/scrunch/src/main/scripts/imports.scala
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.scrunch._
+

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1ed57904/scrunch/src/main/scripts/scrunch
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scripts/scrunch b/scrunch/src/main/scripts/scrunch
new file mode 100755
index 0000000..c0a158f
--- /dev/null
+++ b/scrunch/src/main/scripts/scrunch
@@ -0,0 +1,163 @@
+#!/bin/bash --posix
+#
+##############################################################################
+# Copyright 2002-2011, LAMP/EPFL
+#
+# This is free software; see the distribution for copying conditions.
+# There is NO warranty; not even for MERCHANTABILITY or FITNESS FOR A
+# PARTICULAR PURPOSE.
+##############################################################################
+
+# Identify the bin dir in the distribution from which this script is running.
+bin=`dirname $0`
+bin=`cd ${bin} && pwd`
+
+# Set the directory where libraries for scrunch shell live.
+SCRUNCH_LIB_DIR="${bin}/../lib"
+# Set the conf directory for the scrunch distribution.
+SCRUNCH_CONF_DIR="${bin}/../conf"
+# Set the main class used to run scrunch shell.
+MAIN_CLASS="org.apache.scrunch.interpreter.InterpreterRunner"
+
+# Not sure what the right default is here: trying nonzero.
+scala_exit_status=127
+saved_stty=""
+
+# restore stty settings (echo in particular)
+function restoreSttySettings() {
+  if [[ -n $SCALA_RUNNER_DEBUG ]]; then
+    echo "restoring stty: $saved_stty"
+  fi
+
+  stty $saved_stty
+  saved_stty=""
+}
+
+function onExit() {
+  if [[ "$saved_stty" != "" ]]; then
+    restoreSttySettings
+    exit $scala_exit_status
+  fi
+}
+
+# to reenable echo if we are interrupted before completing.
+trap onExit INT
+
+# save terminal settings
+saved_stty=$(stty -g 2>/dev/null)
+# clear on error so we don't later try to restore them
+if [[ $? -ne 0 ]]; then
+  saved_stty=""
+fi
+if [[ -n $SCALA_RUNNER_DEBUG ]]; then
+  echo "saved stty: $saved_stty"
+fi
+
+cygwin=false;
+case "`uname`" in
+    CYGWIN*) cygwin=true ;;
+esac
+
+# Constructing scrunch shell classpath.
+SCRUNCH_SHELL_CLASSPATH=""
+# Add files in conf dir.
+for ext in "$SCRUNCH_CONF_DIR"/* ; do
+    if [ -z "$SCRUNCH_SHELL_CLASSPATH" ] ; then
+        SCRUNCH_SHELL_CLASSPATH="$ext"
+    else
+        SCRUNCH_SHELL_CLASSPATH="$SCRUNCH_SHELL_CLASSPATH:$ext"
+    fi
+done
+# Add files in lib dir.
+for ext in "$SCRUNCH_LIB_DIR"/*.jar ; do
+    if [ -z "$SCRUNCH_SHELL_CLASSPATH" ] ; then
+        SCRUNCH_SHELL_CLASSPATH="$ext"
+    else
+        SCRUNCH_SHELL_CLASSPATH="$SCRUNCH_SHELL_CLASSPATH:$ext"
+    fi
+done
+
+# Constructing Hadoop classpath.
+if [ -z "$HADOOP_HOME" ]; then
+    echo "HADOOP_HOME must be set to run the Scrunch shell."
+    exit 1
+fi
+HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
+
+CYGWIN_JLINE_TERMINAL=
+if $cygwin; then
+    if [ "$OS" = "Windows_NT" ] && cygpath -m .>/dev/null 2>/dev/null ; then
+        format=mixed
+    else
+        format=windows
+    fi
+    SCRUNCH_SHELL_CLASSPATH=`cygpath --path --$format "$SCRUNCH_SHELL_CLASSPATH"`
+    case "$TERM" in
+        rxvt* | xterm*)
+            stty -icanon min 1 -echo
+            CYGWIN_JLINE_TERMINAL="-Djline.terminal=scala.tools.jline.UnixTerminal"
+        ;;
+    esac
+fi
+
+[ -n "$JAVA_OPTS" ] || JAVA_OPTS="-Xmx256M -Xms32M"
+
+# break out -D and -J options and add them to JAVA_OPTS as well
+# so they reach the underlying JVM in time to do some good.  The
+# -D options will be available as system properties.
+declare -a java_args
+declare -a scala_args
+
+# Don't use the bootstrap classloader.
+CPSELECT="-classpath "
+
+while [ $# -gt 0 ]; do
+  case "$1" in
+    -D*)
+      # pass to scala as well: otherwise we lose it sometimes when we
+      # need it, e.g. communicating with a server compiler.
+      java_args=("${java_args[@]}" "$1")
+      scala_args=("${scala_args[@]}" "$1")
+      shift
+      ;;
+    -J*)
+      # as with -D, pass to scala even though it will almost
+      # never be used.
+      java_args=("${java_args[@]}" "${1:2}")
+      scala_args=("${scala_args[@]}" "$1")
+      shift
+      ;;
+    -toolcp)
+      TOOL_CLASSPATH="$TOOL_CLASSPATH:$2"
+      shift 2
+      ;;
+    *)
+      scala_args=("${scala_args[@]}" "$1")
+      shift
+      ;;
+  esac
+done
+
+# reset "$@" to the remaining args
+set -- "${scala_args[@]}"
+
+if [ -z "$JAVACMD" -a -n "$JAVA_HOME" -a -x "$JAVA_HOME/bin/java" ]; then
+    JAVACMD="$JAVA_HOME/bin/java"
+fi
+
+"${JAVACMD:=java}" \
+  $JAVA_OPTS \
+  "${java_args[@]}" \
+  ${CPSELECT}${TOOL_CLASSPATH}":"${HADOOP_CLASSPATH}":"${SCRUNCH_SHELL_CLASSPATH} \
+  -Dscala.usejavacp=true \
+  -Denv.emacs="$EMACS" \
+  $CYGWIN_JLINE_TERMINAL \
+  $MAIN_CLASS  "$@" \
+  -i ${bin}/imports.scala \
+  -Yrepl-sync
+# The -Yrepl-sync option is a fix for the 2.9.1 REPL. This should probably not be necessary in the future.
+
+# record the exit status lest it be overwritten:
+# then reenable echo and propagate the code.
+scala_exit_status=$?
+onExit

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1ed57904/scrunch/src/main/scripts/scrunch-job.py
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scripts/scrunch-job.py b/scrunch/src/main/scripts/scrunch-job.py
new file mode 100755
index 0000000..d50434c
--- /dev/null
+++ b/scrunch/src/main/scripts/scrunch-job.py
@@ -0,0 +1,118 @@
+#!/usr/bin/python
+
+import glob
+import os
+import re
+import shutil
+import subprocess
+import sys
+
+# Configuration in script
+##############################################################
+if not "SCALA_HOME" in os.environ:
+  sys.stderr.write("Environment variable SCALA_HOME must be set\n")
+  sys.exit(1)
+SCALA_LIB = os.path.join(os.environ["SCALA_HOME"], "lib")
+
+if not "HADOOP_HOME" in os.environ:
+  sys.stderr.write("Environment variable HADOOP_HOME must be set\n")
+  sys.exit(1)
+HADOOP_HOME = os.environ["HADOOP_HOME"]
+HADOOP_JARS = ":".join(glob.glob(os.path.join(HADOOP_HOME, "*.jar")))
+
+#Get the absolute path of the original (non-symlink) file.
+if os.path.islink(__file__):
+  ORIGINAL_FILE = os.readlink(__file__)
+else:
+  ORIGINAL_FILE = __file__
+
+DIST_ROOT = os.path.abspath(os.path.dirname(ORIGINAL_FILE)+"/../")
+LIB_DIR = DIST_ROOT + "/lib" # Dir with all scrunch dependencies.
+TMPDIR = "/tmp"
+BUILDDIR = TMPDIR + "/script-build"
+COMPILE_CMD = "java -cp %s/scala-library.jar:%s/scala-compiler.jar -Dscala.home=%s scala.tools.nsc.Main" % (SCALA_LIB, SCALA_LIB, SCALA_LIB)
+##############################################################
+
+argv = sys.argv[1:]
+if len(argv) < 1:
+  sys.stderr.write("ERROR: insufficient args.\n")
+  sys.exit(1)
+
+JOBFILE = argv.pop(0)
+
+def file_type():
+  m = re.search(r'\.(scala|java)$', JOBFILE)
+  if m:
+    return m.group(1)
+  return None
+
+def is_file():
+  return file_type() is not None
+
+PACK_RE = r'package ([^;]+)'
+OBJECT_RE = r'object\s+([^\s(]+).*(extends|with)\s+PipelineApp.*'
+EXTENSION_RE = r'(.*)\.(scala|java)$'
+
+#Get the name of the job from the file.
+#the rule is: last class in the file, or the one that matches the filename
+def get_job_name(file):
+  package = ""
+  job = None
+  default = None
+  match = re.search(EXTENSION_RE, file)
+  if match:
+    default = match.group(1)
+    for s in open(file, "r"):
+      mp = re.search(PACK_RE, s)
+      mo = re.search(OBJECT_RE, s)
+      if mp:
+        package = mp.group(1).strip() + "."
+      elif mo:
+        if not job or not default or not job.lower() == default.lower():
+          #use either the last class, or the one with the same name as the file
+          job = mo.group(1)
+    if not job:
+      raise Exception("Could not find job name")
+    return "%s%s" % (package, job)
+  else:
+    return file
+
+LIB_PATH = os.path.abspath(LIB_DIR)
+if not os.path.exists(LIB_PATH):
+  sys.stderr.write("Scrunch distribution lib directory not found; run mvn package to construct a distribution to run examples from.\n")
+  sys.exit(1)
+LIB_JARS = glob.glob(os.path.join(LIB_PATH, "*.jar"))
+LIB_CP = ":".join(LIB_JARS)
+
+JOBPATH = os.path.abspath(JOBFILE)
+JOB = get_job_name(JOBFILE)
+JOBJAR = JOB + ".jar"
+JOBJARPATH = os.path.join(TMPDIR, JOBJAR)
+
+def needs_rebuild():
+  return not os.path.exists(JOBJARPATH) or os.stat(JOBJARPATH).st_mtime < os.stat(JOBPATH).st_mtime
+
+def build_job_jar():
+  sys.stderr.write("compiling " + JOBFILE + "\n")
+  if os.path.exists(BUILDDIR):
+    shutil.rmtree(BUILDDIR)
+  os.makedirs(BUILDDIR)
+  cmd = "%s -classpath %s:%s -d %s %s" % (COMPILE_CMD, LIB_CP, HADOOP_JARS, BUILDDIR, JOBFILE)
+  print cmd
+  if subprocess.call(cmd, shell=True):
+    shutil.rmtree(BUILDDIR)
+    sys.exit(1)
+
+  jar_cmd = "jar cf %s -C %s ." % (JOBJARPATH, BUILDDIR)
+  subprocess.call(jar_cmd, shell=True)
+  shutil.rmtree(BUILDDIR)
+
+def hadoop_command():
+  return "HADOOP_CLASSPATH=%s ; %s/bin/hadoop jar %s %s %s" % (LIB_CP, HADOOP_HOME, JOBJARPATH, JOB, " ".join(argv))
+
+if is_file() and needs_rebuild():
+  build_job_jar()
+
+SHELL_COMMAND = hadoop_command()
+print SHELL_COMMAND
+os.system(SHELL_COMMAND)

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1ed57904/scrunch/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/scrunch/src/test/resources/log4j.properties b/scrunch/src/test/resources/log4j.properties
new file mode 100644
index 0000000..985a2eb
--- /dev/null
+++ b/scrunch/src/test/resources/log4j.properties
@@ -0,0 +1,13 @@
+# ***** Set the Crunch and Scrunch logger levels to INFO and their only appender to A.
+log4j.logger.org.apache.crunch=info, A
+log4j.logger.org.apache.scrunch=info, A
+
+# Log warnings on Hadoop for the local runner when testing
+log4j.logger.org.apache.hadoop=warn, A
+
+# ***** A is set to be a ConsoleAppender.
+log4j.appender.A=org.apache.log4j.ConsoleAppender
+# ***** A uses PatternLayout.
+log4j.appender.A.layout=org.apache.log4j.PatternLayout
+log4j.appender.A.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1ed57904/scrunch/src/test/scala/org/apache/scrunch/interpreter/InterpreterJarTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/test/scala/org/apache/scrunch/interpreter/InterpreterJarTest.scala b/scrunch/src/test/scala/org/apache/scrunch/interpreter/InterpreterJarTest.scala
new file mode 100644
index 0000000..5d38027
--- /dev/null
+++ b/scrunch/src/test/scala/org/apache/scrunch/interpreter/InterpreterJarTest.scala
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch.interpreter
+
+import java.io.File
+import java.io.FileOutputStream
+import java.util.jar.JarFile
+import java.util.jar.JarOutputStream
+
+import scala.tools.nsc.io.VirtualDirectory
+
+import com.google.common.io.Files
+import org.junit.Assert.assertNotNull
+import org.junit.Test
+import org.scalatest.junit.JUnitSuite
+
+/**
+ * Tests creating jars from a {@link scala.tools.nsc.io.VirtualDirectory}.
+ */
+class InterpreterJarTest extends JUnitSuite {
+
+  /**
+   * Tests transforming a virtual directory into a temporary jar file.
+   */
+  @Test def virtualDirToJar: Unit = {
+    // Create a virtual directory and populate with some mock content.
+    val root = new VirtualDirectory("testDir", None)
+    // Add some subdirectories to the root.
+    (1 to 10).foreach { i =>
+      val subdir = root.subdirectoryNamed("subdir" + i).asInstanceOf[VirtualDirectory]
+      // Add some classfiles to each sub directory.
+      (1 to 10).foreach { j =>
+        subdir.fileNamed("MyClass" + j + ".class")
+      }
+    }
+
+    // Now generate a jar file from the virtual directory.
+    val tempDir = Files.createTempDir()
+    tempDir.deleteOnExit()
+    val tempJar = new File(tempDir, "replJar.jar")
+    val jarStream = new JarOutputStream(new FileOutputStream(tempJar))
+    InterpreterRunner.addVirtualDirectoryToJar(root, "top/pack/name/", jarStream)
+    jarStream.close()
+
+    // Verify the contents of the jar.
+    val jarFile = new JarFile(tempJar)
+    (1 to 10).foreach { i =>
+      (1 to 10).foreach { j =>
+        val entryName = "top/pack/name/subdir" + i + "/MyClass" + j + ".class"
+        val entry = jarFile.getEntry(entryName)
+        assertNotNull("Jar entry " + entryName + " not found in generated jar.", entry)
+      }
+    }
+  }
+}
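
The test above drives InterpreterRunner.addVirtualDirectoryToJar, which this
patch adds in the InterpreterRunner.scala hunk earlier in this commit. As a
minimal sketch only, assuming nothing beyond the name and parameter order the
test uses (the body below is illustrative, not the committed implementation),
the method can be pictured as a recursive walk that emits one jar entry per
file in the VirtualDirectory:

  import java.util.jar.{JarEntry, JarOutputStream}
  import scala.tools.nsc.io.VirtualDirectory

  // Illustrative sketch: recursively copy a compiler VirtualDirectory into an
  // open jar stream, prefixing every entry name with entryPath ("top/pack/name/").
  def addVirtualDirectoryToJar(dir: VirtualDirectory, entryPath: String,
      jarStream: JarOutputStream): Unit = {
    dir.foreach { file =>
      if (file.isDirectory) {
        // Children created via subdirectoryNamed are VirtualDirectories,
        // so this cast mirrors the one used in the test.
        addVirtualDirectoryToJar(file.asInstanceOf[VirtualDirectory],
            entryPath + file.name + "/", jarStream)
      } else {
        // Write each in-memory file's bytes under its jar entry name.
        jarStream.putNextEntry(new JarEntry(entryPath + file.name))
        jarStream.write(file.toByteArray)
        jarStream.closeEntry()
      }
    }
  }

A walk of this shape is exactly what the assertions check: every
MyClassJ.class placed in subdirI must come back out of the jar at
top/pack/name/subdirI/MyClassJ.class.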

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1ed57904/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
index 165641a..718ce5e 100644
--- a/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
+++ b/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
@@ -64,7 +64,7 @@ public class JobPrototype {
   private final Set<JobPrototype> dependencies = Sets.newHashSet();
   private final Map<PCollectionImpl<?>, DoNode> nodes = Maps.newHashMap();
   private final Path workingPath;
-  
+
   private HashMultimap<Target, NodePath> targetsToNodePaths;
   private DoTableImpl<?,?> combineFnTable;
 
@@ -112,7 +112,7 @@ public class JobPrototype {
     conf = job.getConfiguration();
     conf.set(PlanningParameters.CRUNCH_WORKING_DIRECTORY, workingPath.toString());
     job.setJarByClass(jarClass);
-    
+
     Set<DoNode> outputNodes = Sets.newHashSet();
     Set<Target> targets = targetsToNodePaths.keySet();
     Path outputPath = new Path(workingPath, "output");
@@ -151,7 +151,7 @@ public class JobPrototype {
 
       group.configureShuffle(job);
 
-      DoNode mapOutputNode = group.getGroupingNode();      
+      DoNode mapOutputNode = group.getGroupingNode();
       Set<DoNode> mapNodes = Sets.newHashSet();
       for (NodePath nodePath : mapNodePaths) {
         // Advance these one step, since we've already configured
@@ -179,7 +179,7 @@ public class JobPrototype {
       job.setInputFormatClass(CrunchInputFormat.class);
     }
     job.setJobName(createJobName(pipeline.getName(), inputNodes, reduceNode));
-    
+
     return new CrunchJob(job, outputPath, outputHandler);
   }
 
@@ -201,7 +201,7 @@ public class JobPrototype {
     }
     return builder.build();
   }
-  
+
   private DoNode walkPath(Iterator<PCollectionImpl<?>> iter, DoNode working) {
     while (iter.hasNext()) {
       PCollectionImpl<?> collect = iter.next();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1ed57904/src/main/java/org/apache/crunch/util/DistCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/util/DistCache.java b/src/main/java/org/apache/crunch/util/DistCache.java
index 06ad22a..682e8f0 100644
--- a/src/main/java/org/apache/crunch/util/DistCache.java
+++ b/src/main/java/org/apache/crunch/util/DistCache.java
@@ -22,6 +22,9 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.net.URI;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.Enumeration;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
@@ -121,6 +124,40 @@ public class DistCache {
   }
 
   /**
+   * Finds the path to a jar that contains the class provided, if any. There is no guarantee that
+   * the jar returned will be the first on the classpath to contain the file. This method is
+   * basically lifted out of Hadoop's {@link org.apache.hadoop.mapred.JobConf} class.
+   *
+   * @param jarClass The class the jar file should contain.
+   * @return The path to a jar file that contains the class, or <code>null</code> if no such jar
+   *     exists.
+   * @throws IOException If there is a problem searching for the jar file.
+   */
+  public static String findContainingJar(Class jarClass) throws IOException {
+    ClassLoader loader = jarClass.getClassLoader();
+    String classFile = jarClass.getName().replaceAll("\\.", "/") + ".class";
+    for (Enumeration itr = loader.getResources(classFile); itr.hasMoreElements();) {
+      URL url = (URL) itr.nextElement();
+      if ("jar".equals(url.getProtocol())) {
+        String toReturn = url.getPath();
+        if (toReturn.startsWith("file:")) {
+          toReturn = toReturn.substring("file:".length());
+        }
+        // URLDecoder is a misnamed class, since it actually decodes
+        // x-www-form-urlencoded MIME type rather than actual
+        // URL encoding (which the file path has). Therefore it would
+        // decode +s to ' 's which is incorrect (spaces are actually
+        // either unencoded or encoded as "%20"). Replace +s first, so
+        // that they are kept sacred during the decoding process.
+        toReturn = toReturn.replaceAll("\\+", "%2B");
+        toReturn = URLDecoder.decode(toReturn, "UTF-8");
+        return toReturn.replaceAll("!.*$", "");
+      }
+    }
+    return null;
+  }
+
+  /**
    * Adds all jars under the specified directory to the distributed cache of jobs using the
    * provided configuration. The jars will be placed on the classpath of tasks run by the job.
    * This method does not descend into subdirectories when adding jars.
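
For context on the new findContainingJar helper, a hypothetical caller might
look like the sketch below; the object name and the class literal are
arbitrary choices for illustration, since any class loaded from a jar works:

  import org.apache.crunch.util.DistCache

  object FindJarExample {
    def main(args: Array[String]): Unit = {
      // Path of a jar on the classpath containing Pipeline.class, or null if
      // the class was loaded from an unpacked directory instead.
      val jar = DistCache.findContainingJar(classOf[org.apache.crunch.Pipeline])
      println(Option(jar).getOrElse("not loaded from a jar"))
    }
  }

The "+" to "%2B" replacement inside the method guards paths such as
/tmp/my+lib/job.jar: URLDecoder.decode would otherwise turn the literal "+"
into a space, because it implements x-www-form-urlencoded decoding rather
than URL path decoding.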

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1ed57904/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties
index c8173d7..f10f0c9 100644
--- a/src/test/resources/log4j.properties
+++ b/src/test/resources/log4j.properties
@@ -9,3 +9,4 @@ log4j.appender.A=org.apache.log4j.ConsoleAppender
 # ***** A uses PatternLayout.
 log4j.appender.A.layout=org.apache.log4j.PatternLayout
 log4j.appender.A.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+

