crunch-commits mailing list archives

From jwi...@apache.org
Subject crunch git commit: CRUNCH-496: Scrunch on Spark.
Date Wed, 11 Feb 2015 17:50:54 GMT
Repository: crunch
Updated Branches:
  refs/heads/master 5e6e33536 -> dbd56e638


CRUNCH-496: Scrunch on Spark.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/dbd56e63
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/dbd56e63
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/dbd56e63

Branch: refs/heads/master
Commit: dbd56e638456437451e8bc12bb0dfe55aef0f254
Parents: 5e6e335
Author: Josh Wills <jwills@apache.org>
Authored: Fri Jan 30 18:42:49 2015 -0800
Committer: Josh Wills <jwills@apache.org>
Committed: Wed Feb 11 09:24:19 2015 -0800

----------------------------------------------------------------------
 .../org/apache/crunch/types/avro/AvroType.java  |   3 +-
 crunch-spark/pom.xml                            |  92 ++++++++++++-
 .../crunch/scrunch/spark/CrunchSparkSuite.scala |  39 ++++++
 .../scrunch/spark/PageRankClassTest.scala       | 118 +++++++++++++++++
 .../crunch/scrunch/spark/PageRankTest.scala     |  66 ++++++++++
 .../scrunch/spark/ByteBufferInputStream.scala   |  76 +++++++++++
 .../scrunch/spark/ScrunchSerializer.scala       | 129 +++++++++++++++++++
 .../crunch/scrunch/spark/SparkPipeline.scala    |  91 +++++++++++++
 pom.xml                                         |   2 +-
 9 files changed, 610 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
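
For context: the new Scala sources below add a Scrunch-flavored SparkPipeline and a
Java-serialization-based ScrunchSerializer so that Scrunch pipelines can run on the
Spark runner. A minimal sketch of the usage this enables (UpperCaseJob, the input
path, and the toUpperCase transform are illustrative assumptions, not part of the
patch):

    import org.apache.crunch.scrunch.spark.SparkPipeline
    import org.apache.crunch.io.{From => from}
    import org.apache.hadoop.conf.Configuration

    class UpperCaseJob  // marker class used to locate the client JAR

    object UpperCaseJob {
      def main(args: Array[String]): Unit = {
        // "local" runs against an in-process Spark master
        val pipeline = SparkPipeline[UpperCaseJob]("local", "UpperCase", new Configuration())
        val upper = pipeline.read(from.textFile(args(0))).map(_.toUpperCase)
        upper.materialize.take(5).foreach(println)
        pipeline.done
      }
    }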


http://git-wip-us.apache.org/repos/asf/crunch/blob/dbd56e63/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
index 9dbf6b0..528a600 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
@@ -45,7 +45,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 /**
@@ -85,7 +84,7 @@ public class AvroType<T> implements PType<T> {
     this.baseInputMapFn = inputMapFn;
     this.baseOutputMapFn = outputMapFn;
     this.deepCopier = deepCopier;
-    this.subTypes = ImmutableList.<PType> builder().add(ptypes).build();
+    this.subTypes = Lists.newArrayList(ptypes);
     this.recordType = recordType;
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/dbd56e63/crunch-spark/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-spark/pom.xml b/crunch-spark/pom.xml
index 6ab5db5..47f52b6 100644
--- a/crunch-spark/pom.xml
+++ b/crunch-spark/pom.xml
@@ -43,6 +43,10 @@ under the License.
       <artifactId>crunch-core</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.crunch</groupId>
+      <artifactId>crunch-scrunch</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
       <scope>provided</scope>
@@ -78,6 +82,10 @@ under the License.
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.base.version}</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-all</artifactId>
       <scope>test</scope>
@@ -87,16 +95,94 @@ under the License.
   <build>
     <plugins>
       <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+              <goal>testCompile</goal>
+            </goals>
+            <configuration>
+              <args>
+                <arg>-deprecation</arg>
+                <arg>-dependencyfile</arg>
+                <arg>${project.build.directory}/.scala_dependencies</arg>
+              </args>
+            </configuration>
+          </execution>
+        </executions>
       </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-failsafe-plugin</artifactId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <useFile>false</useFile>
+          <disableXmlReport>true</disableXmlReport>
+          <includes>
+            <include>${project.build.testSourceDirectory}/**/*Test.*</include>
+            <include>${project.build.testSourceDirectory}/**/*Suite.*</include>
+          </includes>
+        </configuration>
       </plugin>
+      <!-- We put slow-running tests into src/it and run them during the
+           integration-test phase using the failsafe plugin. This way
+           developers can run unit tests conveniently from the IDE or via
+           "mvn package" from the command line without triggering time
+           consuming integration tests. -->
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>add-test-source</id>
+            <phase>validate</phase>
+            <goals>
+              <goal>add-test-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>${basedir}/src/it/java</source>
+                <source>${basedir}/src/it/scala</source>
+              </sources>
+            </configuration>
+          </execution>
+          <execution>
+            <id>add-test-resource</id>
+            <phase>validate</phase>
+            <goals>
+              <goal>add-test-resource</goal>
+            </goals>
+            <configuration>
+              <resources>
+                  <resource>
+                    <directory>${basedir}/src/it/resources</directory>
+                  </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-failsafe-plugin</artifactId>
+        <configuration>
+          <testSourceDirectory>${basedir}/src/it/scala</testSourceDirectory>
+          <useFile>false</useFile>
+          <disableXmlReport>true</disableXmlReport>
+          <includes>
+            <include>**/*Test.*</include>
+            <include>**/*Suite.*</include>
+          </includes>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>integration-test</goal>
+              <goal>verify</goal>
+            </goals>
+          </execution>
+        </executions>
       </plugin>
     </plugins>
   </build>

http://git-wip-us.apache.org/repos/asf/crunch/blob/dbd56e63/crunch-spark/src/it/scala/org/apache/crunch/scrunch/spark/CrunchSparkSuite.scala
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/scala/org/apache/crunch/scrunch/spark/CrunchSparkSuite.scala b/crunch-spark/src/it/scala/org/apache/crunch/scrunch/spark/CrunchSparkSuite.scala
new file mode 100644
index 0000000..bb66911
--- /dev/null
+++ b/crunch-spark/src/it/scala/org/apache/crunch/scrunch/spark/CrunchSparkSuite.scala
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.scrunch.spark
+
+import org.apache.crunch.test.TemporaryPath
+import org.junit.{After, Before}
+import org.scalatest.junit.JUnitSuite
+
+class CrunchSparkSuite extends JUnitSuite {
+
+  val tempDir = new TemporaryPath("crunch.tmp.dir", "hadoop.tmp.dir")
+
+  def getFolder() = {
+    tempDir
+  }
+
+  @Before def initialize() {
+    tempDir.create()
+  }
+
+  @After def cleanup() {
+    tempDir.delete()
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/dbd56e63/crunch-spark/src/it/scala/org/apache/crunch/scrunch/spark/PageRankClassTest.scala
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/scala/org/apache/crunch/scrunch/spark/PageRankClassTest.scala b/crunch-spark/src/it/scala/org/apache/crunch/scrunch/spark/PageRankClassTest.scala
new file mode 100644
index 0000000..2084004
--- /dev/null
+++ b/crunch-spark/src/it/scala/org/apache/crunch/scrunch/spark/PageRankClassTest.scala
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.scrunch.spark
+
+import org.apache.crunch.scrunch._
+import org.apache.crunch.scrunch.Avros._
+
+import org.apache.crunch.{DoFn, Emitter, Pair => P}
+import org.apache.crunch.io.{From => from}
+
+import scala.collection.mutable.HashMap
+
+import _root_.org.junit.Assert._
+import _root_.org.junit.Test
+
+class PageRankData(val page_rank: Float, oldpr: Float, val urls: Array[String], bytes: Array[Byte]) {
+
+  // Required no-arg constructor for Avro reflection
+  def this() = this(0.0f, 0.0f, null, null)
+
+  def scaledPageRank = page_rank / urls.length
+
+  def next(newPageRank: Float) = new PageRankData(newPageRank, page_rank, urls, bytes)
+
+  def delta = math.abs(page_rank - oldpr)
+}
+
+class CachingPageRankClassFn extends DoFn[P[String, PageRankData], P[String, Float]] {
+  val cache = new HashMap[String, Float] {
+    override def default(key: String) = 0f
+  }
+
+  override def process(input: P[String, PageRankData], emitFn: Emitter[P[String, Float]]) {
+    val prd = input.second()
+    if (prd.urls.length > 0) {
+      val newpr = prd.page_rank / prd.urls.length
+      prd.urls.foreach(url => cache.put(url, cache(url) + newpr))
+      if (cache.size > 5000) {
+        cleanup(emitFn)
+      }
+    }
+  }
+
+  override def cleanup(emitFn: Emitter[P[String, Float]]) {
+    cache.foreach(kv => emitFn.emit(P.of(kv._1, kv._2)))
+    cache.clear
+  }
+}
+
+class PageRankClassTest extends CrunchSparkSuite {
+
+  lazy val pipeline = SparkPipeline[PageRankClassTest](tempDir.getDefaultConfiguration)
+
+  def initialInput(fileName: String) = {
+    pipeline.read(from.textFile(fileName, Avros.strings))
+      .map(line => { val urls = line.split("\\t"); (urls(0), urls(1)) })
+      .groupByKey
+      .map((url, links) => (url, new PageRankData(1f, 0f, links.filter(x => x != null).toArray, Array[Byte](0))))
+  }
+
+  def update(prev: PTable[String, PageRankData], d: Array[Float]) = {
+    val outbound = prev.flatMap((url, prd) => {
+      prd.urls.map(link => (link, prd.scaledPageRank))
+    })
+    cg(prev, outbound, d)
+  }
+
+  def cg(prev: PTable[String, PageRankData],
+         out: PTable[String, Float], d: Array[Float]) = {
+    prev.cogroup(out).map((url, v) => {
+      val (p, o) = v
+      val prd = p.head
+      (url, prd.next((1 - d(0)) + d(0) * o.sum))
+    })
+  }
+
+  def fastUpdate(prev: PTable[String, PageRankData], d: Array[Float]) = {
+    val outbound = prev.parallelDo(new CachingPageRankClassFn(), tableOf(strings, floats))
+    cg(prev, outbound, d)
+  }
+
+  @Test def testPageRank {
+    var prev = initialInput(tempDir.copyResourceFileName("urls.txt"))
+    var delta = 1.0f
+    while (delta > 0.01f) {
+      prev = update(prev, Array(0.5f))
+      delta = prev.values.map(_.delta).max.value()
+    }
+    assertEquals(0.0048, delta, 0.001)
+    pipeline.done
+  }
+
+  @Test def testFastPageRank {
+    var prev = initialInput(tempDir.copyResourceFileName("urls.txt"))
+    var delta = 1.0f
+    while (delta > 0.01f) {
+      prev = fastUpdate(prev, Array(0.5f))
+      delta = prev.values.map(_.delta).max.value()
+    }
+    assertEquals(0.0048, delta, 0.001)
+    pipeline.done
+  }
+}
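
For reference, the cg update above implements the standard damped PageRank
iteration in its non-normalized form (ranks sum to the number of pages rather
than to 1):

    PR(u) <- (1 - d) + d * sum over inbound links v of PR(v) / |outlinks(v)|

with damping factor d = 0.5 in these tests, iterated until the largest
per-page change in rank drops below 0.01.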

http://git-wip-us.apache.org/repos/asf/crunch/blob/dbd56e63/crunch-spark/src/it/scala/org/apache/crunch/scrunch/spark/PageRankTest.scala
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/scala/org/apache/crunch/scrunch/spark/PageRankTest.scala b/crunch-spark/src/it/scala/org/apache/crunch/scrunch/spark/PageRankTest.scala
new file mode 100644
index 0000000..7f9b7f9
--- /dev/null
+++ b/crunch-spark/src/it/scala/org/apache/crunch/scrunch/spark/PageRankTest.scala
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.scrunch.spark
+
+import org.apache.crunch.scrunch.{PTable, Avros}
+import org.apache.crunch.io.{From => from}
+
+import _root_.org.junit.Assert._
+import _root_.org.junit.Test
+
+class PageRankTest extends CrunchSparkSuite {
+  lazy val pipeline = SparkPipeline[PageRankTest](tempDir.getDefaultConfiguration)
+
+  def initialInput(fileName: String) = {
+    pipeline.read(from.textFile(fileName))
+      .withPType(Avros.strings)
+      .mapWithContext((line, ctxt) => {
+        ctxt.getConfiguration; val urls = line.split("\\t"); (urls(0), urls(1))
+      })
+      .groupByKey
+      .map((url, links) => (url, (1.0, 0.0, links.toList)))
+  }
+
+  def update(prev: PTable[String, (Double, Double, List[String])], d: Double) = {
+    val outbound = prev.flatMap((url, v) => {
+      val (pr, oldpr, links) = v
+      links.map(link => (link, pr / links.size))
+    })
+    cg(prev, outbound, d)
+  }
+
+  def cg(prev: PTable[String, (Double, Double, List[String])],
+         out: PTable[String, Double], d: Double) = {
+    prev.cogroup(out).map((url, v) => {
+      val (p, o) = v
+      val (pr, oldpr, links) = p.head
+      (url, ((1 - d) + d * o.sum, pr, links))
+    })
+  }
+
+  @Test def testPageRank {
+    var prev = initialInput(tempDir.copyResourceFileName("urls.txt"))
+    var delta = 1.0
+    while (delta > 0.01) {
+      prev = update(prev, 0.5)
+      delta = prev.map((k, v) => math.abs(v._1 - v._2)).max.value()
+    }
+    assertEquals(0.0048, delta, 0.001)
+    pipeline.done
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/dbd56e63/crunch-spark/src/main/scala/org/apache/crunch/scrunch/spark/ByteBufferInputStream.scala
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/scala/org/apache/crunch/scrunch/spark/ByteBufferInputStream.scala b/crunch-spark/src/main/scala/org/apache/crunch/scrunch/spark/ByteBufferInputStream.scala
new file mode 100644
index 0000000..cc11b46
--- /dev/null
+++ b/crunch-spark/src/main/scala/org/apache/crunch/scrunch/spark/ByteBufferInputStream.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.crunch.scrunch.spark
+
+import java.io.InputStream
+import java.nio.ByteBuffer
+
+/**
+ * Reads data from a ByteBuffer, releasing the reference to the buffer once
+ * the end of the stream is reached so that it can be garbage collected.
+ */
+private[spark]
+class ByteBufferInputStream(private var buffer: ByteBuffer)
+  extends InputStream {
+
+  override def read(): Int = {
+    if (buffer == null || buffer.remaining() == 0) {
+      cleanUp()
+      -1
+    } else {
+      buffer.get() & 0xFF
+    }
+  }
+
+  override def read(dest: Array[Byte]): Int = {
+    read(dest, 0, dest.length)
+  }
+
+  override def read(dest: Array[Byte], offset: Int, length: Int): Int = {
+    if (buffer == null || buffer.remaining() == 0) {
+      cleanUp()
+      -1
+    } else {
+      val amountToGet = math.min(buffer.remaining(), length)
+      buffer.get(dest, offset, amountToGet)
+      amountToGet
+    }
+  }
+
+  override def skip(bytes: Long): Long = {
+    if (buffer != null) {
+      val amountToSkip = math.min(bytes, buffer.remaining).toInt
+      buffer.position(buffer.position + amountToSkip)
+      if (buffer.remaining() == 0) {
+        cleanUp()
+      }
+      amountToSkip
+    } else {
+      0L
+    }
+  }
+
+  /**
+   * Release the reference to the underlying buffer so it can be reclaimed.
+   */
+  private def cleanUp() {
+    if (buffer != null) {
+      buffer = null
+    }
+  }
+}
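
ByteBufferInputStream follows the standard InputStream contract; a REPL-style
sketch of its behavior (hypothetical usage, and since the class is
private[spark] it is only visible from within org.apache.crunch.scrunch.spark):

    import java.nio.ByteBuffer

    val in = new ByteBufferInputStream(ByteBuffer.wrap("hi".getBytes("UTF-8")))
    // read() returns each byte as an unsigned int, then -1 once the buffer
    // is drained and the reference to it has been dropped
    assert(in.read() == 'h'.toInt)  // 104
    assert(in.read() == 'i'.toInt)  // 105
    assert(in.read() == -1)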

http://git-wip-us.apache.org/repos/asf/crunch/blob/dbd56e63/crunch-spark/src/main/scala/org/apache/crunch/scrunch/spark/ScrunchSerializer.scala
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/scala/org/apache/crunch/scrunch/spark/ScrunchSerializer.scala b/crunch-spark/src/main/scala/org/apache/crunch/scrunch/spark/ScrunchSerializer.scala
new file mode 100644
index 0000000..b3934ed
--- /dev/null
+++ b/crunch-spark/src/main/scala/org/apache/crunch/scrunch/spark/ScrunchSerializer.scala
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.scrunch.spark
+
+import java.io._
+import java.nio.ByteBuffer
+import java.util
+
+import org.apache.spark.serializer.{Serializer, DeserializationStream, SerializationStream, SerializerInstance}
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.SparkConf
+
+private[spark] class JavaSerializationStream(out: OutputStream, counterReset: Int)
+  extends SerializationStream {
+  private val objOut = new ObjectOutputStream(out)
+  private var counter = 0
+
+  /**
+   * Calls ObjectOutputStream.reset() to avoid the memory leak described in
+   * http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api,
+   * but only once every counterReset writes (100 by default) to avoid bloating
+   * the stream: on each reset, object class descriptions must be re-written.
+   */
+  def writeObject[T: ClassTag](t: T): SerializationStream = {
+    objOut.writeObject(t)
+    counter += 1
+    if (counterReset > 0 && counter >= counterReset) {
+      objOut.reset()
+      counter = 0
+    }
+    this
+  }
+
+  def flush() { objOut.flush() }
+  def close() { objOut.close() }
+}
+
+private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoader,
+                                                primitiveMapping: util.HashMap[String, Class[_]])
+  extends DeserializationStream {
+  private val objIn = new ObjectInputStream(in) {
+    override def resolveClass(desc: ObjectStreamClass): Class[_] = {
+      val name = desc.getName
+      if (primitiveMapping.containsKey(name)) {
+        return primitiveMapping.get(name)
+      }
+      Class.forName(desc.getName, false, loader)
+    }
+  }
+
+  def readObject[T: ClassTag](): T = objIn.readObject().asInstanceOf[T]
+  def close() { objIn.close() }
+}
+
+
+private[spark] class JavaSerializerInstance(counterReset: Int, defaultClassLoader: ClassLoader,
+                                            primitiveMappings: util.HashMap[String, Class[_]])
+  extends SerializerInstance {
+
+  override def serialize[T: ClassTag](t: T): ByteBuffer = {
+    val bos = new ByteArrayOutputStream()
+    val out = serializeStream(bos)
+    out.writeObject(t)
+    out.close()
+    ByteBuffer.wrap(bos.toByteArray)
+  }
+
+  override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
+    val bis = new ByteBufferInputStream(bytes)
+    val in = deserializeStream(bis)
+    in.readObject()
+  }
+
+  override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
+    val bis = new ByteBufferInputStream(bytes)
+    val in = deserializeStream(bis, loader)
+    in.readObject()
+  }
+
+  override def serializeStream(s: OutputStream): SerializationStream = {
+    new JavaSerializationStream(s, counterReset)
+  }
+
+  override def deserializeStream(s: InputStream): DeserializationStream = {
+    new JavaDeserializationStream(s, defaultClassLoader, primitiveMappings)
+  }
+
+  def deserializeStream(s: InputStream, loader: ClassLoader): DeserializationStream = {
+    new JavaDeserializationStream(s, loader, primitiveMappings)
+  }
+}
+
+class ScrunchSerializer(conf: SparkConf) extends Serializer with Externalizable {
+  private var counterReset = conf.getInt("spark.serializer.objectStreamReset", 100)
+
+  override def newInstance(): SerializerInstance = {
+    val classLoader = defaultClassLoader.getOrElse(Thread.currentThread.getContextClassLoader)
+    val classes = Array[Class[_]](classOf[Byte], classOf[Char], classOf[Short], classOf[Int],
+        classOf[Long], classOf[Float], classOf[Double], classOf[Boolean])
+    val mapping = new util.HashMap[String, Class[_]]()
+    classes.foreach(cls => {mapping.put(cls.getCanonicalName, cls)})
+    new JavaSerializerInstance(counterReset, classLoader, mapping)
+  }
+
+  override def writeExternal(out: ObjectOutput): Unit = {
+    out.writeInt(counterReset)
+  }
+
+  override def readExternal(in: ObjectInput): Unit = {
+    counterReset = in.readInt()
+  }
+}
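
A REPL-style round trip through the serializer (hypothetical usage; any
java.io.Serializable value, such as a tuple, works):

    import org.apache.spark.SparkConf

    val instance = new ScrunchSerializer(new SparkConf()).newInstance()
    val bytes = instance.serialize(("page", 0.85))                // java.nio.ByteBuffer
    val restored = instance.deserialize[(String, Double)](bytes)
    assert(restored == ("page", 0.85))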

http://git-wip-us.apache.org/repos/asf/crunch/blob/dbd56e63/crunch-spark/src/main/scala/org/apache/crunch/scrunch/spark/SparkPipeline.scala
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/scala/org/apache/crunch/scrunch/spark/SparkPipeline.scala b/crunch-spark/src/main/scala/org/apache/crunch/scrunch/spark/SparkPipeline.scala
new file mode 100644
index 0000000..d935699
--- /dev/null
+++ b/crunch-spark/src/main/scala/org/apache/crunch/scrunch/spark/SparkPipeline.scala
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.scrunch.spark
+
+import org.apache.crunch.scrunch.Pipeline
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.SparkConf
+
+import scala.reflect.ClassTag
+
+/**
+ * A Scrunch {@code Pipeline} instance that wraps an underlying
+ * {@link org.apache.crunch.impl.spark.SparkPipeline} for executing
+ * pipelines on Spark.
+ */
+class SparkPipeline(val master: String, val name: String, val clazz: Class[_],
+                    val conf: Configuration) extends Pipeline({
+  new org.apache.crunch.impl.spark.SparkPipeline(master, name, clazz, conf)
+})
+
+/**
+ * Companion object for creating {@code SparkPipeline} instances.
+ */
+object SparkPipeline {
+
+  /**
+   * Default factory method for creating SparkPipeline instances.
+   *
+   * @tparam T The class to use for finding the right client JAR
+   * @return A new SparkPipeline instance
+   */
+  def apply[T: ClassTag](): SparkPipeline = apply[T](new Configuration())
+
+  /**
+   * Factory method that gets the name of the app and the Spark master
+   * from the {@code spark.app.name} and {@code spark.master} properties.
+   *
+   * @param conf The Configuration instance to use
+   * @tparam T The class to use for finding the right client JAR
+   * @return A new SparkPipeline instance
+   */
+  def apply[T: ClassTag](conf: Configuration): SparkPipeline = {
+    val sconf = new SparkConf()
+    val name = conf.get("spark.app.name", sconf.get("spark.app.name", "ScrunchApp"))
+    apply[T](name, conf)
+  }
+
+  /**
+   * Factory method for SparkPipeline that gets the Spark master from the
+   * {@code spark.master} property.
+   *
+   * @param name The name of the pipeline instance
+   * @param conf A Configuration instance
+   * @tparam T The class to use for finding the right client JAR
+   * @return A new SparkPipeline
+   */
+  def apply[T: ClassTag](name: String, conf: Configuration): SparkPipeline = {
+    val sconf = new SparkConf()
+    val master = conf.get("spark.master", sconf.get("spark.master", "local"))
+    apply[T](master, name, conf)
+  }
+
+  /**
+   * Factory method for SparkPipeline.
+   *
+   * @param master The URL or code for the Spark master to use.
+   * @param name The name of the pipeline instance
+   * @param conf A Configuration instance
+   * @tparam T The class to use for finding the right client JAR
+   * @return A new SparkPipeline
+   */
+  def apply[T: ClassTag](master: String, name: String, conf: Configuration): SparkPipeline = {
+    conf.set("spark.closure.serializer", "org.apache.crunch.scrunch.spark.ScrunchSerializer")
+    new SparkPipeline(master, name, implicitly[ClassTag[T]].runtimeClass, conf)
+  }
+}
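
The factory methods above resolve the Spark master and app name from the Hadoop
Configuration first, then from SparkConf, then fall back to "local" and
"ScrunchApp"; a REPL-style illustration (MyJob is a hypothetical marker class):

    import org.apache.hadoop.conf.Configuration
    import org.apache.crunch.scrunch.spark.SparkPipeline

    class MyJob  // used only to locate the client JAR

    val conf = new Configuration()
    conf.set("spark.master", "local[4]")    // otherwise falls back to "local"
    conf.set("spark.app.name", "MyJob")     // otherwise falls back to "ScrunchApp"
    // apply() also pins spark.closure.serializer to ScrunchSerializer.
    val pipeline = SparkPipeline[MyJob](conf)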

http://git-wip-us.apache.org/repos/asf/crunch/blob/dbd56e63/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 519d65b..44fb488 100644
--- a/pom.xml
+++ b/pom.xml
@@ -97,7 +97,7 @@ under the License.
     <scala.base.version>2.10</scala.base.version>
     <scala.version>2.10.4</scala.version>
     <scalatest.version>1.9.1</scalatest.version>
-    <spark.version>1.0.0</spark.version>
+    <spark.version>1.2.0</spark.version>
     <jsr305.version>1.3.9</jsr305.version>
   </properties>
 

