spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject [26/69] [abbrv] [partial] Initial work to rename package to org.apache.spark
Date Sun, 01 Sep 2013 21:59:10 GMT
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
deleted file mode 100644
index bb8aad3..0000000
--- a/core/src/main/scala/spark/Utils.scala
+++ /dev/null
@@ -1,780 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.io._
-import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address, ServerSocket}
-import java.util.{Locale, Random, UUID}
-import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor}
-import java.util.regex.Pattern
-
-import scala.collection.Map
-import scala.collection.mutable.{ArrayBuffer, HashMap}
-import scala.collection.JavaConversions._
-import scala.io.Source
-
-import com.google.common.io.Files
-import com.google.common.util.concurrent.ThreadFactoryBuilder
-
-import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
-
-import spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
-import spark.deploy.SparkHadoopUtil
-import java.nio.ByteBuffer
-
-
-/**
- * Various utility methods used by Spark.
- */
-private object Utils extends Logging {
-
-  /** Serialize an object using Java serialization */
-  def serialize[T](o: T): Array[Byte] = {
-    val bos = new ByteArrayOutputStream()
-    val oos = new ObjectOutputStream(bos)
-    oos.writeObject(o)
-    oos.close()
-    return bos.toByteArray
-  }
-
-  /** Deserialize an object using Java serialization */
-  def deserialize[T](bytes: Array[Byte]): T = {
-    val bis = new ByteArrayInputStream(bytes)
-    val ois = new ObjectInputStream(bis)
-    return ois.readObject.asInstanceOf[T]
-  }
-
-  /** Deserialize an object using Java serialization and the given ClassLoader */
-  def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
-    val bis = new ByteArrayInputStream(bytes)
-    val ois = new ObjectInputStream(bis) {
-      override def resolveClass(desc: ObjectStreamClass) =
-        Class.forName(desc.getName, false, loader)
-    }
-    return ois.readObject.asInstanceOf[T]
-  }
-
-  /** Serialize via nested stream using specific serializer */
-  def serializeViaNestedStream(os: OutputStream, ser: SerializerInstance)(f: SerializationStream => Unit) = {
-    val osWrapper = ser.serializeStream(new OutputStream {
-      def write(b: Int) = os.write(b)
-
-      override def write(b: Array[Byte], off: Int, len: Int) = os.write(b, off, len)
-    })
-    try {
-      f(osWrapper)
-    } finally {
-      osWrapper.close()
-    }
-  }
-
-  /** Deserialize via nested stream using specific serializer */
-  def deserializeViaNestedStream(is: InputStream, ser: SerializerInstance)(f: DeserializationStream => Unit) = {
-    val isWrapper = ser.deserializeStream(new InputStream {
-      def read(): Int = is.read()
-
-      override def read(b: Array[Byte], off: Int, len: Int): Int = is.read(b, off, len)
-    })
-    try {
-      f(isWrapper)
-    } finally {
-      isWrapper.close()
-    }
-  }
-
-  /**
-   * Primitive often used when writing {@link java.nio.ByteBuffer} to {@link java.io.DataOutput}.
-   */
-  def writeByteBuffer(bb: ByteBuffer, out: ObjectOutput) = {
-    if (bb.hasArray) {
-      out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())
-    } else {
-      val bbval = new Array[Byte](bb.remaining())
-      bb.get(bbval)
-      out.write(bbval)
-    }
-  }
-
-  def isAlpha(c: Char): Boolean = {
-    (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
-  }
-
-  /** Split a string into words at non-alphabetic characters */
-  def splitWords(s: String): Seq[String] = {
-    val buf = new ArrayBuffer[String]
-    var i = 0
-    while (i < s.length) {
-      var j = i
-      while (j < s.length && isAlpha(s.charAt(j))) {
-        j += 1
-      }
-      if (j > i) {
-        buf += s.substring(i, j)
-      }
-      i = j
-      while (i < s.length && !isAlpha(s.charAt(i))) {
-        i += 1
-      }
-    }
-    return buf
-  }
-
-  private val shutdownDeletePaths = new collection.mutable.HashSet[String]()
-
-  // Register the path to be deleted via shutdown hook
-  def registerShutdownDeleteDir(file: File) {
-    val absolutePath = file.getAbsolutePath()
-    shutdownDeletePaths.synchronized {
-      shutdownDeletePaths += absolutePath
-    }
-  }
-
-  // Is the path already registered to be deleted via a shutdown hook ?
-  def hasShutdownDeleteDir(file: File): Boolean = {
-    val absolutePath = file.getAbsolutePath()
-    shutdownDeletePaths.synchronized {
-      shutdownDeletePaths.contains(absolutePath)
-    }
-  }
-
-  // Note: if file is child of some registered path, while not equal to it, then return true;
-  // else false. This is to ensure that two shutdown hooks do not try to delete each others
-  // paths - resulting in IOException and incomplete cleanup.
-  def hasRootAsShutdownDeleteDir(file: File): Boolean = {
-    val absolutePath = file.getAbsolutePath()
-    val retval = shutdownDeletePaths.synchronized {
-      shutdownDeletePaths.find { path =>
-        !absolutePath.equals(path) && absolutePath.startsWith(path)
-      }.isDefined
-    }
-    if (retval) {
-      logInfo("path = " + file + ", already present as root for deletion.")
-    }
-    retval
-  }
-
-  /** Create a temporary directory inside the given parent directory */
-  def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
-    var attempts = 0
-    val maxAttempts = 10
-    var dir: File = null
-    while (dir == null) {
-      attempts += 1
-      if (attempts > maxAttempts) {
-        throw new IOException("Failed to create a temp directory (under " + root + ") after " +
-          maxAttempts + " attempts!")
-      }
-      try {
-        dir = new File(root, "spark-" + UUID.randomUUID.toString)
-        if (dir.exists() || !dir.mkdirs()) {
-          dir = null
-        }
-      } catch { case e: IOException => ; }
-    }
-
-    registerShutdownDeleteDir(dir)
-
-    // Add a shutdown hook to delete the temp dir when the JVM exits
-    Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dir " + dir) {
-      override def run() {
-        // Attempt to delete if some patch which is parent of this is not already registered.
-        if (! hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir)
-      }
-    })
-    dir
-  }
-
-  /** Copy all data from an InputStream to an OutputStream */
-  def copyStream(in: InputStream,
-                 out: OutputStream,
-                 closeStreams: Boolean = false)
-  {
-    val buf = new Array[Byte](8192)
-    var n = 0
-    while (n != -1) {
-      n = in.read(buf)
-      if (n != -1) {
-        out.write(buf, 0, n)
-      }
-    }
-    if (closeStreams) {
-      in.close()
-      out.close()
-    }
-  }
-
-  /**
-   * Download a file requested by the executor. Supports fetching the file in a variety of ways,
-   * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
-   *
-   * Throws SparkException if the target file already exists and has different contents than
-   * the requested file.
-   */
-  def fetchFile(url: String, targetDir: File) {
-    val filename = url.split("/").last
-    val tempDir = getLocalDir
-    val tempFile =  File.createTempFile("fetchFileTemp", null, new File(tempDir))
-    val targetFile = new File(targetDir, filename)
-    val uri = new URI(url)
-    uri.getScheme match {
-      case "http" | "https" | "ftp" =>
-        logInfo("Fetching " + url + " to " + tempFile)
-        val in = new URL(url).openStream()
-        val out = new FileOutputStream(tempFile)
-        Utils.copyStream(in, out, true)
-        if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
-          tempFile.delete()
-          throw new SparkException(
-            "File " + targetFile + " exists and does not match contents of" + " " + url)
-        } else {
-          Files.move(tempFile, targetFile)
-        }
-      case "file" | null =>
-        // In the case of a local file, copy the local file to the target directory.
-        // Note the difference between uri vs url.
-        val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
-        if (targetFile.exists) {
-          // If the target file already exists, warn the user if
-          if (!Files.equal(sourceFile, targetFile)) {
-            throw new SparkException(
-              "File " + targetFile + " exists and does not match contents of" + " " + url)
-          } else {
-            // Do nothing if the file contents are the same, i.e. this file has been copied
-            // previously.
-            logInfo(sourceFile.getAbsolutePath + " has been previously copied to "
-              + targetFile.getAbsolutePath)
-          }
-        } else {
-          // The file does not exist in the target directory. Copy it there.
-          logInfo("Copying " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
-          Files.copy(sourceFile, targetFile)
-        }
-      case _ =>
-        // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
-        val env = SparkEnv.get
-        val uri = new URI(url)
-        val conf = env.hadoop.newConfiguration()
-        val fs = FileSystem.get(uri, conf)
-        val in = fs.open(new Path(uri))
-        val out = new FileOutputStream(tempFile)
-        Utils.copyStream(in, out, true)
-        if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
-          tempFile.delete()
-          throw new SparkException("File " + targetFile + " exists and does not match contents of" +
-            " " + url)
-        } else {
-          Files.move(tempFile, targetFile)
-        }
-    }
-    // Decompress the file if it's a .tar or .tar.gz
-    if (filename.endsWith(".tar.gz") || filename.endsWith(".tgz")) {
-      logInfo("Untarring " + filename)
-      Utils.execute(Seq("tar", "-xzf", filename), targetDir)
-    } else if (filename.endsWith(".tar")) {
-      logInfo("Untarring " + filename)
-      Utils.execute(Seq("tar", "-xf", filename), targetDir)
-    }
-    // Make the file executable - That's necessary for scripts
-    FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
-  }
-
-  /**
-   * Get a temporary directory using Spark's spark.local.dir property, if set. This will always
-   * return a single directory, even though the spark.local.dir property might be a list of
-   * multiple paths.
-   */
-  def getLocalDir: String = {
-    System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)
-  }
-
-  /**
-   * Shuffle the elements of a collection into a random order, returning the
-   * result in a new collection. Unlike scala.util.Random.shuffle, this method
-   * uses a local random number generator, avoiding inter-thread contention.
-   */
-  def randomize[T: ClassManifest](seq: TraversableOnce[T]): Seq[T] = {
-    randomizeInPlace(seq.toArray)
-  }
-
-  /**
-   * Shuffle the elements of an array into a random order, modifying the
-   * original array. Returns the original array.
-   */
-  def randomizeInPlace[T](arr: Array[T], rand: Random = new Random): Array[T] = {
-    for (i <- (arr.length - 1) to 1 by -1) {
-      val j = rand.nextInt(i)
-      val tmp = arr(j)
-      arr(j) = arr(i)
-      arr(i) = tmp
-    }
-    arr
-  }
-
-  /**
-   * Get the local host's IP address in dotted-quad format (e.g. 1.2.3.4).
-   * Note, this is typically not used from within core spark.
-   */
-  lazy val localIpAddress: String = findLocalIpAddress()
-  lazy val localIpAddressHostname: String = getAddressHostName(localIpAddress)
-
-  private def findLocalIpAddress(): String = {
-    val defaultIpOverride = System.getenv("SPARK_LOCAL_IP")
-    if (defaultIpOverride != null) {
-      defaultIpOverride
-    } else {
-      val address = InetAddress.getLocalHost
-      if (address.isLoopbackAddress) {
-        // Address resolves to something like 127.0.1.1, which happens on Debian; try to find
-        // a better address using the local network interfaces
-        for (ni <- NetworkInterface.getNetworkInterfaces) {
-          for (addr <- ni.getInetAddresses if !addr.isLinkLocalAddress &&
-               !addr.isLoopbackAddress && addr.isInstanceOf[Inet4Address]) {
-            // We've found an address that looks reasonable!
-            logWarning("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" +
-              " a loopback address: " + address.getHostAddress + "; using " + addr.getHostAddress +
-              " instead (on interface " + ni.getName + ")")
-            logWarning("Set SPARK_LOCAL_IP if you need to bind to another address")
-            return addr.getHostAddress
-          }
-        }
-        logWarning("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" +
-          " a loopback address: " + address.getHostAddress + ", but we couldn't find any" +
-          " external IP address!")
-        logWarning("Set SPARK_LOCAL_IP if you need to bind to another address")
-      }
-      address.getHostAddress
-    }
-  }
-
-  private var customHostname: Option[String] = None
-
-  /**
-   * Allow setting a custom host name because when we run on Mesos we need to use the same
-   * hostname it reports to the master.
-   */
-  def setCustomHostname(hostname: String) {
-    // DEBUG code
-    Utils.checkHost(hostname)
-    customHostname = Some(hostname)
-  }
-
-  /**
-   * Get the local machine's hostname.
-   */
-  def localHostName(): String = {
-    customHostname.getOrElse(localIpAddressHostname)
-  }
-
-  def getAddressHostName(address: String): String = {
-    InetAddress.getByName(address).getHostName
-  }
-
-  def localHostPort(): String = {
-    val retval = System.getProperty("spark.hostPort", null)
-    if (retval == null) {
-      logErrorWithStack("spark.hostPort not set but invoking localHostPort")
-      return localHostName()
-    }
-
-    retval
-  }
-
-  def checkHost(host: String, message: String = "") {
-    assert(host.indexOf(':') == -1, message)
-  }
-
-  def checkHostPort(hostPort: String, message: String = "") {
-    assert(hostPort.indexOf(':') != -1, message)
-  }
-
-  // Used by DEBUG code : remove when all testing done
-  def logErrorWithStack(msg: String) {
-    try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } }
-  }
-
-  // Typically, this will be of order of number of nodes in cluster
-  // If not, we should change it to LRUCache or something.
-  private val hostPortParseResults = new ConcurrentHashMap[String, (String, Int)]()
-
-  def parseHostPort(hostPort: String): (String,  Int) = {
-    {
-      // Check cache first.
-      var cached = hostPortParseResults.get(hostPort)
-      if (cached != null) return cached
-    }
-
-    val indx: Int = hostPort.lastIndexOf(':')
-    // This is potentially broken - when dealing with ipv6 addresses for example, sigh ...
-    // but then hadoop does not support ipv6 right now.
-    // For now, we assume that if port exists, then it is valid - not check if it is an int > 0
-    if (-1 == indx) {
-      val retval = (hostPort, 0)
-      hostPortParseResults.put(hostPort, retval)
-      return retval
-    }
-
-    val retval = (hostPort.substring(0, indx).trim(), hostPort.substring(indx + 1).trim().toInt)
-    hostPortParseResults.putIfAbsent(hostPort, retval)
-    hostPortParseResults.get(hostPort)
-  }
-
-  private[spark] val daemonThreadFactory: ThreadFactory =
-    new ThreadFactoryBuilder().setDaemon(true).build()
-
-  /**
-   * Wrapper over newCachedThreadPool.
-   */
-  def newDaemonCachedThreadPool(): ThreadPoolExecutor =
-    Executors.newCachedThreadPool(daemonThreadFactory).asInstanceOf[ThreadPoolExecutor]
-
-  /**
-   * Return the string to tell how long has passed in seconds. The passing parameter should be in
-   * millisecond.
-   */
-  def getUsedTimeMs(startTimeMs: Long): String = {
-    return " " + (System.currentTimeMillis - startTimeMs) + " ms"
-  }
-
-  /**
-   * Wrapper over newFixedThreadPool.
-   */
-  def newDaemonFixedThreadPool(nThreads: Int): ThreadPoolExecutor =
-    Executors.newFixedThreadPool(nThreads, daemonThreadFactory).asInstanceOf[ThreadPoolExecutor]
-
-  /**
-   * Delete a file or directory and its contents recursively.
-   */
-  def deleteRecursively(file: File) {
-    if (file.isDirectory) {
-      for (child <- file.listFiles()) {
-        deleteRecursively(child)
-      }
-    }
-    if (!file.delete()) {
-      throw new IOException("Failed to delete: " + file)
-    }
-  }
-
-  /**
-   * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
-   * This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM
-   * environment variable.
-   */
-  def memoryStringToMb(str: String): Int = {
-    val lower = str.toLowerCase
-    if (lower.endsWith("k")) {
-      (lower.substring(0, lower.length-1).toLong / 1024).toInt
-    } else if (lower.endsWith("m")) {
-      lower.substring(0, lower.length-1).toInt
-    } else if (lower.endsWith("g")) {
-      lower.substring(0, lower.length-1).toInt * 1024
-    } else if (lower.endsWith("t")) {
-      lower.substring(0, lower.length-1).toInt * 1024 * 1024
-    } else {// no suffix, so it's just a number in bytes
-      (lower.toLong / 1024 / 1024).toInt
-    }
-  }
-
-  /**
-   * Convert a quantity in bytes to a human-readable string such as "4.0 MB".
-   */
-  def bytesToString(size: Long): String = {
-    val TB = 1L << 40
-    val GB = 1L << 30
-    val MB = 1L << 20
-    val KB = 1L << 10
-
-    val (value, unit) = {
-      if (size >= 2*TB) {
-        (size.asInstanceOf[Double] / TB, "TB")
-      } else if (size >= 2*GB) {
-        (size.asInstanceOf[Double] / GB, "GB")
-      } else if (size >= 2*MB) {
-        (size.asInstanceOf[Double] / MB, "MB")
-      } else if (size >= 2*KB) {
-        (size.asInstanceOf[Double] / KB, "KB")
-      } else {
-        (size.asInstanceOf[Double], "B")
-      }
-    }
-    "%.1f %s".formatLocal(Locale.US, value, unit)
-  }
-
-  /**
-   * Returns a human-readable string representing a duration such as "35ms"
-   */
-  def msDurationToString(ms: Long): String = {
-    val second = 1000
-    val minute = 60 * second
-    val hour = 60 * minute
-
-    ms match {
-      case t if t < second =>
-        "%d ms".format(t)
-      case t if t < minute =>
-        "%.1f s".format(t.toFloat / second)
-      case t if t < hour =>
-        "%.1f m".format(t.toFloat / minute)
-      case t =>
-        "%.2f h".format(t.toFloat / hour)
-    }
-  }
-
-  /**
-   * Convert a quantity in megabytes to a human-readable string such as "4.0 MB".
-   */
-  def megabytesToString(megabytes: Long): String = {
-    bytesToString(megabytes * 1024L * 1024L)
-  }
-
-  /**
-   * Execute a command in the given working directory, throwing an exception if it completes
-   * with an exit code other than 0.
-   */
-  def execute(command: Seq[String], workingDir: File) {
-    val process = new ProcessBuilder(command: _*)
-        .directory(workingDir)
-        .redirectErrorStream(true)
-        .start()
-    new Thread("read stdout for " + command(0)) {
-      override def run() {
-        for (line <- Source.fromInputStream(process.getInputStream).getLines) {
-          System.err.println(line)
-        }
-      }
-    }.start()
-    val exitCode = process.waitFor()
-    if (exitCode != 0) {
-      throw new SparkException("Process " + command + " exited with code " + exitCode)
-    }
-  }
-
-  /**
-   * Execute a command in the current working directory, throwing an exception if it completes
-   * with an exit code other than 0.
-   */
-  def execute(command: Seq[String]) {
-    execute(command, new File("."))
-  }
-
-  /**
-   * Execute a command and get its output, throwing an exception if it yields a code other than 0.
-   */
-  def executeAndGetOutput(command: Seq[String], workingDir: File = new File("."),
-                          extraEnvironment: Map[String, String] = Map.empty): String = {
-    val builder = new ProcessBuilder(command: _*)
-        .directory(workingDir)
-    val environment = builder.environment()
-    for ((key, value) <- extraEnvironment) {
-      environment.put(key, value)
-    }
-    val process = builder.start()
-    new Thread("read stderr for " + command(0)) {
-      override def run() {
-        for (line <- Source.fromInputStream(process.getErrorStream).getLines) {
-          System.err.println(line)
-        }
-      }
-    }.start()
-    val output = new StringBuffer
-    val stdoutThread = new Thread("read stdout for " + command(0)) {
-      override def run() {
-        for (line <- Source.fromInputStream(process.getInputStream).getLines) {
-          output.append(line)
-        }
-      }
-    }
-    stdoutThread.start()
-    val exitCode = process.waitFor()
-    stdoutThread.join()   // Wait for it to finish reading output
-    if (exitCode != 0) {
-      throw new SparkException("Process " + command + " exited with code " + exitCode)
-    }
-    output.toString
-  }
-
-  /**
-   * A regular expression to match classes of the "core" Spark API that we want to skip when
-   * finding the call site of a method.
-   */
-  private val SPARK_CLASS_REGEX = """^spark(\.api\.java)?(\.rdd)?\.[A-Z]""".r
-
-  private[spark] class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String,
-                                    val firstUserLine: Int, val firstUserClass: String)
-
-  /**
-   * When called inside a class in the spark package, returns the name of the user code class
-   * (outside the spark package) that called into Spark, as well as which Spark method they called.
-   * This is used, for example, to tell users where in their code each RDD got created.
-   */
-  def getCallSiteInfo: CallSiteInfo = {
-    val trace = Thread.currentThread.getStackTrace().filter( el =>
-      (!el.getMethodName.contains("getStackTrace")))
-
-    // Keep crawling up the stack trace until we find the first function not inside of the spark
-    // package. We track the last (shallowest) contiguous Spark method. This might be an RDD
-    // transformation, a SparkContext function (such as parallelize), or anything else that leads
-    // to instantiation of an RDD. We also track the first (deepest) user method, file, and line.
-    var lastSparkMethod = "<unknown>"
-    var firstUserFile = "<unknown>"
-    var firstUserLine = 0
-    var finished = false
-    var firstUserClass = "<unknown>"
-
-    for (el <- trace) {
-      if (!finished) {
-        if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName) != None) {
-          lastSparkMethod = if (el.getMethodName == "<init>") {
-            // Spark method is a constructor; get its class name
-            el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)
-          } else {
-            el.getMethodName
-          }
-        }
-        else {
-          firstUserLine = el.getLineNumber
-          firstUserFile = el.getFileName
-          firstUserClass = el.getClassName
-          finished = true
-        }
-      }
-    }
-    new CallSiteInfo(lastSparkMethod, firstUserFile, firstUserLine, firstUserClass)
-  }
-
-  def formatSparkCallSite = {
-    val callSiteInfo = getCallSiteInfo
-    "%s at %s:%s".format(callSiteInfo.lastSparkMethod, callSiteInfo.firstUserFile,
-                         callSiteInfo.firstUserLine)
-  }
-
-  /** Return a string containing part of a file from byte 'start' to 'end'. */
-  def offsetBytes(path: String, start: Long, end: Long): String = {
-    val file = new File(path)
-    val length = file.length()
-    val effectiveEnd = math.min(length, end)
-    val effectiveStart = math.max(0, start)
-    val buff = new Array[Byte]((effectiveEnd-effectiveStart).toInt)
-    val stream = new FileInputStream(file)
-
-    stream.skip(effectiveStart)
-    stream.read(buff)
-    stream.close()
-    Source.fromBytes(buff).mkString
-  }
-
-  /**
-   * Clone an object using a Spark serializer.
-   */
-  def clone[T](value: T, serializer: SerializerInstance): T = {
-    serializer.deserialize[T](serializer.serialize(value))
-  }
-
-  /**
-   * Detect whether this thread might be executing a shutdown hook. Will always return true if
-   * the current thread is a running a shutdown hook but may spuriously return true otherwise (e.g.
-   * if System.exit was just called by a concurrent thread).
-   *
-   * Currently, this detects whether the JVM is shutting down by Runtime#addShutdownHook throwing
-   * an IllegalStateException.
-   */
-  def inShutdown(): Boolean = {
-    try {
-      val hook = new Thread {
-        override def run() {}
-      }
-      Runtime.getRuntime.addShutdownHook(hook)
-      Runtime.getRuntime.removeShutdownHook(hook)
-    } catch {
-      case ise: IllegalStateException => return true
-    }
-    return false
-  }
-
-  def isSpace(c: Char): Boolean = {
-    " \t\r\n".indexOf(c) != -1
-  }
-
-  /**
-   * Split a string of potentially quoted arguments from the command line the way that a shell
-   * would do it to determine arguments to a command. For example, if the string is 'a "b c" d',
-   * then it would be parsed as three arguments: 'a', 'b c' and 'd'.
-   */
-  def splitCommandString(s: String): Seq[String] = {
-    val buf = new ArrayBuffer[String]
-    var inWord = false
-    var inSingleQuote = false
-    var inDoubleQuote = false
-    var curWord = new StringBuilder
-    def endWord() {
-      buf += curWord.toString
-      curWord.clear()
-    }
-    var i = 0
-    while (i < s.length) {
-      var nextChar = s.charAt(i)
-      if (inDoubleQuote) {
-        if (nextChar == '"') {
-          inDoubleQuote = false
-        } else if (nextChar == '\\') {
-          if (i < s.length - 1) {
-            // Append the next character directly, because only " and \ may be escaped in
-            // double quotes after the shell's own expansion
-            curWord.append(s.charAt(i + 1))
-            i += 1
-          }
-        } else {
-          curWord.append(nextChar)
-        }
-      } else if (inSingleQuote) {
-        if (nextChar == '\'') {
-          inSingleQuote = false
-        } else {
-          curWord.append(nextChar)
-        }
-        // Backslashes are not treated specially in single quotes
-      } else if (nextChar == '"') {
-        inWord = true
-        inDoubleQuote = true
-      } else if (nextChar == '\'') {
-        inWord = true
-        inSingleQuote = true
-      } else if (!isSpace(nextChar)) {
-        curWord.append(nextChar)
-        inWord = true
-      } else if (inWord && isSpace(nextChar)) {
-        endWord()
-        inWord = false
-      }
-      i += 1
-    }
-    if (inWord || inDoubleQuote || inSingleQuote) {
-      endWord()
-    }
-    return buf
-  }
-
- /* Calculates 'x' modulo 'mod', takes to consideration sign of x,
-  * i.e. if 'x' is negative, than 'x' % 'mod' is negative too
-  * so function return (x % mod) + mod in that case.
-  */
-  def nonNegativeMod(x: Int, mod: Int): Int = {
-    val rawMod = x % mod
-    rawMod + (if (rawMod < 0) mod else 0)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
deleted file mode 100644
index 8ce7df6..0000000
--- a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java
-
-import spark.RDD
-import spark.SparkContext.doubleRDDToDoubleRDDFunctions
-import spark.api.java.function.{Function => JFunction}
-import spark.util.StatCounter
-import spark.partial.{BoundedDouble, PartialResult}
-import spark.storage.StorageLevel
-import java.lang.Double
-import spark.Partitioner
-
-class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, JavaDoubleRDD] {
-
-  override val classManifest: ClassManifest[Double] = implicitly[ClassManifest[Double]]
-
-  override val rdd: RDD[Double] = srdd.map(x => Double.valueOf(x))
-
-  override def wrapRDD(rdd: RDD[Double]): JavaDoubleRDD =
-    new JavaDoubleRDD(rdd.map(_.doubleValue))
-
-  // Common RDD functions
-
-  import JavaDoubleRDD.fromRDD
-
-  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  def cache(): JavaDoubleRDD = fromRDD(srdd.cache())
-
-  /** 
-   * Set this RDD's storage level to persist its values across operations after the first time
-   * it is computed. Can only be called once on each RDD.
-   */
-  def persist(newLevel: StorageLevel): JavaDoubleRDD = fromRDD(srdd.persist(newLevel))
-
-  // first() has to be overriden here in order for its return type to be Double instead of Object.
-  override def first(): Double = srdd.first()
-
-  // Transformations (return a new RDD)
-
-  /**
-   * Return a new RDD containing the distinct elements in this RDD.
-   */
-  def distinct(): JavaDoubleRDD = fromRDD(srdd.distinct())
-
-  /**
-   * Return a new RDD containing the distinct elements in this RDD.
-   */
-  def distinct(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.distinct(numPartitions))
-
-  /**
-   * Return a new RDD containing only the elements that satisfy a predicate.
-   */
-  def filter(f: JFunction[Double, java.lang.Boolean]): JavaDoubleRDD =
-    fromRDD(srdd.filter(x => f(x).booleanValue()))
-
-  /**
-   * Return a new RDD that is reduced into `numPartitions` partitions.
-   */
-  def coalesce(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.coalesce(numPartitions))
-
-  /**
-   * Return a new RDD that is reduced into `numPartitions` partitions.
-   */
-  def coalesce(numPartitions: Int, shuffle: Boolean): JavaDoubleRDD =
-    fromRDD(srdd.coalesce(numPartitions, shuffle))
-
-  /**
-   * Return an RDD with the elements from `this` that are not in `other`.
-   * 
-   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
-   * RDD will be <= us.
-   */
-  def subtract(other: JavaDoubleRDD): JavaDoubleRDD =
-    fromRDD(srdd.subtract(other))
-
-  /**
-   * Return an RDD with the elements from `this` that are not in `other`.
-   */
-  def subtract(other: JavaDoubleRDD, numPartitions: Int): JavaDoubleRDD =
-    fromRDD(srdd.subtract(other, numPartitions))
-
-  /**
-   * Return an RDD with the elements from `this` that are not in `other`.
-   */
-  def subtract(other: JavaDoubleRDD, p: Partitioner): JavaDoubleRDD =
-    fromRDD(srdd.subtract(other, p))
-
-  /**
-   * Return a sampled subset of this RDD.
-   */
-  def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaDoubleRDD =
-    fromRDD(srdd.sample(withReplacement, fraction, seed))
-
-  /**
-   * Return the union of this RDD and another one. Any identical elements will appear multiple
-   * times (use `.distinct()` to eliminate them).
-   */
-  def union(other: JavaDoubleRDD): JavaDoubleRDD = fromRDD(srdd.union(other.srdd))
-
-  // Double RDD functions
-
-  /** Add up the elements in this RDD. */
-  def sum(): Double = srdd.sum()
-
-  /**
-   * Return a [[spark.util.StatCounter]] object that captures the mean, variance and count
-   * of the RDD's elements in one operation.
-   */
-  def stats(): StatCounter = srdd.stats()
-
-  /** Compute the mean of this RDD's elements. */
-  def mean(): Double = srdd.mean()
-
-  /** Compute the variance of this RDD's elements. */
-  def variance(): Double = srdd.variance()
-
-  /** Compute the standard deviation of this RDD's elements. */
-  def stdev(): Double = srdd.stdev()
-
-  /**
-   * Compute the sample standard deviation of this RDD's elements (which corrects for bias in
-   * estimating the standard deviation by dividing by N-1 instead of N).
-   */
-  def sampleStdev(): Double = srdd.sampleStdev()
-
-  /**
-   * Compute the sample variance of this RDD's elements (which corrects for bias in
-   * estimating the standard variance by dividing by N-1 instead of N).
-   */
-  def sampleVariance(): Double = srdd.sampleVariance()
-
-  /** Return the approximate mean of the elements in this RDD. */
-  def meanApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] =
-    srdd.meanApprox(timeout, confidence)
-
-  /** (Experimental) Approximate operation to return the mean within a timeout. */
-  def meanApprox(timeout: Long): PartialResult[BoundedDouble] = srdd.meanApprox(timeout)
-
-  /** (Experimental) Approximate operation to return the sum within a timeout. */
-  def sumApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] =
-    srdd.sumApprox(timeout, confidence)
-
-  /** (Experimental) Approximate operation to return the sum within a timeout. */
-  def sumApprox(timeout: Long): PartialResult[BoundedDouble] = srdd.sumApprox(timeout)
-}
-
-object JavaDoubleRDD {
-  def fromRDD(rdd: RDD[scala.Double]): JavaDoubleRDD = new JavaDoubleRDD(rdd)
-
-  implicit def toRDD(rdd: JavaDoubleRDD): RDD[scala.Double] = rdd.srdd
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/JavaPairRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
deleted file mode 100644
index effe6e5..0000000
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ /dev/null
@@ -1,601 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java
-
-import java.util.{List => JList}
-import java.util.Comparator
-
-import scala.Tuple2
-import scala.collection.JavaConversions._
-
-import com.google.common.base.Optional
-import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.OutputFormat
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.hadoop.conf.Configuration
-
-import spark.HashPartitioner
-import spark.Partitioner
-import spark.Partitioner._
-import spark.RDD
-import spark.SparkContext.rddToPairRDDFunctions
-import spark.api.java.function.{Function2 => JFunction2}
-import spark.api.java.function.{Function => JFunction}
-import spark.partial.BoundedDouble
-import spark.partial.PartialResult
-import spark.rdd.OrderedRDDFunctions
-import spark.storage.StorageLevel
-
-
-class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManifest[K],
-  implicit val vManifest: ClassManifest[V]) extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
-
-  override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
-
-  override val classManifest: ClassManifest[(K, V)] =
-    implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
-
-  import JavaPairRDD._
-
-  // Common RDD functions
-
-  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  def cache(): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.cache())
-
-  /** 
-   * Set this RDD's storage level to persist its values across operations after the first time
-   * it is computed. Can only be called once on each RDD.
-   */
-  def persist(newLevel: StorageLevel): JavaPairRDD[K, V] =
-    new JavaPairRDD[K, V](rdd.persist(newLevel))
-
-  // Transformations (return a new RDD)
-
-  /**
-   * Return a new RDD containing the distinct elements in this RDD.
-   */
-  def distinct(): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct())
-
-  /**
-   * Return a new RDD containing the distinct elements in this RDD.
-   */
-  def distinct(numPartitions: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct(numPartitions))
-
-  /**
-   * Return a new RDD containing only the elements that satisfy a predicate.
-   */
-  def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairRDD[K, V] =
-    new JavaPairRDD[K, V](rdd.filter(x => f(x).booleanValue()))
-
-  /**
-   * Return a new RDD that is reduced into `numPartitions` partitions.
-   */
-  def coalesce(numPartitions: Int): JavaPairRDD[K, V] = fromRDD(rdd.coalesce(numPartitions))
-
-  /**
-   * Return a new RDD that is reduced into `numPartitions` partitions.
-   */
-  def coalesce(numPartitions: Int, shuffle: Boolean): JavaPairRDD[K, V] =
-    fromRDD(rdd.coalesce(numPartitions, shuffle))
-
-  /**
-   * Return a sampled subset of this RDD.
-   */
-  def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaPairRDD[K, V] =
-    new JavaPairRDD[K, V](rdd.sample(withReplacement, fraction, seed))
-
-  /**
-   * Return the union of this RDD and another one. Any identical elements will appear multiple
-   * times (use `.distinct()` to eliminate them).
-   */
-  def union(other: JavaPairRDD[K, V]): JavaPairRDD[K, V] =
-    new JavaPairRDD[K, V](rdd.union(other.rdd))
-
-  // first() has to be overridden here so that the generated method has the signature
-  // 'public scala.Tuple2 first()'; if the trait's definition is used,
-  // then the method has the signature 'public java.lang.Object first()',
-  // causing NoSuchMethodErrors at runtime.
-  override def first(): (K, V) = rdd.first()
-
-  // Pair RDD functions
- 
-  /**
-   * Generic function to combine the elements for each key using a custom set of aggregation 
-   * functions. Turns a JavaPairRDD[(K, V)] into a result of type JavaPairRDD[(K, C)], for a 
-   * "combined type" C * Note that V and C can be different -- for example, one might group an 
-   * RDD of type (Int, Int) into an RDD of type (Int, List[Int]). Users provide three 
-   * functions:
-   * 
-   * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
-   * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
-   * - `mergeCombiners`, to combine two C's into a single one.
-   *
-   * In addition, users can control the partitioning of the output RDD, and whether to perform
-   * map-side aggregation (if a mapper can produce multiple items with the same key).
-   */
-  def combineByKey[C](createCombiner: JFunction[V, C],
-    mergeValue: JFunction2[C, V, C],
-    mergeCombiners: JFunction2[C, C, C],
-    partitioner: Partitioner): JavaPairRDD[K, C] = {
-    implicit val cm: ClassManifest[C] =
-      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]]
-    fromRDD(rdd.combineByKey(
-      createCombiner,
-      mergeValue,
-      mergeCombiners,
-      partitioner
-    ))
-  }
-
-  /**
-   * Simplified version of combineByKey that hash-partitions the output RDD.
-   */
-  def combineByKey[C](createCombiner: JFunction[V, C],
-    mergeValue: JFunction2[C, V, C],
-    mergeCombiners: JFunction2[C, C, C],
-    numPartitions: Int): JavaPairRDD[K, C] =
-    combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
-
-  /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce.
-   */
-  def reduceByKey(partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
-    fromRDD(rdd.reduceByKey(partitioner, func))
-
-  /**
-   * Merge the values for each key using an associative reduce function, but return the results
-   * immediately to the master as a Map. This will also perform the merging locally on each mapper
-   * before sending results to a reducer, similarly to a "combiner" in MapReduce.
-   */
-  def reduceByKeyLocally(func: JFunction2[V, V, V]): java.util.Map[K, V] =
-    mapAsJavaMap(rdd.reduceByKeyLocally(func))
-
-  /** Count the number of elements for each key, and return the result to the master as a Map. */
-  def countByKey(): java.util.Map[K, Long] = mapAsJavaMap(rdd.countByKey())
-
-  /** 
-   * (Experimental) Approximate version of countByKey that can return a partial result if it does
-   * not finish within a timeout.
-   */
-  def countByKeyApprox(timeout: Long): PartialResult[java.util.Map[K, BoundedDouble]] =
-    rdd.countByKeyApprox(timeout).map(mapAsJavaMap)
-
-  /** 
-   * (Experimental) Approximate version of countByKey that can return a partial result if it does
-   * not finish within a timeout.
-   */
-  def countByKeyApprox(timeout: Long, confidence: Double = 0.95)
-  : PartialResult[java.util.Map[K, BoundedDouble]] =
-    rdd.countByKeyApprox(timeout, confidence).map(mapAsJavaMap)
-
-  /**
-   * Merge the values for each key using an associative function and a neutral "zero value" which may
-   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
-   * list concatenation, 0 for addition, or 1 for multiplication.).
-   */
-  def foldByKey(zeroValue: V, partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
-    fromRDD(rdd.foldByKey(zeroValue, partitioner)(func))
-
-  /**
-   * Merge the values for each key using an associative function and a neutral "zero value" which may
-   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
-   * list concatenation, 0 for addition, or 1 for multiplication.).
-   */
-  def foldByKey(zeroValue: V, numPartitions: Int, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
-    fromRDD(rdd.foldByKey(zeroValue, numPartitions)(func))
-
-  /**
-   * Merge the values for each key using an associative function and a neutral "zero value" which may
-   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
-   * list concatenation, 0 for addition, or 1 for multiplication.).
-   */
-  def foldByKey(zeroValue: V, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
-    fromRDD(rdd.foldByKey(zeroValue)(func))
-
-  /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
-   */
-  def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairRDD[K, V] =
-    fromRDD(rdd.reduceByKey(func, numPartitions))
-
-  /**
-   * Group the values for each key in the RDD into a single sequence. Allows controlling the
-   * partitioning of the resulting key-value pair RDD by passing a Partitioner.
-   */
-  def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JList[V]] =
-    fromRDD(groupByResultToJava(rdd.groupByKey(partitioner)))
-
-  /**
-   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
-   * resulting RDD with into `numPartitions` partitions.
-   */
-  def groupByKey(numPartitions: Int): JavaPairRDD[K, JList[V]] =
-    fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions)))
-
-  /**
-   * Return an RDD with the elements from `this` that are not in `other`.
-   * 
-   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
-   * RDD will be <= us.
-   */
-  def subtract(other: JavaPairRDD[K, V]): JavaPairRDD[K, V] =
-    fromRDD(rdd.subtract(other))
-
-  /**
-   * Return an RDD with the elements from `this` that are not in `other`.
-   */
-  def subtract(other: JavaPairRDD[K, V], numPartitions: Int): JavaPairRDD[K, V] =
-    fromRDD(rdd.subtract(other, numPartitions))
-
-  /**
-   * Return an RDD with the elements from `this` that are not in `other`.
-   */
-  def subtract(other: JavaPairRDD[K, V], p: Partitioner): JavaPairRDD[K, V] =
-    fromRDD(rdd.subtract(other, p))
-
-  /**
-   * Return a copy of the RDD partitioned using the specified partitioner.
-   */
-  def partitionBy(partitioner: Partitioner): JavaPairRDD[K, V] =
-    fromRDD(rdd.partitionBy(partitioner))
-
-  /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce.
-   */
-  def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] =
-    fromRDD(rdd.join(other, partitioner))
-
-  /**
-   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
-   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
-   * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
-   * partition the output RDD.
-   */
-  def leftOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
-  : JavaPairRDD[K, (V, Optional[W])] = {
-    val joinResult = rdd.leftOuterJoin(other, partitioner)
-    fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
-  }
-
-  /**
-   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
-   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
-   * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to
-   * partition the output RDD.
-   */
-  def rightOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
-  : JavaPairRDD[K, (Optional[V], W)] = {
-    val joinResult = rdd.rightOuterJoin(other, partitioner)
-    fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
-  }
-
-  /** 
-   * Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
-   * partitioner/parallelism level.
-   */
-  def combineByKey[C](createCombiner: JFunction[V, C],
-    mergeValue: JFunction2[C, V, C],
-    mergeCombiners: JFunction2[C, C, C]): JavaPairRDD[K, C] = {
-    implicit val cm: ClassManifest[C] =
-      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]]
-    fromRDD(combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(rdd)))
-  }
-
-  /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
-   * parallelism level.
-   */
-  def reduceByKey(func: JFunction2[V, V, V]): JavaPairRDD[K, V] = {
-    fromRDD(reduceByKey(defaultPartitioner(rdd), func))
-  }
-
-  /**
-   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
-   * resulting RDD with the existing partitioner/parallelism level.
-   */
-  def groupByKey(): JavaPairRDD[K, JList[V]] =
-    fromRDD(groupByResultToJava(rdd.groupByKey()))
-
-  /**
-   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
-   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
-   * (k, v2) is in `other`. Performs a hash join across the cluster.
-   */
-  def join[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, W)] =
-    fromRDD(rdd.join(other))
-
-  /**
-   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
-   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
-   * (k, v2) is in `other`. Performs a hash join across the cluster.
-   */
-  def join[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, W)] =
-    fromRDD(rdd.join(other, numPartitions))
-
-  /**
-   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
-   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
-   * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
-   * using the existing partitioner/parallelism level.
-   */
-  def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Optional[W])] = {
-    val joinResult = rdd.leftOuterJoin(other)
-    fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
-  }
-
-  /**
-   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
-   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
-   * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
-   * into `numPartitions` partitions.
-   */
-  def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, Optional[W])] = {
-    val joinResult = rdd.leftOuterJoin(other, numPartitions)
-    fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
-  }
-
-  /**
-   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
-   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
-   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
-   * RDD using the existing partitioner/parallelism level.
-   */
-  def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], W)] = {
-    val joinResult = rdd.rightOuterJoin(other)
-    fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
-  }
-
-  /**
-   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
-   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
-   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
-   * RDD into the given number of partitions.
-   */
-  def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Optional[V], W)] = {
-    val joinResult = rdd.rightOuterJoin(other, numPartitions)
-    fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
-  }
-
-  /**
-   * Return the key-value pairs in this RDD to the master as a Map.
-   */
-  def collectAsMap(): java.util.Map[K, V] = mapAsJavaMap(rdd.collectAsMap())
-
-  /**
-   * Pass each value in the key-value pair RDD through a map function without changing the keys;
-   * this also retains the original RDD's partitioning.
-   */
-  def mapValues[U](f: JFunction[V, U]): JavaPairRDD[K, U] = {
-    implicit val cm: ClassManifest[U] =
-      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
-    fromRDD(rdd.mapValues(f))
-  }
-
-  /**
-   * Pass each value in the key-value pair RDD through a flatMap function without changing the
-   * keys; this also retains the original RDD's partitioning.
-   */
-  def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
-    import scala.collection.JavaConverters._
-    def fn = (x: V) => f.apply(x).asScala
-    implicit val cm: ClassManifest[U] =
-      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
-    fromRDD(rdd.flatMapValues(fn))
-  }
-
-  /**
-   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
-   * list of values for that key in `this` as well as `other`.
-   */
-  def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
-  : JavaPairRDD[K, (JList[V], JList[W])] =
-    fromRDD(cogroupResultToJava(rdd.cogroup(other, partitioner)))
-
-  /**
-   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
-   * tuple with the list of values for that key in `this`, `other1` and `other2`.
-   */
-  def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], partitioner: Partitioner)
-  : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
-    fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner)))
-
-  /**
-   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
-   * list of values for that key in `this` as well as `other`.
-   */
-  def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
-    fromRDD(cogroupResultToJava(rdd.cogroup(other)))
-
-  /**
-   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
-   * tuple with the list of values for that key in `this`, `other1` and `other2`.
-   */
-  def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
-  : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
-    fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2)))
-
-  /**
-   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
-   * list of values for that key in `this` as well as `other`.
-   */
-  def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W])]
-  = fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions)))
-
-  /**
-   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
-   * tuple with the list of values for that key in `this`, `other1` and `other2`.
-   */
-  def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numPartitions: Int)
-  : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
-    fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions)))
-
-  /** Alias for cogroup. */
-  def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
-    fromRDD(cogroupResultToJava(rdd.groupWith(other)))
-
-  /** Alias for cogroup. */
-  def groupWith[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
-  : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
-    fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2)))
-
-  /**
-   * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
-   * RDD has a known partitioner by only searching the partition that the key maps to.
-   */
-  def lookup(key: K): JList[V] = seqAsJavaList(rdd.lookup(key))
-
-  /** Output the RDD to any Hadoop-supported file system. */
-  def saveAsHadoopFile[F <: OutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F],
-    conf: JobConf) {
-    rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
-  }
-
-  /** Output the RDD to any Hadoop-supported file system. */
-  def saveAsHadoopFile[F <: OutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F]) {
-    rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass)
-  }
-
-  /** Output the RDD to any Hadoop-supported file system, compressing with the supplied codec. */
-  def saveAsHadoopFile[F <: OutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F],
-    codec: Class[_ <: CompressionCodec]) {
-    rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, codec)
-  }
-
-  /** Output the RDD to any Hadoop-supported file system. */
-  def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F],
-    conf: Configuration) {
-    rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
-  }
-
-  /** Output the RDD to any Hadoop-supported file system. */
-  def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
-    path: String,
-    keyClass: Class[_],
-    valueClass: Class[_],
-    outputFormatClass: Class[F]) {
-    rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass)
-  }
-
-  /**
-   * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
-   * that storage system. The JobConf should set an OutputFormat and any output paths required
-   * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
-   * MapReduce job.
-   */
-  def saveAsHadoopDataset(conf: JobConf) {
-    rdd.saveAsHadoopDataset(conf)
-  }
-
-  /**
-   * Sort the RDD by key, so that each partition contains a sorted range of the elements in
-   * ascending order. Calling `collect` or `save` on the resulting RDD will return or output an
-   * ordered list of records (in the `save` case, they will be written to multiple `part-X` files
-   * in the filesystem, in order of the keys).
-   */
-  def sortByKey(): JavaPairRDD[K, V] = sortByKey(true)
-
-  /**
-   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
-   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
-   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
-   * order of the keys).
-   */
-  def sortByKey(ascending: Boolean): JavaPairRDD[K, V] = {
-    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
-    sortByKey(comp, ascending)
-  }
-
-  /**
-   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
-   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
-   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
-   * order of the keys).
-   */
-  def sortByKey(comp: Comparator[K]): JavaPairRDD[K, V] = sortByKey(comp, true)
-
-  /**
-   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
-   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
-   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
-   * order of the keys).
-   */
-  def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V] = {
-    class KeyOrdering(val a: K) extends Ordered[K] {
-      override def compare(b: K) = comp.compare(a, b)
-    }
-    implicit def toOrdered(x: K): Ordered[K] = new KeyOrdering(x)
-    fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending))
-  }
-
-  /**
-   * Return an RDD with the keys of each tuple.
-   */
-  def keys(): JavaRDD[K] = JavaRDD.fromRDD[K](rdd.map(_._1))
-
-  /**
-   * Return an RDD with the values of each tuple.
-   */
-  def values(): JavaRDD[V] = JavaRDD.fromRDD[V](rdd.map(_._2))
-}
-
-object JavaPairRDD {
-  def groupByResultToJava[K, T](rdd: RDD[(K, Seq[T])])(implicit kcm: ClassManifest[K],
-    vcm: ClassManifest[T]): RDD[(K, JList[T])] =
-    rddToPairRDDFunctions(rdd).mapValues(seqAsJavaList _)
-
-  def cogroupResultToJava[W, K, V](rdd: RDD[(K, (Seq[V], Seq[W]))])(implicit kcm: ClassManifest[K],
-    vcm: ClassManifest[V]): RDD[(K, (JList[V], JList[W]))] = rddToPairRDDFunctions(rdd).mapValues((x: (Seq[V],
-    Seq[W])) => (seqAsJavaList(x._1), seqAsJavaList(x._2)))
-
-  def cogroupResult2ToJava[W1, W2, K, V](rdd: RDD[(K, (Seq[V], Seq[W1],
-    Seq[W2]))])(implicit kcm: ClassManifest[K]) : RDD[(K, (JList[V], JList[W1],
-    JList[W2]))] = rddToPairRDDFunctions(rdd).mapValues(
-    (x: (Seq[V], Seq[W1], Seq[W2])) => (seqAsJavaList(x._1),
-      seqAsJavaList(x._2),
-      seqAsJavaList(x._3)))
-
-  def fromRDD[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]): JavaPairRDD[K, V] =
-    new JavaPairRDD[K, V](rdd)
-
-  implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/JavaRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala
deleted file mode 100644
index c0bf2cf..0000000
--- a/core/src/main/scala/spark/api/java/JavaRDD.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java
-
-import spark._
-import spark.api.java.function.{Function => JFunction}
-import spark.storage.StorageLevel
-
-class JavaRDD[T](val rdd: RDD[T])(implicit val classManifest: ClassManifest[T]) extends
-JavaRDDLike[T, JavaRDD[T]] {
-
-  override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd)
-
-  // Common RDD functions
-
-  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  def cache(): JavaRDD[T] = wrapRDD(rdd.cache())
-
-  /**
-   * Set this RDD's storage level to persist its values across operations after the first time
-   * it is computed. This can only be used to assign a new storage level if the RDD does not
-   * have a storage level set yet..
-   */
-  def persist(newLevel: StorageLevel): JavaRDD[T] = wrapRDD(rdd.persist(newLevel))
-
-  /**
-   * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
-   */
-  def unpersist(): JavaRDD[T] = wrapRDD(rdd.unpersist())
-
-  // Transformations (return a new RDD)
-
-  /**
-   * Return a new RDD containing the distinct elements in this RDD.
-   */
-  def distinct(): JavaRDD[T] = wrapRDD(rdd.distinct())
-
-  /**
-   * Return a new RDD containing the distinct elements in this RDD.
-   */
-  def distinct(numPartitions: Int): JavaRDD[T] = wrapRDD(rdd.distinct(numPartitions))
-
-  /**
-   * Return a new RDD containing only the elements that satisfy a predicate.
-   */
-  def filter(f: JFunction[T, java.lang.Boolean]): JavaRDD[T] =
-    wrapRDD(rdd.filter((x => f(x).booleanValue())))
-
-  /**
-   * Return a new RDD that is reduced into `numPartitions` partitions.
-   */
-  def coalesce(numPartitions: Int): JavaRDD[T] = rdd.coalesce(numPartitions)
-
-  /**
-   * Return a new RDD that is reduced into `numPartitions` partitions.
-   */
-  def coalesce(numPartitions: Int, shuffle: Boolean): JavaRDD[T] =
-    rdd.coalesce(numPartitions, shuffle)
-
-  /**
-   * Return a sampled subset of this RDD.
-   */
-  def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaRDD[T] =
-    wrapRDD(rdd.sample(withReplacement, fraction, seed))
-
-  /**
-   * Return the union of this RDD and another one. Any identical elements will appear multiple
-   * times (use `.distinct()` to eliminate them).
-   */
-  def union(other: JavaRDD[T]): JavaRDD[T] = wrapRDD(rdd.union(other.rdd))
-
-  /**
-   * Return an RDD with the elements from `this` that are not in `other`.
-   *
-   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
-   * RDD will be <= us.
-   */
-  def subtract(other: JavaRDD[T]): JavaRDD[T] = wrapRDD(rdd.subtract(other))
-
-  /**
-   * Return an RDD with the elements from `this` that are not in `other`.
-   */
-  def subtract(other: JavaRDD[T], numPartitions: Int): JavaRDD[T] =
-    wrapRDD(rdd.subtract(other, numPartitions))
-
-  /**
-   * Return an RDD with the elements from `this` that are not in `other`.
-   */
-  def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
-    wrapRDD(rdd.subtract(other, p))
-}
-
-object JavaRDD {
-
-  implicit def fromRDD[T: ClassManifest](rdd: RDD[T]): JavaRDD[T] = new JavaRDD[T](rdd)
-
-  implicit def toRDD[T](rdd: JavaRDD[T]): RDD[T] = rdd.rdd
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
deleted file mode 100644
index 2c2b138..0000000
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ /dev/null
@@ -1,426 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java
-
-import java.util.{List => JList, Comparator}
-import scala.Tuple2
-import scala.collection.JavaConversions._
-
-import org.apache.hadoop.io.compress.CompressionCodec
-import spark.{SparkContext, Partition, RDD, TaskContext}
-import spark.api.java.JavaPairRDD._
-import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
-import spark.partial.{PartialResult, BoundedDouble}
-import spark.storage.StorageLevel
-import com.google.common.base.Optional
-
-
-trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
-  def wrapRDD(rdd: RDD[T]): This
-
-  implicit val classManifest: ClassManifest[T]
-
-  def rdd: RDD[T]
-
-  /** Set of partitions in this RDD. */
-  def splits: JList[Partition] = new java.util.ArrayList(rdd.partitions.toSeq)
-
-  /** The [[spark.SparkContext]] that this RDD was created on. */
-  def context: SparkContext = rdd.context
-
-  /** A unique ID for this RDD (within its SparkContext). */
-  def id: Int = rdd.id
-
-  /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
-  def getStorageLevel: StorageLevel = rdd.getStorageLevel
-
-  /**
-   * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
-   * This should ''not'' be called by users directly, but is available for implementors of custom
-   * subclasses of RDD.
-   */
-  def iterator(split: Partition, taskContext: TaskContext): java.util.Iterator[T] =
-    asJavaIterator(rdd.iterator(split, taskContext))
-
-  // Transformations (return a new RDD)
-
-  /**
-   * Return a new RDD by applying a function to all elements of this RDD.
-   */
-  def map[R](f: JFunction[T, R]): JavaRDD[R] =
-    new JavaRDD(rdd.map(f)(f.returnType()))(f.returnType())
-
-  /**
-   * Return a new RDD by applying a function to all elements of this RDD.
-   */
-  def map[R](f: DoubleFunction[T]): JavaDoubleRDD =
-    new JavaDoubleRDD(rdd.map(x => f(x).doubleValue()))
-
-  /**
-   * Return a new RDD by applying a function to all elements of this RDD.
-   */
-  def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
-    def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
-    new JavaPairRDD(rdd.map(f)(cm))(f.keyType(), f.valueType())
-  }
-
-  /**
-   *  Return a new RDD by first applying a function to all elements of this
-   *  RDD, and then flattening the results.
-   */
-  def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
-    import scala.collection.JavaConverters._
-    def fn = (x: T) => f.apply(x).asScala
-    JavaRDD.fromRDD(rdd.flatMap(fn)(f.elementType()))(f.elementType())
-  }
-
-  /**
-   *  Return a new RDD by first applying a function to all elements of this
-   *  RDD, and then flattening the results.
-   */
-  def flatMap(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
-    import scala.collection.JavaConverters._
-    def fn = (x: T) => f.apply(x).asScala
-    new JavaDoubleRDD(rdd.flatMap(fn).map((x: java.lang.Double) => x.doubleValue()))
-  }
-
-  /**
-   *  Return a new RDD by first applying a function to all elements of this
-   *  RDD, and then flattening the results.
-   */
-  def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
-    import scala.collection.JavaConverters._
-    def fn = (x: T) => f.apply(x).asScala
-    def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
-    JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(f.keyType(), f.valueType())
-  }
-
-  /**
-   * Return a new RDD by applying a function to each partition of this RDD.
-   */
-  def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
-    JavaRDD.fromRDD(rdd.mapPartitions(fn)(f.elementType()))(f.elementType())
-  }
-
-  /**
-   * Return a new RDD by applying a function to each partition of this RDD.
-   */
-  def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
-    new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: java.lang.Double) => x.doubleValue()))
-  }
-
-  /**
-   * Return a new RDD by applying a function to each partition of this RDD.
-   */
-  def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
-  JavaPairRDD[K2, V2] = {
-    def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
-    JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(f.keyType(), f.valueType())
-  }
-
-  /**
-   * Return an RDD created by coalescing all elements within each partition into an array.
-   */
-  def glom(): JavaRDD[JList[T]] =
-    new JavaRDD(rdd.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
-
-  /**
-   * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
-   * elements (a, b) where a is in `this` and b is in `other`.
-   */
-  def cartesian[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U] =
-    JavaPairRDD.fromRDD(rdd.cartesian(other.rdd)(other.classManifest))(classManifest,
-      other.classManifest)
-
-  /**
-   * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
-   * mapping to that key.
-   */
-  def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JList[T]] = {
-    implicit val kcm: ClassManifest[K] =
-      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
-    implicit val vcm: ClassManifest[JList[T]] =
-      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[JList[T]]]
-    JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(f.returnType)))(kcm, vcm)
-  }
-
-  /**
-   * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
-   * mapping to that key.
-   */
-  def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JList[T]] = {
-    implicit val kcm: ClassManifest[K] =
-      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
-    implicit val vcm: ClassManifest[JList[T]] =
-      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[JList[T]]]
-    JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(f.returnType)))(kcm, vcm)
-  }
-
-  /**
-   * Return an RDD created by piping elements to a forked external process.
-   */
-  def pipe(command: String): JavaRDD[String] = rdd.pipe(command)
-
-  /**
-   * Return an RDD created by piping elements to a forked external process.
-   */
-  def pipe(command: JList[String]): JavaRDD[String] =
-    rdd.pipe(asScalaBuffer(command))
-
-  /**
-   * Return an RDD created by piping elements to a forked external process.
-   */
-  def pipe(command: JList[String], env: java.util.Map[String, String]): JavaRDD[String] =
-    rdd.pipe(asScalaBuffer(command), mapAsScalaMap(env))
-
-  /**
-   * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
-   * second element in each RDD, etc. Assumes that the two RDDs have the *same number of
-   * partitions* and the *same number of elements in each partition* (e.g. one was made through
-   * a map on the other).
-   */
-  def zip[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U] = {
-    JavaPairRDD.fromRDD(rdd.zip(other.rdd)(other.classManifest))(classManifest, other.classManifest)
-  }
-
-  /**
-   * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by
-   * applying a function to the zipped partitions. Assumes that all the RDDs have the
-   * *same number of partitions*, but does *not* require them to have the same number
-   * of elements in each partition.
-   */
-  def zipPartitions[U, V](
-      other: JavaRDDLike[U, _],
-      f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = {
-    def fn = (x: Iterator[T], y: Iterator[U]) => asScalaIterator(
-      f.apply(asJavaIterator(x), asJavaIterator(y)).iterator())
-    JavaRDD.fromRDD(
-      rdd.zipPartitions(other.rdd)(fn)(other.classManifest, f.elementType()))(f.elementType())
-  }
-
-  // Actions (launch a job to return a value to the user program)
-
-  /**
-   * Applies a function f to all elements of this RDD.
-   */
-  def foreach(f: VoidFunction[T]) {
-    val cleanF = rdd.context.clean(f)
-    rdd.foreach(cleanF)
-  }
-
-  /**
-   * Return an array that contains all of the elements in this RDD.
-   */
-  def collect(): JList[T] = {
-    import scala.collection.JavaConversions._
-    val arr: java.util.Collection[T] = rdd.collect().toSeq
-    new java.util.ArrayList(arr)
-  }
-
-  /**
-   * Reduces the elements of this RDD using the specified commutative and associative binary operator.
-   */
-  def reduce(f: JFunction2[T, T, T]): T = rdd.reduce(f)
-
-  /**
-   * Aggregate the elements of each partition, and then the results for all the partitions, using a
-   * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
-   * modify t1 and return it as its result value to avoid object allocation; however, it should not
-   * modify t2.
-   */
-  def fold(zeroValue: T)(f: JFunction2[T, T, T]): T =
-    rdd.fold(zeroValue)(f)
-
-  /**
-   * Aggregate the elements of each partition, and then the results for all the partitions, using
-   * given combine functions and a neutral "zero value". This function can return a different result
-   * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
-   * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
-   * allowed to modify and return their first argument instead of creating a new U to avoid memory
-   * allocation.
-   */
-  def aggregate[U](zeroValue: U)(seqOp: JFunction2[U, T, U],
-    combOp: JFunction2[U, U, U]): U =
-    rdd.aggregate(zeroValue)(seqOp, combOp)(seqOp.returnType)
-
-  /**
-   * Return the number of elements in the RDD.
-   */
-  def count(): Long = rdd.count()
-
-  /**
-   * (Experimental) Approximate version of count() that returns a potentially incomplete result
-   * within a timeout, even if not all tasks have finished.
-   */
-  def countApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] =
-    rdd.countApprox(timeout, confidence)
-
-  /**
-   * (Experimental) Approximate version of count() that returns a potentially incomplete result
-   * within a timeout, even if not all tasks have finished.
-   */
-  def countApprox(timeout: Long): PartialResult[BoundedDouble] =
-    rdd.countApprox(timeout)
-
-  /**
-   * Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
-   * combine step happens locally on the master, equivalent to running a single reduce task.
-   */
-  def countByValue(): java.util.Map[T, java.lang.Long] =
-    mapAsJavaMap(rdd.countByValue().map((x => (x._1, new java.lang.Long(x._2)))))
-
-  /**
-   * (Experimental) Approximate version of countByValue().
-   */
-  def countByValueApprox(
-    timeout: Long,
-    confidence: Double
-    ): PartialResult[java.util.Map[T, BoundedDouble]] =
-    rdd.countByValueApprox(timeout, confidence).map(mapAsJavaMap)
-
-  /**
-   * (Experimental) Approximate version of countByValue().
-   */
-  def countByValueApprox(timeout: Long): PartialResult[java.util.Map[T, BoundedDouble]] =
-    rdd.countByValueApprox(timeout).map(mapAsJavaMap)
-
-  /**
-   * Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
-   * it will be slow if a lot of partitions are required. In that case, use collect() to get the
-   * whole RDD instead.
-   */
-  def take(num: Int): JList[T] = {
-    import scala.collection.JavaConversions._
-    val arr: java.util.Collection[T] = rdd.take(num).toSeq
-    new java.util.ArrayList(arr)
-  }
-
-  def takeSample(withReplacement: Boolean, num: Int, seed: Int): JList[T] = {
-    import scala.collection.JavaConversions._
-    val arr: java.util.Collection[T] = rdd.takeSample(withReplacement, num, seed).toSeq
-    new java.util.ArrayList(arr)
-  }
-
-  /**
-   * Return the first element in this RDD.
-   */
-  def first(): T = rdd.first()
-
-  /**
-   * Save this RDD as a text file, using string representations of elements.
-   */
-  def saveAsTextFile(path: String) = rdd.saveAsTextFile(path)
-
-
-  /**
-   * Save this RDD as a compressed text file, using string representations of elements.
-   */
-  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) =
-    rdd.saveAsTextFile(path, codec)
-
-  /**
-   * Save this RDD as a SequenceFile of serialized objects.
-   */
-  def saveAsObjectFile(path: String) = rdd.saveAsObjectFile(path)
-
-  /**
-   * Creates tuples of the elements in this RDD by applying `f`.
-   */
-  def keyBy[K](f: JFunction[T, K]): JavaPairRDD[K, T] = {
-    implicit val kcm: ClassManifest[K] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
-    JavaPairRDD.fromRDD(rdd.keyBy(f))
-  }
-
-  /**
-   * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
-   * directory set with SparkContext.setCheckpointDir() and all references to its parent
-   * RDDs will be removed. This function must be called before any job has been
-   * executed on this RDD. It is strongly recommended that this RDD is persisted in
-   * memory, otherwise saving it on a file will require recomputation.
-   */
-  def checkpoint() = rdd.checkpoint()
-
-  /**
-   * Return whether this RDD has been checkpointed or not
-   */
-  def isCheckpointed: Boolean = rdd.isCheckpointed
-
-  /**
-   * Gets the name of the file to which this RDD was checkpointed
-   */
-  def getCheckpointFile(): Optional[String] = {
-    JavaUtils.optionToOptional(rdd.getCheckpointFile)
-  }
-
-  /** A description of this RDD and its recursive dependencies for debugging. */
-  def toDebugString(): String = {
-    rdd.toDebugString
-  }
-
-  /**
-   * Returns the top K elements from this RDD as defined by
-   * the specified Comparator[T].
-   * @param num the number of top elements to return
-   * @param comp the comparator that defines the order
-   * @return an array of top elements
-   */
-  def top(num: Int, comp: Comparator[T]): JList[T] = {
-    import scala.collection.JavaConversions._
-    val topElems = rdd.top(num)(Ordering.comparatorToOrdering(comp))
-    val arr: java.util.Collection[T] = topElems.toSeq
-    new java.util.ArrayList(arr)
-  }
-
-  /**
-   * Returns the top K elements from this RDD using the
-   * natural ordering for T.
-   * @param num the number of top elements to return
-   * @return an array of top elements
-   */
-  def top(num: Int): JList[T] = {
-    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]]
-    top(num, comp)
-  }
-
-  /**
-   * Returns the first K elements from this RDD as defined by
-   * the specified Comparator[T] and maintains the order.
-   * @param num the number of top elements to return
-   * @param comp the comparator that defines the order
-   * @return an array of top elements
-   */
-  def takeOrdered(num: Int, comp: Comparator[T]): JList[T] = {
-    import scala.collection.JavaConversions._
-    val topElems = rdd.takeOrdered(num)(Ordering.comparatorToOrdering(comp))
-    val arr: java.util.Collection[T] = topElems.toSeq
-    new java.util.ArrayList(arr)
-  }
-
-  /**
-   * Returns the first K elements from this RDD using the
-   * natural ordering for T while maintain the order.
-   * @param num the number of top elements to return
-   * @return an array of top elements
-   */
-  def takeOrdered(num: Int): JList[T] = {
-    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]]
-    takeOrdered(num, comp)
-  }
-}


Mime
View raw message