From: jwills@apache.org
To: commits@crunch.apache.org
Reply-To: dev@crunch.apache.org
Subject: crunch git commit: CRUNCH-496: Scrunch on Spark.
Date: Wed, 11 Feb 2015 17:50:54 +0000 (UTC)
Message-Id: <332b5bbce3b84779883ee349c02a5afc@git.apache.org>

Repository: crunch
Updated Branches:
  refs/heads/master 5e6e33536 -> dbd56e638


CRUNCH-496: Scrunch on Spark.

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/dbd56e63
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/dbd56e63
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/dbd56e63

Branch: refs/heads/master
Commit: dbd56e638456437451e8bc12bb0dfe55aef0f254
Parents: 5e6e335
Author: Josh Wills <jwills@apache.org>
Authored: Fri Jan 30 18:42:49 2015 -0800
Committer: Josh Wills <jwills@apache.org>
Committed: Wed Feb 11 09:24:19 2015 -0800

----------------------------------------------------------------------
 .../org/apache/crunch/types/avro/AvroType.java  |   3 +-
 crunch-spark/pom.xml                            |  92 ++++++++++++-
 .../crunch/scrunch/spark/CrunchSparkSuite.scala |  39 ++++++
 .../scrunch/spark/PageRankClassTest.scala       | 118 +++++++++++++++++
 .../crunch/scrunch/spark/PageRankTest.scala     |  66 ++++++++++
 .../scrunch/spark/ByteBufferInputStream.scala   |  76 +++++++++++
 .../scrunch/spark/ScrunchSerializer.scala       | 129 +++++++++++++++++++
 .../crunch/scrunch/spark/SparkPipeline.scala    |  91 +++++++++++++
 pom.xml                                         |   2 +-
 9 files changed, 610 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/dbd56e63/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
index 9dbf6b0..528a600 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
@@ -45,7 +45,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 /**
@@ -85,7 +84,7 @@ public class AvroType<T> implements PType<T> {
     this.baseInputMapFn = inputMapFn;
     this.baseOutputMapFn = outputMapFn;
     this.deepCopier = deepCopier;
-    this.subTypes = ImmutableList.<PType> builder().add(ptypes).build();
+    this.subTypes = Lists.newArrayList(ptypes);
     this.recordType = recordType;
   }
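
The only change to crunch-core swaps Guava's immutable builder for a plain
ArrayList when collecting an AvroType's subtypes. The commit does not state
the motivation, but one observable difference is null handling: the immutable
builder rejects null elements, while Lists.newArrayList accepts them and the
result stays mutable. A minimal sketch (my illustration, not part of the
commit):

    import com.google.common.collect.Lists

    object SubTypesDemo {
      val ptypes = Array[AnyRef]("a", null, "b")
      // Old path: ImmutableList.builder[AnyRef]().add(ptypes: _*).build()
      // would throw NullPointerException on the null element.
      // New path: a plain ArrayList accepts it without complaint.
      val subTypes = Lists.newArrayList(ptypes: _*)  // [a, null, b]
    }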

http://git-wip-us.apache.org/repos/asf/crunch/blob/dbd56e63/crunch-spark/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-spark/pom.xml b/crunch-spark/pom.xml
index 6ab5db5..47f52b6 100644
--- a/crunch-spark/pom.xml
+++ b/crunch-spark/pom.xml
@@ -43,6 +43,10 @@ under the License.
       <artifactId>crunch-core</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.crunch</groupId>
+      <artifactId>crunch-scrunch</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
       <scope>provided</scope>
@@ -78,6 +82,10 @@ under the License.
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.base.version}</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-all</artifactId>
       <scope>test</scope>
@@ -87,16 +95,94 @@ under the License.
       </plugin>
       <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+              <goal>testCompile</goal>
+            </goals>
+            <configuration>
+              <args>
+                <arg>-deprecation</arg>
+                <arg>-dependencyfile</arg>
+                <arg>${project.build.directory}/.scala_dependencies</arg>
+              </args>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
         <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-failsafe-plugin</artifactId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <useFile>false</useFile>
+          <disableXmlReport>true</disableXmlReport>
+          <includes>
+            <include>${project.build.testSourceDirectory}/**/*Test.*</include>
+            <include>${project.build.testSourceDirectory}/**/*Suite.*</include>
+          </includes>
+        </configuration>
       </plugin>
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>add-test-source</id>
+            <phase>validate</phase>
+            <goals>
+              <goal>add-test-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>${basedir}/src/it/java</source>
+                <source>${basedir}/src/it/scala</source>
+              </sources>
+            </configuration>
+          </execution>
+          <execution>
+            <id>add-test-resource</id>
+            <phase>validate</phase>
+            <goals>
+              <goal>add-test-resource</goal>
+            </goals>
+            <configuration>
+              <resources>
+                <resource>
+                  <directory>${basedir}/src/it/resources</directory>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-failsafe-plugin</artifactId>
+        <configuration>
+          <testSourceDirectory>${basedir}/src/it/scala</testSourceDirectory>
+          <useFile>false</useFile>
+          <disableXmlReport>true</disableXmlReport>
+          <includes>
+            <include>**/*Test.*</include>
+            <include>**/*Suite.*</include>
+          </includes>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>integration-test</goal>
+              <goal>verify</goal>
+            </goals>
+          </execution>
+        </executions>
       </plugin>

http://git-wip-us.apache.org/repos/asf/crunch/blob/dbd56e63/crunch-spark/src/it/scala/org/apache/crunch/scrunch/spark/CrunchSparkSuite.scala
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/scala/org/apache/crunch/scrunch/spark/CrunchSparkSuite.scala b/crunch-spark/src/it/scala/org/apache/crunch/scrunch/spark/CrunchSparkSuite.scala
new file mode 100644
index 0000000..bb66911
--- /dev/null
+++ b/crunch-spark/src/it/scala/org/apache/crunch/scrunch/spark/CrunchSparkSuite.scala
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.scrunch.spark
+
+import org.apache.crunch.test.TemporaryPath
+import org.junit.{After, Before}
+import org.scalatest.junit.JUnitSuite
+
+class CrunchSparkSuite extends JUnitSuite {
+
+  val tempDir = new TemporaryPath("crunch.tmp.dir", "hadoop.tmp.dir")
+
+  def getFolder() = {
+    tempDir
+  }
+
+  @Before def initialize() {
+    tempDir.create()
+  }
+
+  @After def cleanup() {
+    tempDir.delete()
+  }
+}
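
A suite that extends CrunchSparkSuite inherits the managed temp directory and
the JUnit lifecycle hooks above. A minimal sketch of a subclass (hypothetical:
LineCountTest is not part of this commit, and it assumes the Scrunch
PCollection API for length/value):

    package org.apache.crunch.scrunch.spark

    import org.apache.crunch.scrunch.Avros
    import org.apache.crunch.io.{From => from}

    import _root_.org.junit.Assert._
    import _root_.org.junit.Test

    class LineCountTest extends CrunchSparkSuite {
      lazy val pipeline = SparkPipeline[LineCountTest](tempDir.getDefaultConfiguration)

      @Test def testLineCount {
        // urls.txt is the same resource the PageRank tests below copy out of the test JAR.
        val lines = pipeline.read(
          from.textFile(tempDir.copyResourceFileName("urls.txt"), Avros.strings))
        assertTrue(lines.length.value() > 0L)
        pipeline.done
      }
    }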

http://git-wip-us.apache.org/repos/asf/crunch/blob/dbd56e63/crunch-spark/src/it/scala/org/apache/crunch/scrunch/spark/PageRankClassTest.scala
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/scala/org/apache/crunch/scrunch/spark/PageRankClassTest.scala b/crunch-spark/src/it/scala/org/apache/crunch/scrunch/spark/PageRankClassTest.scala
new file mode 100644
index 0000000..2084004
--- /dev/null
+++ b/crunch-spark/src/it/scala/org/apache/crunch/scrunch/spark/PageRankClassTest.scala
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.scrunch.spark
+
+import org.apache.crunch.scrunch._
+import org.apache.crunch.scrunch.Avros._
+
+import org.apache.crunch.{DoFn, Emitter, Pair => P}
+import org.apache.crunch.io.{From => from}
+
+import scala.collection.mutable.HashMap
+
+import _root_.org.junit.Assert._
+import _root_.org.junit.Test
+
+class PageRankData(val page_rank: Float, oldpr: Float, val urls: Array[String], bytes: Array[Byte]) {
+
+  // Required no-arg constructor for Avro reflection
+  def this() = this(0.0f, 0.0f, null, null)
+
+  def scaledPageRank = page_rank / urls.length
+
+  def next(newPageRank: Float) = new PageRankData(newPageRank, page_rank, urls, bytes)
+
+  def delta = math.abs(page_rank - oldpr)
+}
+
+class CachingPageRankClassFn extends DoFn[P[String, PageRankData], P[String, Float]] {
+  val cache = new HashMap[String, Float] {
+    override def default(key: String) = 0f
+  }
+
+  override def process(input: P[String, PageRankData], emitFn: Emitter[P[String, Float]]) {
+    val prd = input.second()
+    if (prd.urls.length > 0) {
+      val newpr = prd.page_rank / prd.urls.length
+      prd.urls.foreach(url => cache.put(url, cache(url) + newpr))
+      if (cache.size > 5000) {
+        cleanup(emitFn)
+      }
+    }
+  }
+
+  override def cleanup(emitFn: Emitter[P[String, Float]]) {
+    cache.foreach(kv => emitFn.emit(P.of(kv._1, kv._2)))
+    cache.clear
+  }
+}
+
+class PageRankClassTest extends CrunchSparkSuite {
+
+  lazy val pipeline = SparkPipeline[PageRankClassTest](tempDir.getDefaultConfiguration)
+
+  def initialInput(fileName: String) = {
+    pipeline.read(from.textFile(fileName, Avros.strings))
+      .map(line => { val urls = line.split("\\t"); (urls(0), urls(1)) })
+      .groupByKey
+      .map((url, links) => (url, new PageRankData(1f, 0f, links.filter(x => x != null).toArray, Array[Byte](0))))
+  }
+
+  def update(prev: PTable[String, PageRankData], d: Array[Float]) = {
+    val outbound = prev.flatMap((url, prd) => {
+      prd.urls.map(link => (link, prd.scaledPageRank))
+    })
+    cg(prev, outbound, d)
+  }
+
+  def cg(prev: PTable[String, PageRankData],
+         out: PTable[String, Float], d: Array[Float]) = {
+    prev.cogroup(out).map((url, v) => {
+      val (p, o) = v
+      val prd = p.head
+      (url, prd.next((1 - d(0)) + d(0) * o.sum))
+    })
+  }
+
+  def fastUpdate(prev: PTable[String, PageRankData], d: Array[Float]) = {
+    val outbound = prev.parallelDo(new CachingPageRankClassFn(), tableOf(strings, floats))
+    cg(prev, outbound, d)
+  }
+
+  @Test def testPageRank {
+    var prev = initialInput(tempDir.copyResourceFileName("urls.txt"))
+    var delta = 1.0f
+    while (delta > 0.01f) {
+      prev = update(prev, Array(0.5f))
+      delta = prev.values.map(_.delta).max.value()
+    }
+    assertEquals(0.0048, delta, 0.001)
+    pipeline.done
+  }
+
+  @Test def testFastPageRank {
+    var prev = initialInput(tempDir.copyResourceFileName("urls.txt"))
+    var delta = 1.0f
+    while (delta > 0.01f) {
+      prev = fastUpdate(prev, Array(0.5f))
+      delta = prev.values.map(_.delta).max.value()
+    }
+    assertEquals(0.0048, delta, 0.001)
+    pipeline.done
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/dbd56e63/crunch-spark/src/it/scala/org/apache/crunch/scrunch/spark/PageRankTest.scala
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/scala/org/apache/crunch/scrunch/spark/PageRankTest.scala b/crunch-spark/src/it/scala/org/apache/crunch/scrunch/spark/PageRankTest.scala
new file mode 100644
index 0000000..7f9b7f9
--- /dev/null
+++ b/crunch-spark/src/it/scala/org/apache/crunch/scrunch/spark/PageRankTest.scala
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.scrunch.spark
+
+import org.apache.crunch.scrunch.{PTable, Avros}
+import org.apache.crunch.io.{From => from}
+
+import _root_.org.junit.Assert._
+import _root_.org.junit.Test
+
+class PageRankTest extends CrunchSparkSuite {
+  lazy val pipeline = SparkPipeline[PageRankTest](tempDir.getDefaultConfiguration)
+
+  def initialInput(fileName: String) = {
+    pipeline.read(from.textFile(fileName))
+      .withPType(Avros.strings)
+      .mapWithContext((line, ctxt) => {
+        ctxt.getConfiguration; val urls = line.split("\\t"); (urls(0), urls(1))
+      })
+      .groupByKey
+      .map((url, links) => (url, (1.0, 0.0, links.toList)))
+  }
+
+  def update(prev: PTable[String, (Double, Double, List[String])], d: Double) = {
+    val outbound = prev.flatMap((url, v) => {
+      val (pr, oldpr, links) = v
+      links.map(link => (link, pr / links.size))
+    })
+    cg(prev, outbound, d)
+  }
+
+  def cg(prev: PTable[String, (Double, Double, List[String])],
+         out: PTable[String, Double], d: Double) = {
+    prev.cogroup(out).map((url, v) => {
+      val (p, o) = v
+      val (pr, oldpr, links) = p.head
+      (url, ((1 - d) + d * o.sum, pr, links))
+    })
+  }
+
+  @Test def testPageRank {
+    var prev = initialInput(tempDir.copyResourceFileName("urls.txt"))
+    var delta = 1.0
+    while (delta > 0.01) {
+      prev = update(prev, 0.5)
+      delta = prev.map((k, v) => math.abs(v._1 - v._2)).max.value()
+    }
+    assertEquals(0.0048, delta, 0.001)
+    pipeline.done
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/dbd56e63/crunch-spark/src/main/scala/org/apache/crunch/scrunch/spark/ByteBufferInputStream.scala
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/scala/org/apache/crunch/scrunch/spark/ByteBufferInputStream.scala b/crunch-spark/src/main/scala/org/apache/crunch/scrunch/spark/ByteBufferInputStream.scala
new file mode 100644
index 0000000..cc11b46
--- /dev/null
+++ b/crunch-spark/src/main/scala/org/apache/crunch/scrunch/spark/ByteBufferInputStream.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.crunch.scrunch.spark
+
+import java.io.InputStream
+import java.nio.ByteBuffer
+
+/**
+ * Reads data from a ByteBuffer, releasing the reference to the buffer at
+ * the end of the stream so that it can be garbage-collected. (This mirrors
+ * Spark's ByteBufferInputStream, minus the BlockManager.dispose() hook that
+ * the Spark original uses to close memory-mapped files.)
+ */
+private[spark]
+class ByteBufferInputStream(private var buffer: ByteBuffer)
+  extends InputStream {
+
+  override def read(): Int = {
+    if (buffer == null || buffer.remaining() == 0) {
+      cleanUp()
+      -1
+    } else {
+      buffer.get() & 0xFF
+    }
+  }
+
+  override def read(dest: Array[Byte]): Int = {
+    read(dest, 0, dest.length)
+  }
+
+  override def read(dest: Array[Byte], offset: Int, length: Int): Int = {
+    if (buffer == null || buffer.remaining() == 0) {
+      cleanUp()
+      -1
+    } else {
+      val amountToGet = math.min(buffer.remaining(), length)
+      buffer.get(dest, offset, amountToGet)
+      amountToGet
+    }
+  }
+
+  override def skip(bytes: Long): Long = {
+    if (buffer != null) {
+      val amountToSkip = math.min(bytes, buffer.remaining).toInt
+      buffer.position(buffer.position + amountToSkip)
+      if (buffer.remaining() == 0) {
+        cleanUp()
+      }
+      amountToSkip
+    } else {
+      0L
+    }
+  }
+
+  /**
+   * Clean up by dropping the reference to the buffer; unlike the Spark
+   * original, this copy does not call BlockManager.dispose().
+   */
+  private def cleanUp() {
+    if (buffer != null) {
+      buffer = null
+    }
+  }
+}
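
Because the class is private[spark] (the enclosing scrunch.spark package), it
is only visible to code living in that package. A small sketch of the read
semantics (hypothetical demo object, not part of the commit):

    package org.apache.crunch.scrunch.spark

    import java.nio.ByteBuffer

    object ByteBufferInputStreamDemo {
      def main(args: Array[String]): Unit = {
        val in = new ByteBufferInputStream(ByteBuffer.wrap("hello".getBytes("UTF-8")))
        val dest = new Array[Byte](5)
        in.read(dest)                       // copies all five bytes out of the buffer
        println(new String(dest, "UTF-8"))  // hello
        println(in.read())                  // -1: exhausted; the buffer reference is dropped
      }
    }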

http://git-wip-us.apache.org/repos/asf/crunch/blob/dbd56e63/crunch-spark/src/main/scala/org/apache/crunch/scrunch/spark/ScrunchSerializer.scala
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/scala/org/apache/crunch/scrunch/spark/ScrunchSerializer.scala b/crunch-spark/src/main/scala/org/apache/crunch/scrunch/spark/ScrunchSerializer.scala
new file mode 100644
index 0000000..b3934ed
--- /dev/null
+++ b/crunch-spark/src/main/scala/org/apache/crunch/scrunch/spark/ScrunchSerializer.scala
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.scrunch.spark
+
+import java.io._
+import java.nio.ByteBuffer
+import java.util
+
+import org.apache.spark.serializer.{Serializer, DeserializationStream, SerializationStream, SerializerInstance}
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.SparkConf
+
+private[spark] class JavaSerializationStream(out: OutputStream, counterReset: Int)
+  extends SerializationStream {
+  private val objOut = new ObjectOutputStream(out)
+  private var counter = 0
+
+  /**
+   * Calling reset to avoid memory leak:
+   * http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api
+   * But only call it every 100th time to avoid bloated serialization streams (when
+   * the stream 'resets' object class descriptions have to be re-written)
+   */
+  def writeObject[T: ClassTag](t: T): SerializationStream = {
+    objOut.writeObject(t)
+    counter += 1
+    if (counterReset > 0 && counter >= counterReset) {
+      objOut.reset()
+      counter = 0
+    }
+    this
+  }
+
+  def flush() { objOut.flush() }
+  def close() { objOut.close() }
+}
+
+private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoader,
+    primitiveMapping: util.HashMap[String, Class[_]])
+  extends DeserializationStream {
+  private val objIn = new ObjectInputStream(in) {
+    override def resolveClass(desc: ObjectStreamClass): Class[_] = {
+      val name = desc.getName
+      if (primitiveMapping.containsKey(name)) {
+        return primitiveMapping.get(name)
+      }
+      Class.forName(desc.getName, false, loader)
+    }
+  }
+
+  def readObject[T: ClassTag](): T = objIn.readObject().asInstanceOf[T]
+  def close() { objIn.close() }
+}
+
+
+private[spark] class JavaSerializerInstance(counterReset: Int, defaultClassLoader: ClassLoader,
+    primitiveMappings: util.HashMap[String, Class[_]])
+  extends SerializerInstance {
+
+  override def serialize[T: ClassTag](t: T): ByteBuffer = {
+    val bos = new ByteArrayOutputStream()
+    val out = serializeStream(bos)
+    out.writeObject(t)
+    out.close()
+    ByteBuffer.wrap(bos.toByteArray)
+  }
+
+  override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
+    val bis = new ByteBufferInputStream(bytes)
+    val in = deserializeStream(bis)
+    in.readObject()
+  }
+
+  override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
+    val bis = new ByteBufferInputStream(bytes)
+    val in = deserializeStream(bis, loader)
+    in.readObject()
+  }
+
+  override def serializeStream(s: OutputStream): SerializationStream = {
+    new JavaSerializationStream(s, counterReset)
+  }
+
+  override def deserializeStream(s: InputStream): DeserializationStream = {
+    new JavaDeserializationStream(s, defaultClassLoader, primitiveMappings)
+  }
+
+  def deserializeStream(s: InputStream, loader: ClassLoader): DeserializationStream = {
+    new JavaDeserializationStream(s, loader, primitiveMappings)
+  }
+}
+
+class ScrunchSerializer(conf: SparkConf) extends Serializer with Externalizable {
+  private var counterReset = conf.getInt("spark.serializer.objectStreamReset", 100)
+
+  override def newInstance(): SerializerInstance = {
+    val classLoader = defaultClassLoader.getOrElse(Thread.currentThread.getContextClassLoader)
+    val classes = Array[Class[_]](classOf[Byte], classOf[Char], classOf[Short], classOf[Int],
+      classOf[Long], classOf[Float], classOf[Double], classOf[Boolean])
+    val mapping = new util.HashMap[String, Class[_]]()
+    classes.foreach(cls => mapping.put(cls.getCanonicalName, cls))
+    new JavaSerializerInstance(counterReset, classLoader, mapping)
+  }
+
+  override def writeExternal(out: ObjectOutput): Unit = {
+    out.writeInt(counterReset)
+  }
+
+  override def readExternal(in: ObjectInput): Unit = {
+    counterReset = in.readInt()
+  }
+}
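
The primitive-name mapping that ScrunchSerializer.newInstance builds appears
to work around a JDK quirk: Class.forName cannot resolve primitive type names,
so a serialized class descriptor naming, say, int would fail in resolveClass
without the explicit lookup. (The "why" is my inference; the commit itself
does not say.) A standalone sketch of the lookup, with a hypothetical demo
object:

    package org.apache.crunch.scrunch.spark

    import java.util

    object PrimitiveMappingDemo {
      def main(args: Array[String]): Unit = {
        val mapping = new util.HashMap[String, Class[_]]()
        Array[Class[_]](classOf[Byte], classOf[Char], classOf[Short], classOf[Int],
          classOf[Long], classOf[Float], classOf[Double], classOf[Boolean])
          .foreach(cls => mapping.put(cls.getCanonicalName, cls))

        println(mapping.get("int"))  // int -- classOf[Int] is Java's int.class
        // Class.forName("int") here would throw ClassNotFoundException,
        // which is why resolveClass consults the map before falling back.
      }
    }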

http://git-wip-us.apache.org/repos/asf/crunch/blob/dbd56e63/crunch-spark/src/main/scala/org/apache/crunch/scrunch/spark/SparkPipeline.scala
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/scala/org/apache/crunch/scrunch/spark/SparkPipeline.scala b/crunch-spark/src/main/scala/org/apache/crunch/scrunch/spark/SparkPipeline.scala
new file mode 100644
index 0000000..d935699
--- /dev/null
+++ b/crunch-spark/src/main/scala/org/apache/crunch/scrunch/spark/SparkPipeline.scala
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.scrunch.spark
+
+import org.apache.crunch.scrunch.Pipeline
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.SparkConf
+
+import scala.reflect.ClassTag
+
+/**
+ * A Scrunch {@code Pipeline} instance that wraps an underlying
+ * {@link org.apache.crunch.impl.spark.SparkPipeline} for executing
+ * pipelines on Spark.
+ */
+class SparkPipeline(val master: String, val name: String, val clazz: Class[_],
+    val conf: Configuration) extends Pipeline({
+  new org.apache.crunch.impl.spark.SparkPipeline(master, name, clazz, conf)
+})
+
+/**
+ * Companion object for creating {@code SparkPipeline} instances.
+ */
+object SparkPipeline {
+
+  /**
+   * Default factory method for creating SparkPipeline instances.
+   *
+   * @tparam T The class to use for finding the right client JAR
+   * @return A new SparkPipeline instance
+   */
+  def apply[T: ClassTag](): SparkPipeline = apply[T](new Configuration())
+
+  /**
+   * Factory method that gets the name of the app and the Spark master
+   * from the {@code spark.app.name} and {@code spark.master} properties.
+   *
+   * @param conf The Configuration instance to use
+   * @tparam T The class to use for finding the right client JAR
+   * @return A new SparkPipeline instance
+   */
+  def apply[T: ClassTag](conf: Configuration): SparkPipeline = {
+    val sconf = new SparkConf()
+    val name = conf.get("spark.app.name", sconf.get("spark.app.name", "ScrunchApp"))
+    apply[T](name, conf)
+  }
+
+  /**
+   * Factory method for SparkPipeline that gets the Spark master from the
+   * {@code spark.master} property.
+   *
+   * @param name The name of the pipeline instance
+   * @param conf A Configuration instance
+   * @tparam T The class to use for finding the right client JAR
+   * @return A new SparkPipeline
+   */
+  def apply[T: ClassTag](name: String, conf: Configuration): SparkPipeline = {
+    val sconf = new SparkConf()
+    val master = conf.get("spark.master", sconf.get("spark.master", "local"))
+    apply[T](master, name, conf)
+  }
+
+  /**
+   * Factory method for SparkPipeline.
+   *
+   * @param master The URL or code for the Spark master to use
+   * @param name The name of the pipeline instance
+   * @param conf A Configuration instance
+   * @tparam T The class to use for finding the right client JAR
+   * @return A new SparkPipeline
+   */
+  def apply[T: ClassTag](master: String, name: String, conf: Configuration): SparkPipeline = {
+    conf.set("spark.closure.serializer", "org.apache.crunch.scrunch.spark.ScrunchSerializer")
+    new SparkPipeline(master, name, implicitly[ClassTag[T]].runtimeClass, conf)
+  }
+}
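
Taken together, the factory methods give three ways to build a pipeline,
differing only in how much is read from the spark.master / spark.app.name
properties. A short sketch (MyApp is a hypothetical marker class used to
locate the client JAR):

    import org.apache.crunch.scrunch.spark.SparkPipeline
    import org.apache.hadoop.conf.Configuration

    class MyApp

    object PipelineFactories {
      // Name and master resolved from spark.app.name / spark.master ("local" fallback):
      val p1 = SparkPipeline[MyApp]()
      // Explicit name; master still resolved from the configuration:
      val p2 = SparkPipeline[MyApp]("my-app", new Configuration())
      // Fully explicit; every variant routes here, registering ScrunchSerializer
      // as spark.closure.serializer before constructing the pipeline:
      val p3 = SparkPipeline[MyApp]("local[2]", "my-app", new Configuration())
    }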

http://git-wip-us.apache.org/repos/asf/crunch/blob/dbd56e63/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 519d65b..44fb488 100644
--- a/pom.xml
+++ b/pom.xml
@@ -97,7 +97,7 @@ under the License.
     <scala.base.version>2.10</scala.base.version>
     <scala.version>2.10.4</scala.version>
     <scalatest.version>1.9.1</scalatest.version>
-    <spark.version>1.0.0</spark.version>
+    <spark.version>1.2.0</spark.version>
     <jsr305.version>1.3.9</jsr305.version>