spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject [29/69] [abbrv] [partial] Initial work to rename package to org.apache.spark
Date Sun, 01 Sep 2013 21:59:13 GMT
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/Accumulators.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/Accumulators.scala b/core/src/main/scala/spark/Accumulators.scala
deleted file mode 100644
index 6ff92ce..0000000
--- a/core/src/main/scala/spark/Accumulators.scala
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.io._
-
-import scala.collection.mutable.Map
-import scala.collection.generic.Growable
-
-/**
- * A datatype that can be accumulated, i.e. has a commutative and associative "add" operation,
- * but where the result type, `R`, may be different from the element type being added, `T`.
- *
- * You must define how to add data, and how to merge two of these together.  For some datatypes,
- * such as a counter, these might be the same operation. In that case, you can use the simpler
- * [[spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are
- * accumulating a set. You will add items to the set, and you will union two sets together.
- *
- * @param initialValue initial value of accumulator
- * @param param helper object defining how to add elements of type `R` and `T`
- * @tparam R the full accumulated data (result type)
- * @tparam T partial data that can be added in
- */
-class Accumulable[R, T] (
-    @transient initialValue: R,
-    param: AccumulableParam[R, T])
-  extends Serializable {
-  
-  val id = Accumulators.newId
-  @transient private var value_ = initialValue // Current value on master
-  val zero = param.zero(initialValue)  // Zero value to be passed to workers
-  var deserialized = false
-
-  Accumulators.register(this, true)
-
-  /**
-   * Add more data to this accumulator / accumulable
-   * @param term the data to add
-   */
-  def += (term: T) { value_ = param.addAccumulator(value_, term) }
-
-  /**
-   * Add more data to this accumulator / accumulable
-   * @param term the data to add
-   */
-  def add(term: T) { value_ = param.addAccumulator(value_, term) }
-
-  /**
-   * Merge two accumulable objects together
-   *
-   * Normally, a user will not want to use this version, but will instead call `+=`.
-   * @param term the other `R` that will get merged with this
-   */
-  def ++= (term: R) { value_ = param.addInPlace(value_, term)}
-
-  /**
-   * Merge two accumulable objects together
-   *
-   * Normally, a user will not want to use this version, but will instead call `add`.
-   * @param term the other `R` that will get merged with this
-   */
-  def merge(term: R) { value_ = param.addInPlace(value_, term)}
-
-  /**
-   * Access the accumulator's current value; only allowed on master.
-   */
-  def value: R = {
-    if (!deserialized) {
-      value_
-    } else {
-      throw new UnsupportedOperationException("Can't read accumulator value in task")
-    }
-  }
-
-  /**
-   * Get the current value of this accumulator from within a task.
-   *
-   * This is NOT the global value of the accumulator.  To get the global value after a
-   * completed operation on the dataset, call `value`.
-   *
-   * The typical use of this method is to directly mutate the local value, e.g., to add
-   * an element to a Set.
-   */
-  def localValue = value_
-
-  /**
-   * Set the accumulator's value; only allowed on master.
-   */
-  def value_= (newValue: R) {
-    if (!deserialized) value_ = newValue
-    else throw new UnsupportedOperationException("Can't assign accumulator value in task")
-  }
-
-  /**
-   * Set the accumulator's value; only allowed on master
-   */
-  def setValue(newValue: R) {
-    this.value = newValue
-  }
- 
-  // Called by Java when deserializing an object
-  private def readObject(in: ObjectInputStream) {
-    in.defaultReadObject()
-    value_ = zero
-    deserialized = true
-    Accumulators.register(this, false)
-  }
-
-  override def toString = value_.toString
-}
-
-/**
- * Helper object defining how to accumulate values of a particular type. An implicit
- * AccumulableParam needs to be available when you create Accumulables of a specific type.
- *
- * @tparam R the full accumulated data (result type)
- * @tparam T partial data that can be added in
- */
-trait AccumulableParam[R, T] extends Serializable {
-  /**
-   * Add additional data to the accumulator value. Is allowed to modify and return `r`
-   * for efficiency (to avoid allocating objects).
-   *
-   * @param r the current value of the accumulator
-   * @param t the data to be added to the accumulator
-   * @return the new value of the accumulator
-   */
-  def addAccumulator(r: R, t: T): R
-
-  /**
-   * Merge two accumulated values together. Is allowed to modify and return the first value
-   * for efficiency (to avoid allocating objects).
-   *
-   * @param r1 one set of accumulated data
-   * @param r2 another set of accumulated data
-   * @return both data sets merged together
-   */
-  def addInPlace(r1: R, r2: R): R
-
-  /**
-   * Return the "zero" (identity) value for an accumulator type, given its initial value. For
-   * example, if R was a vector of N dimensions, this would return a vector of N zeroes.
-   */
-  def zero(initialValue: R): R
-}
-
-private[spark]
-class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable, T]
-  extends AccumulableParam[R,T] {
-
-  def addAccumulator(growable: R, elem: T): R = {
-    growable += elem
-    growable
-  }
-
-  def addInPlace(t1: R, t2: R): R = {
-    t1 ++= t2
-    t1
-  }
-
-  def zero(initialValue: R): R = {
-    // We need to clone initialValue, but it's hard to specify that R should also be Cloneable.
-    // Instead we'll serialize it to a buffer and load it back.
-    val ser = (new spark.JavaSerializer).newInstance()
-    val copy = ser.deserialize[R](ser.serialize(initialValue))
-    copy.clear()   // In case it contained stuff
-    copy
-  }
-}
-
-/**
- * A simpler version of [[spark.Accumulable]] where the result type being accumulated is the same
- * as the types of elements being merged.
- *
- * @param initialValue initial value of accumulator
- * @param param helper object defining how to add elements of type `T`
- * @tparam T result type
- */
-class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T])
-  extends Accumulable[T,T](initialValue, param)
-
-/**
- * A simpler version of [[spark.AccumulableParam]] where the only datatype you can add in is the same type
- * as the accumulated value. An implicit AccumulatorParam object needs to be available when you create
- * Accumulators of a specific type.
- *
- * @tparam T type of value to accumulate
- */
-trait AccumulatorParam[T] extends AccumulableParam[T, T] {
-  def addAccumulator(t1: T, t2: T): T = {
-    addInPlace(t1, t2)
-  }
-}
-
-// TODO: The multi-thread support in accumulators is kind of lame; check
-// if there's a more intuitive way of doing it right
-private object Accumulators {
-  // TODO: Use soft references? => need to make readObject work properly then
-  val originals = Map[Long, Accumulable[_, _]]()
-  val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]()
-  var lastId: Long = 0
-  
-  def newId: Long = synchronized {
-    lastId += 1
-    return lastId
-  }
-
-  def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized {
-    if (original) {
-      originals(a.id) = a
-    } else {
-      val accums = localAccums.getOrElseUpdate(Thread.currentThread, Map())
-      accums(a.id) = a
-    }
-  }
-
-  // Clear the local (non-original) accumulators for the current thread
-  def clear() {
-    synchronized {
-      localAccums.remove(Thread.currentThread)
-    }
-  }
-
-  // Get the values of the local accumulators for the current thread (by ID)
-  def values: Map[Long, Any] = synchronized {
-    val ret = Map[Long, Any]()
-    for ((id, accum) <- localAccums.getOrElse(Thread.currentThread, Map())) {
-      ret(id) = accum.localValue
-    }
-    return ret
-  }
-
-  // Add values to the original accumulators with some given IDs
-  def add(values: Map[Long, Any]): Unit = synchronized {
-    for ((id, value) <- values) {
-      if (originals.contains(id)) {
-        originals(id).asInstanceOf[Accumulable[Any, Any]] ++= value
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala
deleted file mode 100644
index 9af4019..0000000
--- a/core/src/main/scala/spark/Aggregator.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.util.{HashMap => JHashMap}
-
-import scala.collection.JavaConversions._
-
-/** A set of functions used to aggregate data.
-  * 
-  * @param createCombiner function to create the initial value of the aggregation.
-  * @param mergeValue function to merge a new value into the aggregation result.
-  * @param mergeCombiners function to merge outputs from multiple mergeValue functions.
-  */
-case class Aggregator[K, V, C] (
-    createCombiner: V => C,
-    mergeValue: (C, V) => C,
-    mergeCombiners: (C, C) => C) {
-
-  def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
-    val combiners = new JHashMap[K, C]
-    for (kv <- iter) {
-      val oldC = combiners.get(kv._1)
-      if (oldC == null) {
-        combiners.put(kv._1, createCombiner(kv._2))
-      } else {
-        combiners.put(kv._1, mergeValue(oldC, kv._2))
-      }
-    }
-    combiners.iterator
-  }
-
-  def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
-    val combiners = new JHashMap[K, C]
-    iter.foreach { case(k, c) =>
-      val oldC = combiners.get(k)
-      if (oldC == null) {
-        combiners.put(k, c)
-      } else {
-        combiners.put(k, mergeCombiners(oldC, c))
-      }
-    }
-    combiners.iterator
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
deleted file mode 100644
index 1ec95ed..0000000
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-
-import spark.executor.{ShuffleReadMetrics, TaskMetrics}
-import spark.serializer.Serializer
-import spark.storage.BlockManagerId
-import spark.util.CompletionIterator
-
-
-private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
-
-  override def fetch[T](shuffleId: Int, reduceId: Int, metrics: TaskMetrics, serializer: Serializer)
-    : Iterator[T] =
-  {
-
-    logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
-    val blockManager = SparkEnv.get.blockManager
-
-    val startTime = System.currentTimeMillis
-    val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
-    logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format(
-      shuffleId, reduceId, System.currentTimeMillis - startTime))
-
-    val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]
-    for (((address, size), index) <- statuses.zipWithIndex) {
-      splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size))
-    }
-
-    val blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])] = splitsByAddress.toSeq.map {
-      case (address, splits) =>
-        (address, splits.map(s => ("shuffle_%d_%d_%d".format(shuffleId, s._1, reduceId), s._2)))
-    }
-
-    def unpackBlock(blockPair: (String, Option[Iterator[Any]])) : Iterator[T] = {
-      val blockId = blockPair._1
-      val blockOption = blockPair._2
-      blockOption match {
-        case Some(block) => {
-          block.asInstanceOf[Iterator[T]]
-        }
-        case None => {
-          val regex = "shuffle_([0-9]*)_([0-9]*)_([0-9]*)".r
-          blockId match {
-            case regex(shufId, mapId, _) =>
-              val address = statuses(mapId.toInt)._1
-              throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, null)
-            case _ =>
-              throw new SparkException(
-                "Failed to get block " + blockId + ", which is not a shuffle block")
-          }
-        }
-      }
-    }
-
-    val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)
-    val itr = blockFetcherItr.flatMap(unpackBlock)
-
-    CompletionIterator[T, Iterator[T]](itr, {
-      val shuffleMetrics = new ShuffleReadMetrics
-      shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
-      shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime
-      shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
-      shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
-      shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
-      shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
-      shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
-      metrics.shuffleReadMetrics = Some(shuffleMetrics)
-    })
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/CacheManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/CacheManager.scala b/core/src/main/scala/spark/CacheManager.scala
deleted file mode 100644
index 8131480..0000000
--- a/core/src/main/scala/spark/CacheManager.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import scala.collection.mutable.{ArrayBuffer, HashSet}
-import spark.storage.{BlockManager, StorageLevel}
-
-
-/** Spark class responsible for passing RDDs split contents to the BlockManager and making
-    sure a node doesn't load two copies of an RDD at once.
-  */
-private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
-  private val loading = new HashSet[String]
-
-  /** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
-  def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
-      : Iterator[T] = {
-    val key = "rdd_%d_%d".format(rdd.id, split.index)
-    logInfo("Cache key is " + key)
-    blockManager.get(key) match {
-      case Some(cachedValues) =>
-        // Partition is in cache, so just return its values
-        logInfo("Found partition in cache!")
-        return cachedValues.asInstanceOf[Iterator[T]]
-
-      case None =>
-        // Mark the split as loading (unless someone else marks it first)
-        loading.synchronized {
-          if (loading.contains(key)) {
-            logInfo("Loading contains " + key + ", waiting...")
-            while (loading.contains(key)) {
-              try {loading.wait()} catch {case _ : Throwable =>}
-            }
-            logInfo("Loading no longer contains " + key + ", so returning cached result")
-            // See whether someone else has successfully loaded it. The main way this would fail
-            // is for the RDD-level cache eviction policy if someone else has loaded the same RDD
-            // partition but we didn't want to make space for it. However, that case is unlikely
-            // because it's unlikely that two threads would work on the same RDD partition. One
-            // downside of the current code is that threads wait serially if this does happen.
-            blockManager.get(key) match {
-              case Some(values) =>
-                return values.asInstanceOf[Iterator[T]]
-              case None =>
-                logInfo("Whoever was loading " + key + " failed; we'll try it ourselves")
-                loading.add(key)
-            }
-          } else {
-            loading.add(key)
-          }
-        }
-        try {
-          // If we got here, we have to load the split
-          val elements = new ArrayBuffer[Any]
-          logInfo("Computing partition " + split)
-          elements ++= rdd.computeOrReadCheckpoint(split, context)
-          // Try to put this block in the blockManager
-          blockManager.put(key, elements, storageLevel, true)
-          return elements.iterator.asInstanceOf[Iterator[T]]
-        } finally {
-          loading.synchronized {
-            loading.remove(key)
-            loading.notifyAll()
-          }
-        }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/ClosureCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/ClosureCleaner.scala b/core/src/main/scala/spark/ClosureCleaner.scala
deleted file mode 100644
index 8b39241..0000000
--- a/core/src/main/scala/spark/ClosureCleaner.scala
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.lang.reflect.Field
-
-import scala.collection.mutable.Map
-import scala.collection.mutable.Set
-
-import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type}
-import org.objectweb.asm.Opcodes._
-import java.io.{InputStream, IOException, ByteArrayOutputStream, ByteArrayInputStream, BufferedInputStream}
-
-private[spark] object ClosureCleaner extends Logging {
-  // Get an ASM class reader for a given class from the JAR that loaded it
-  private def getClassReader(cls: Class[_]): ClassReader = {
-    // Copy data over, before delegating to ClassReader - else we can run out of open file handles.
-    val className = cls.getName.replaceFirst("^.*\\.", "") + ".class"
-    val resourceStream = cls.getResourceAsStream(className)
-    // todo: Fixme - continuing with earlier behavior ...
-    if (resourceStream == null) return new ClassReader(resourceStream)
-
-    val baos = new ByteArrayOutputStream(128)
-    Utils.copyStream(resourceStream, baos, true)
-    new ClassReader(new ByteArrayInputStream(baos.toByteArray))
-  }
-
-  // Check whether a class represents a Scala closure
-  private def isClosure(cls: Class[_]): Boolean = {
-    cls.getName.contains("$anonfun$")
-  }
-  
-  // Get a list of the classes of the outer objects of a given closure object, obj;
-  // the outer objects are defined as any closures that obj is nested within, plus
-  // possibly the class that the outermost closure is in, if any. We stop searching
-  // for outer objects beyond that because cloning the user's object is probably
-  // not a good idea (whereas we can clone closure objects just fine since we
-  // understand how all their fields are used).
-  private def getOuterClasses(obj: AnyRef): List[Class[_]] = {
-    for (f <- obj.getClass.getDeclaredFields if f.getName == "$outer") {
-      f.setAccessible(true)
-      if (isClosure(f.getType)) {
-        return f.getType :: getOuterClasses(f.get(obj))
-      } else {
-        return f.getType :: Nil // Stop at the first $outer that is not a closure
-      }
-    }
-    return Nil
-  }
-  
-  // Get a list of the outer objects for a given closure object.
-  private def getOuterObjects(obj: AnyRef): List[AnyRef] = {
-    for (f <- obj.getClass.getDeclaredFields if f.getName == "$outer") {
-      f.setAccessible(true)
-      if (isClosure(f.getType)) {
-        return f.get(obj) :: getOuterObjects(f.get(obj))
-      } else {
-        return f.get(obj) :: Nil // Stop at the first $outer that is not a closure
-      }
-    }
-    return Nil
-  }
-  
-  private def getInnerClasses(obj: AnyRef): List[Class[_]] = {
-    val seen = Set[Class[_]](obj.getClass)
-    var stack = List[Class[_]](obj.getClass)
-    while (!stack.isEmpty) {
-      val cr = getClassReader(stack.head)
-      stack = stack.tail
-      val set = Set[Class[_]]()
-      cr.accept(new InnerClosureFinder(set), 0)
-      for (cls <- set -- seen) {
-        seen += cls
-        stack = cls :: stack
-      }
-    }
-    return (seen - obj.getClass).toList
-  }
-  
-  private def createNullValue(cls: Class[_]): AnyRef = {
-    if (cls.isPrimitive) {
-      new java.lang.Byte(0: Byte) // Should be convertible to any primitive type
-    } else {
-      null
-    }
-  }
-  
-  def clean(func: AnyRef) {
-    // TODO: cache outerClasses / innerClasses / accessedFields
-    val outerClasses = getOuterClasses(func)
-    val innerClasses = getInnerClasses(func)
-    val outerObjects = getOuterObjects(func)
-    
-    val accessedFields = Map[Class[_], Set[String]]()
-    for (cls <- outerClasses)
-      accessedFields(cls) = Set[String]()
-    for (cls <- func.getClass :: innerClasses)
-      getClassReader(cls).accept(new FieldAccessFinder(accessedFields), 0)
-    //logInfo("accessedFields: " + accessedFields)
-
-    val inInterpreter = {
-      try {
-        val interpClass = Class.forName("spark.repl.Main")
-        interpClass.getMethod("interp").invoke(null) != null
-      } catch {
-        case _: ClassNotFoundException => true
-      }
-    }
-
-    var outerPairs: List[(Class[_], AnyRef)] = (outerClasses zip outerObjects).reverse
-    var outer: AnyRef = null
-    if (outerPairs.size > 0 && !isClosure(outerPairs.head._1)) {
-      // The closure is ultimately nested inside a class; keep the object of that
-      // class without cloning it since we don't want to clone the user's objects.
-      outer = outerPairs.head._2
-      outerPairs = outerPairs.tail
-    }
-    // Clone the closure objects themselves, nulling out any fields that are not
-    // used in the closure we're working on or any of its inner closures.
-    for ((cls, obj) <- outerPairs) {
-      outer = instantiateClass(cls, outer, inInterpreter)
-      for (fieldName <- accessedFields(cls)) {
-        val field = cls.getDeclaredField(fieldName)
-        field.setAccessible(true)
-        val value = field.get(obj)
-        //logInfo("1: Setting " + fieldName + " on " + cls + " to " + value);
-        field.set(outer, value)
-      }
-    }
-    
-    if (outer != null) {
-      //logInfo("2: Setting $outer on " + func.getClass + " to " + outer);
-      val field = func.getClass.getDeclaredField("$outer")
-      field.setAccessible(true)
-      field.set(func, outer)
-    }
-  }
-  
-  private def instantiateClass(cls: Class[_], outer: AnyRef, inInterpreter: Boolean): AnyRef = {
-    //logInfo("Creating a " + cls + " with outer = " + outer)
-    if (!inInterpreter) {
-      // This is a bona fide closure class, whose constructor has no effects
-      // other than to set its fields, so use its constructor
-      val cons = cls.getConstructors()(0)
-      val params = cons.getParameterTypes.map(createNullValue).toArray
-      if (outer != null)
-        params(0) = outer // First param is always outer object
-      return cons.newInstance(params: _*).asInstanceOf[AnyRef]
-    } else {
-      // Use reflection to instantiate object without calling constructor
-      val rf = sun.reflect.ReflectionFactory.getReflectionFactory()
-      val parentCtor = classOf[java.lang.Object].getDeclaredConstructor()
-      val newCtor = rf.newConstructorForSerialization(cls, parentCtor)
-      val obj = newCtor.newInstance().asInstanceOf[AnyRef]
-      if (outer != null) {
-        //logInfo("3: Setting $outer on " + cls + " to " + outer);
-        val field = cls.getDeclaredField("$outer")
-        field.setAccessible(true)
-        field.set(obj, outer)
-      }
-      return obj
-    }
-  }
-}
-
-private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor(ASM4) {
-  override def visitMethod(access: Int, name: String, desc: String,
-      sig: String, exceptions: Array[String]): MethodVisitor = {
-    return new MethodVisitor(ASM4) {
-      override def visitFieldInsn(op: Int, owner: String, name: String, desc: String) {
-        if (op == GETFIELD) {
-          for (cl <- output.keys if cl.getName == owner.replace('/', '.')) {
-            output(cl) += name
-          }
-        }
-      }
-      
-      override def visitMethodInsn(op: Int, owner: String, name: String,
-          desc: String) {
-        // Check for calls to a getter method for a variable in an interpreter wrapper object.
-        // This means that the corresponding field will be accessed, so we should save it.
-        if (op == INVOKEVIRTUAL && owner.endsWith("$iwC") && !name.endsWith("$outer")) {
-          for (cl <- output.keys if cl.getName == owner.replace('/', '.')) {
-            output(cl) += name
-          }
-        }
-      }
-    }
-  }
-}
-
-private[spark] class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM4) {
-  var myName: String = null
-  
-  override def visit(version: Int, access: Int, name: String, sig: String,
-      superName: String, interfaces: Array[String]) {
-    myName = name
-  }
-  
-  override def visitMethod(access: Int, name: String, desc: String,
-      sig: String, exceptions: Array[String]): MethodVisitor = {
-    return new MethodVisitor(ASM4) {
-      override def visitMethodInsn(op: Int, owner: String, name: String,
-          desc: String) {
-        val argTypes = Type.getArgumentTypes(desc)
-        if (op == INVOKESPECIAL && name == "<init>" && argTypes.length > 0
-            && argTypes(0).toString.startsWith("L") // is it an object?
-            && argTypes(0).getInternalName == myName)
-          output += Class.forName(
-              owner.replace('/', '.'),
-              false,
-              Thread.currentThread.getContextClassLoader)
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/Dependency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala
deleted file mode 100644
index d5a9606..0000000
--- a/core/src/main/scala/spark/Dependency.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-/**
- * Base class for dependencies.
- *
- * @param rdd the RDD this dependency points at; exposed as a public `val`
- * @tparam T element type of `rdd`
- */
-abstract class Dependency[T](val rdd: RDD[T]) extends Serializable
-
-
-/**
- * Base class for dependencies where each partition of the parent RDD is used by at most one
- * partition of the child RDD.  Narrow dependencies allow for pipelined execution.
- *
- * @param rdd the parent RDD, passed through to [[Dependency]]
- */
-abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
-  /**
-   * Get the parent partitions for a child partition. Abstract: each concrete narrow
-   * dependency defines its own mapping.
-   * @param partitionId a partition of the child RDD
-   * @return the partitions of the parent RDD that the child partition depends upon
-   */
-  def getParents(partitionId: Int): Seq[Int]
-}
-
-
-/**
- * Represents a dependency on the output of a shuffle stage.
- * @param rdd the parent RDD; its elements are `Product2[K, V]` values
- * @param partitioner partitioner used to partition the shuffle output
- * @param serializerClass class name of the serializer to use; defaults to `null`
- *                        (NOTE(review): presumably meaning "use the default serializer" --
- *                        confirm against the code that reads this field)
- */
-class ShuffleDependency[K, V](
-    @transient rdd: RDD[_ <: Product2[K, V]],
-    val partitioner: Partitioner,
-    val serializerClass: String = null)
-  extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {
-
-  // Identifier requested from the parent RDD's context once, when this dependency is built.
-  val shuffleId: Int = rdd.context.newShuffleId()
-}
-
-
-/**
- * A narrow dependency in which child partition `i` depends on exactly parent partition `i`.
- */
-class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
-  /** The only parent of a child partition is the parent partition with the same index. */
-  override def getParents(partitionId: Int) = partitionId :: Nil
-}
-
-
-/**
- * A narrow dependency that maps a contiguous range of child partitions one-to-one onto a
- * contiguous range of parent partitions.
- * @param rdd the parent RDD
- * @param inStart the start of the range in the parent RDD
- * @param outStart the start of the range in the child RDD
- * @param length the length of the range
- */
-class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
-  extends NarrowDependency[T](rdd) {
-
-  /**
-   * Child partitions outside [outStart, outStart + length) have no parent in this
-   * dependency; inside the range the parent index is shifted by (inStart - outStart).
-   */
-  override def getParents(partitionId: Int) = {
-    if (partitionId < outStart || partitionId >= outStart + length) {
-      Nil
-    } else {
-      List(partitionId - outStart + inStart)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/DoubleRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/DoubleRDDFunctions.scala b/core/src/main/scala/spark/DoubleRDDFunctions.scala
deleted file mode 100644
index 104168e..0000000
--- a/core/src/main/scala/spark/DoubleRDDFunctions.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import spark.partial.BoundedDouble
-import spark.partial.MeanEvaluator
-import spark.partial.PartialResult
-import spark.partial.SumEvaluator
-import spark.util.StatCounter
-
-/**
- * Extra functions available on RDDs of Doubles through an implicit conversion.
- * Import `spark.SparkContext._` at the top of your program to use these functions.
- */
-class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
-  /**
-   * Add up the elements in this RDD. The sum is folded from 0.0 rather than reduced, so an
-   * RDD without elements yields 0.0 instead of failing.
-   */
-  def sum(): Double = {
-    self.fold(0.0)(_ + _)
-  }
-
-  /**
-   * Return a [[spark.util.StatCounter]] object that captures the mean, variance and count
-   * of the RDD's elements in one operation. One counter is built per partition and the
-   * per-partition counters are then merged.
-   */
-  def stats(): StatCounter = {
-    self.mapPartitions(nums => Iterator(StatCounter(nums))).reduce((a, b) => a.merge(b))
-  }
-
-  /** Compute the mean of this RDD's elements. */
-  def mean(): Double = stats().mean
-
-  /** Compute the variance of this RDD's elements. */
-  def variance(): Double = stats().variance
-
-  /** Compute the standard deviation of this RDD's elements. */
-  def stdev(): Double = stats().stdev
-
-  /**
-   * Compute the sample standard deviation of this RDD's elements (which corrects for bias in
-   * estimating the standard deviation by dividing by N-1 instead of N).
-   */
-  def sampleStdev(): Double = stats().sampleStdev
-
-  /**
-   * Compute the sample variance of this RDD's elements (which corrects for bias in
-   * estimating the variance by dividing by N-1 instead of N).
-   */
-  def sampleVariance(): Double = stats().sampleVariance
-
-  /**
-   * (Experimental) Approximate operation to return the mean within a timeout.
-   * @param timeout how long to wait for the job (NOTE(review): presumably milliseconds --
-   *                confirm against runApproximateJob)
-   * @param confidence confidence level handed to the evaluator; defaults to 0.95
-   */
-  def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
-    val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
-    val evaluator = new MeanEvaluator(self.partitions.size, confidence)
-    self.context.runApproximateJob(self, processPartition, evaluator, timeout)
-  }
-
-  /**
-   * (Experimental) Approximate operation to return the sum within a timeout.
-   * @param timeout how long to wait for the job (NOTE(review): presumably milliseconds --
-   *                confirm against runApproximateJob)
-   * @param confidence confidence level handed to the evaluator; defaults to 0.95
-   */
-  def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
-    val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
-    val evaluator = new SumEvaluator(self.partitions.size, confidence)
-    self.context.runApproximateJob(self, processPartition, evaluator, timeout)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/FetchFailedException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/FetchFailedException.scala b/core/src/main/scala/spark/FetchFailedException.scala
deleted file mode 100644
index a2dae6c..0000000
--- a/core/src/main/scala/spark/FetchFailedException.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import spark.storage.BlockManagerId
-
-/**
- * Exception describing a failed fetch. It carries the TaskEndReason that is handed back
- * through `toTaskEndReason`, together with a message and the underlying cause.
- */
-private[spark] class FetchFailedException(
-    taskEndReason: TaskEndReason,
-    message: String,
-    cause: Throwable)
-  extends Exception {
-
-  /** Failure to fetch one map output from the block manager at `bmAddress`. */
-  def this(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int, cause: Throwable) =
-    this(
-      FetchFailed(bmAddress, shuffleId, mapId, reduceId),
-      "Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId),
-      cause)
-
-  /** Failure to obtain the output locations themselves; no block manager or map id is known. */
-  def this(shuffleId: Int, reduceId: Int, cause: Throwable) =
-    this(
-      FetchFailed(null, shuffleId, -1, reduceId),
-      "Unable to fetch locations from master: %d %d".format(shuffleId, reduceId),
-      cause)
-
-  /** The reason object supplied (or built) at construction time. */
-  def toTaskEndReason: TaskEndReason = taskEndReason
-
-  // Message and cause are served from the constructor arguments rather than being passed
-  // to the Exception super constructor.
-  override def getMessage(): String = message
-
-  override def getCause(): Throwable = cause
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/HttpFileServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/HttpFileServer.scala b/core/src/main/scala/spark/HttpFileServer.scala
deleted file mode 100644
index a13a7a2..0000000
--- a/core/src/main/scala/spark/HttpFileServer.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.io.{File}
-import com.google.common.io.Files
-
-/**
- * Serves files and JARs over HTTP out of a temporary directory, using an HttpServer
- * rooted at that directory.
- */
-private[spark] class HttpFileServer extends Logging {
-
-  var baseDir : File = null
-  var fileDir : File = null
-  var jarDir : File = null
-  var httpServer : HttpServer = null
-  var serverUri : String = null
-
-  /** Creates the temp directory with "files" and "jars" subdirectories and starts the server. */
-  def initialize(): Unit = {
-    baseDir = Utils.createTempDir()
-    fileDir = new File(baseDir, "files")
-    jarDir = new File(baseDir, "jars")
-    fileDir.mkdir()
-    jarDir.mkdir()
-    logInfo("HTTP File server directory is " + baseDir)
-    httpServer = new HttpServer(baseDir)
-    httpServer.start()
-    serverUri = httpServer.uri
-  }
-
-  /** Stops the underlying HTTP server. */
-  def stop(): Unit = {
-    httpServer.stop()
-  }
-
-  /** Copies `file` into the files directory and returns the URI it is served under. */
-  def addFile(file: File) : String = {
-    addFileToDir(file, fileDir)
-    serverUri + "/files/" + file.getName
-  }
-
-  /** Copies `file` into the jars directory and returns the URI it is served under. */
-  def addJar(file: File) : String = {
-    addFileToDir(file, jarDir)
-    serverUri + "/jars/" + file.getName
-  }
-
-  /** Copies `file` into `dir` under its own name and returns the resulting path as a string. */
-  def addFileToDir(file: File, dir: File) : String = {
-    val target = new File(dir, file.getName)
-    Files.copy(file, target)
-    dir + "/" + file.getName
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/HttpServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/HttpServer.scala b/core/src/main/scala/spark/HttpServer.scala
deleted file mode 100644
index c9dffbc..0000000
--- a/core/src/main/scala/spark/HttpServer.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.io.File
-import java.net.InetAddress
-
-import org.eclipse.jetty.server.Server
-import org.eclipse.jetty.server.bio.SocketConnector
-import org.eclipse.jetty.server.handler.DefaultHandler
-import org.eclipse.jetty.server.handler.HandlerList
-import org.eclipse.jetty.server.handler.ResourceHandler
-import org.eclipse.jetty.util.thread.QueuedThreadPool
-
-/**
- * Exception type thrown by HttpServer when it is in the wrong state for an operation:
- * starting a server that is already started, stopping one that is already stopped, or
- * asking for the URI of one that is not started.
- *
- * @param message description of the state violation
- */
-private[spark] class ServerStateException(message: String) extends Exception(message)
-
-/**
- * An HTTP server for static content used to allow worker nodes to access JARs added to SparkContext
- * as well as classes created by the interpreter when the user types in code. This is just a wrapper
- * around a Jetty server.
- *
- * @param resourceBase directory whose contents are served
- */
-private[spark] class HttpServer(resourceBase: File) extends Logging {
-  // null while the server is not running
-  private var server: Server = null
-  // -1 while the server is not running
-  private var port: Int = -1
-
-  /**
-   * Starts a Jetty server on a port chosen by the system (port 0 is requested) and records
-   * the port actually bound. Throws ServerStateException if the server is already started.
-   */
-  def start() {
-    if (server != null) {
-      throw new ServerStateException("Server is already started")
-    }
-
-    server = new Server()
-
-    val connector = new SocketConnector
-    connector.setMaxIdleTime(60 * 1000)
-    connector.setSoLingerTime(-1)
-    connector.setPort(0)
-    server.addConnector(connector)
-
-    val threadPool = new QueuedThreadPool
-    threadPool.setDaemon(true)
-    server.setThreadPool(threadPool)
-
-    // Static files first; DefaultHandler takes whatever the resource handler does not serve.
-    val resHandler = new ResourceHandler
-    resHandler.setResourceBase(resourceBase.getAbsolutePath)
-    val handlerList = new HandlerList
-    handlerList.setHandlers(Array(resHandler, new DefaultHandler))
-    server.setHandler(handlerList)
-
-    server.start()
-    port = server.getConnectors()(0).getLocalPort()
-  }
-
-  /** Stops the server and resets the state. Throws ServerStateException if not running. */
-  def stop() {
-    if (server == null) {
-      throw new ServerStateException("Server is already stopped")
-    }
-    server.stop()
-    port = -1
-    server = null
-  }
-
-  /**
-   * Get the URI of this HTTP server (http://host:port)
-   */
-  def uri: String = {
-    if (server == null) {
-      throw new ServerStateException("Server is not started")
-    }
-    "http://" + Utils.localIpAddress + ":" + port
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/JavaSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/JavaSerializer.scala b/core/src/main/scala/spark/JavaSerializer.scala
deleted file mode 100644
index 04c5f44..0000000
--- a/core/src/main/scala/spark/JavaSerializer.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.io._
-import java.nio.ByteBuffer
-
-import serializer.{Serializer, SerializerInstance, DeserializationStream, SerializationStream}
-import spark.util.ByteBufferInputStream
-
-/** SerializationStream that writes objects to `out` through a java.io.ObjectOutputStream. */
-private[spark] class JavaSerializationStream(out: OutputStream) extends SerializationStream {
-  val objOut = new ObjectOutputStream(out)
-
-  /** Writes `t` and returns this stream so that calls can be chained. */
-  def writeObject[T](t: T): SerializationStream = {
-    objOut.writeObject(t)
-    this
-  }
-
-  def flush() {
-    objOut.flush()
-  }
-
-  def close() {
-    objOut.close()
-  }
-}
-
-/**
- * DeserializationStream that reads objects from `in` through a java.io.ObjectInputStream
- * which resolves classes with the given class loader.
- */
-private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoader)
-extends DeserializationStream {
-  val objIn = new ObjectInputStream(in) {
-    // Look classes up in `loader`, without initializing them.
-    override def resolveClass(desc: ObjectStreamClass) =
-      Class.forName(desc.getName, false, loader)
-  }
-
-  def readObject[T](): T = {
-    val obj = objIn.readObject()
-    obj.asInstanceOf[T]
-  }
-
-  def close() {
-    objIn.close()
-  }
-}
-
-/** SerializerInstance backed by Java's built-in object serialization. */
-private[spark] class JavaSerializerInstance extends SerializerInstance {
-  /** Serializes `t` into a byte array held in memory and wraps it in a ByteBuffer. */
-  def serialize[T](t: T): ByteBuffer = {
-    val buffer = new ByteArrayOutputStream()
-    val stream = serializeStream(buffer)
-    stream.writeObject(t)
-    stream.close()
-    ByteBuffer.wrap(buffer.toByteArray)
-  }
-
-  /** Reads one object from `bytes`, resolving classes with the context class loader. */
-  def deserialize[T](bytes: ByteBuffer): T = {
-    val stream = deserializeStream(new ByteBufferInputStream(bytes))
-    stream.readObject().asInstanceOf[T]
-  }
-
-  /** Reads one object from `bytes`, resolving classes with `loader`. */
-  def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
-    val stream = deserializeStream(new ByteBufferInputStream(bytes), loader)
-    stream.readObject().asInstanceOf[T]
-  }
-
-  def serializeStream(s: OutputStream): SerializationStream =
-    new JavaSerializationStream(s)
-
-  def deserializeStream(s: InputStream): DeserializationStream =
-    new JavaDeserializationStream(s, Thread.currentThread.getContextClassLoader)
-
-  def deserializeStream(s: InputStream, loader: ClassLoader): DeserializationStream =
-    new JavaDeserializationStream(s, loader)
-}
-
-/**
- * A Spark serializer that uses Java's built-in serialization.
- * Every call to `newInstance()` creates a fresh JavaSerializerInstance.
- */
-class JavaSerializer extends Serializer {
-  def newInstance(): SerializerInstance = new JavaSerializerInstance
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/KryoSerializer.scala b/core/src/main/scala/spark/KryoSerializer.scala
deleted file mode 100644
index eeb2993..0000000
--- a/core/src/main/scala/spark/KryoSerializer.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.io._
-import java.nio.ByteBuffer
-import com.esotericsoftware.kryo.{Kryo, KryoException}
-import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
-import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
-import com.twitter.chill.ScalaKryoInstantiator
-import serializer.{SerializerInstance, DeserializationStream, SerializationStream}
-import spark.broadcast._
-import spark.storage._
-
-/** SerializationStream that writes objects to `outStream` through a Kryo Output. */
-private[spark]
-class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends SerializationStream {
-  val output = new KryoOutput(outStream)
-
-  /** Writes the class and the value of `t`, and returns this stream for chaining. */
-  def writeObject[T](t: T): SerializationStream = {
-    kryo.writeClassAndObject(output, t)
-    this
-  }
-
-  def flush() {
-    output.flush()
-  }
-
-  def close() {
-    output.close()
-  }
-}
-
-/** DeserializationStream that reads objects from `inStream` through a Kryo Input. */
-private[spark]
-class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends DeserializationStream {
-  val input = new KryoInput(inStream)
-
-  /**
-   * Reads the next object from the stream.
-   * Throws EOFException when the input is exhausted; any other KryoException propagates
-   * unchanged so that genuine deserialization failures are not mistaken for end of stream.
-   */
-  def readObject[T](): T = {
-    try {
-      kryo.readClassAndObject(input).asInstanceOf[T]
-    } catch {
-      // DeserializationStream uses the EOF exception to indicate stopping condition.
-      // Kryo reports an exhausted input as a "buffer underflow"; only that case is translated.
-      case e: KryoException
-          if Option(e.getMessage).exists(_.toLowerCase.contains("buffer underflow")) =>
-        throw new EOFException
-    }
-  }
-
-  def close() {
-    // Kryo's Input automatically closes the input stream it is using.
-    input.close()
-  }
-}
-
-/**
- * SerializerInstance backed by one Kryo object and one reusable input/output buffer pair,
- * all obtained from the given KryoSerializer.
- */
-private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
-  val kryo = ks.newKryo()
-  val output = ks.newKryoOutput()
-  val input = ks.newKryoInput()
-
-  /** Serializes `t` (class and value) through the shared output buffer, clearing it first. */
-  def serialize[T](t: T): ByteBuffer = {
-    output.clear()
-    kryo.writeClassAndObject(output, t)
-    ByteBuffer.wrap(output.toBytes)
-  }
-
-  /** Reads one object from the array backing `bytes`. */
-  def deserialize[T](bytes: ByteBuffer): T = {
-    input.setBuffer(bytes.array)
-    kryo.readClassAndObject(input).asInstanceOf[T]
-  }
-
-  /**
-   * Reads one object from the array backing `bytes` while `loader` is installed as Kryo's
-   * class loader. The previous class loader is restored even if reading fails, so a failed
-   * call cannot leave this instance resolving classes with the wrong loader.
-   */
-  def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
-    val oldClassLoader = kryo.getClassLoader
-    kryo.setClassLoader(loader)
-    try {
-      input.setBuffer(bytes.array)
-      kryo.readClassAndObject(input).asInstanceOf[T]
-    } finally {
-      kryo.setClassLoader(oldClassLoader)
-    }
-  }
-
-  def serializeStream(s: OutputStream): SerializationStream = {
-    new KryoSerializationStream(kryo, s)
-  }
-
-  def deserializeStream(s: InputStream): DeserializationStream = {
-    new KryoDeserializationStream(kryo, s)
-  }
-}
-
-/**
- * Interface implemented by clients to register their classes with Kryo when using Kryo
- * serialization. KryoSerializer instantiates the class named by the
- * `spark.kryo.registrator` system property through its no-argument constructor and calls
- * `registerClasses` on it for each Kryo object it creates.
- */
-trait KryoRegistrator {
-  /** Register classes on `kryo`, the Kryo object being configured. */
-  def registerClasses(kryo: Kryo)
-}
-
-/**
- * A Spark serializer that uses the [[http://code.google.com/p/kryo/ Kryo serialization library]].
- *
- * Behaviour is controlled by system properties:
- *  - `spark.kryoserializer.buffer.mb`: size in MB of the buffers handed out (default 2)
- *  - `spark.kryo.registrator`: class name of a user [[KryoRegistrator]] (optional)
- *  - `spark.kryo.referenceTracking`: whether Kryo tracks references (default true)
- */
-class KryoSerializer extends spark.serializer.Serializer with Logging {
-  private val bufferSize = System.getProperty("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
-
-  def newKryoOutput() = new KryoOutput(bufferSize)
-
-  def newKryoInput() = new KryoInput(bufferSize)
-
-  /**
-   * Creates and configures a new Kryo object: registers some commonly used classes, runs
-   * the user registrator if one is configured, and installs the context class loader.
-   */
-  def newKryo(): Kryo = {
-    val instantiator = new ScalaKryoInstantiator
-    val kryo = instantiator.newKryo()
-    val classLoader = Thread.currentThread.getContextClassLoader
-
-    // Register some commonly used classes
-    val toRegister: Seq[AnyRef] = Seq(
-      ByteBuffer.allocate(1),
-      StorageLevel.MEMORY_ONLY,
-      PutBlock("1", ByteBuffer.allocate(1), StorageLevel.MEMORY_ONLY),
-      GotBlock("1", ByteBuffer.allocate(1)),
-      GetBlock("1")
-    )
-
-    for (obj <- toRegister) kryo.register(obj.getClass)
-
-    // Allow sending SerializableWritable
-    kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
-    kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
-
-    // Allow the user to register their own classes by setting spark.kryo.registrator
-    try {
-      Option(System.getProperty("spark.kryo.registrator")).foreach { regCls =>
-        logDebug("Running user registrator: " + regCls)
-        val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
-        reg.registerClasses(kryo)
-      }
-    } catch {
-      // Registration stays best-effort, but the failure goes to the log together with its
-      // cause instead of being printed to stdout with the exception discarded.
-      case e: Exception => logError("Failed to register spark.kryo.registrator", e)
-    }
-
-    kryo.setClassLoader(classLoader)
-
-    // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops
-    kryo.setReferences(System.getProperty("spark.kryo.referenceTracking", "true").toBoolean)
-
-    kryo
-  }
-
-  def newInstance(): SerializerInstance = {
-    new KryoSerializerInstance(this)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/Logging.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/Logging.scala b/core/src/main/scala/spark/Logging.scala
deleted file mode 100644
index 79b0362..0000000
--- a/core/src/main/scala/spark/Logging.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
-
-/**
- * Mix-in that gives a class an SLF4J logger named after the class, plus level-specific
- * helpers whose message argument is passed by name and therefore only evaluated when
- * the corresponding log level is enabled.
- */
-trait Logging {
-  // Transient so that objects mixing in Logging can be serialized and used on another
-  // machine; the logger is created again on first use.
-  @transient private var log_ : Logger = null
-
-  // Returns the logger for this object, creating it on first access.
-  protected def log: Logger = {
-    if (log_ == null) {
-      // Ignore the trailing $ in the class names of Scala objects
-      val loggerName = this.getClass.getName.stripSuffix("$")
-      log_ = LoggerFactory.getLogger(loggerName)
-    }
-    log_
-  }
-
-  // Each level has a message-only variant and a variant that also takes a Throwable.
-
-  protected def logTrace(msg: => String): Unit =
-    if (log.isTraceEnabled) log.trace(msg)
-
-  protected def logTrace(msg: => String, throwable: Throwable): Unit =
-    if (log.isTraceEnabled) log.trace(msg, throwable)
-
-  protected def logDebug(msg: => String): Unit =
-    if (log.isDebugEnabled) log.debug(msg)
-
-  protected def logDebug(msg: => String, throwable: Throwable): Unit =
-    if (log.isDebugEnabled) log.debug(msg, throwable)
-
-  protected def logInfo(msg: => String): Unit =
-    if (log.isInfoEnabled) log.info(msg)
-
-  protected def logInfo(msg: => String, throwable: Throwable): Unit =
-    if (log.isInfoEnabled) log.info(msg, throwable)
-
-  protected def logWarning(msg: => String): Unit =
-    if (log.isWarnEnabled) log.warn(msg)
-
-  protected def logWarning(msg: => String, throwable: Throwable): Unit =
-    if (log.isWarnEnabled) log.warn(msg, throwable)
-
-  protected def logError(msg: => String): Unit =
-    if (log.isErrorEnabled) log.error(msg)
-
-  protected def logError(msg: => String, throwable: Throwable): Unit =
-    if (log.isErrorEnabled) log.error(msg, throwable)
-
-  protected def isTraceEnabled(): Boolean = log.isTraceEnabled
-
-  // Method for ensuring that logging is initialized, to avoid having multiple
-  // threads do it concurrently (as SLF4J initialization is not thread safe).
-  protected def initLogging(): Unit = { log }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
deleted file mode 100644
index 0cd0341..0000000
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ /dev/null
@@ -1,338 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.io._
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-
-import akka.actor._
-import akka.dispatch._
-import akka.pattern.ask
-import akka.remote._
-import akka.util.Duration
-
-
-import spark.scheduler.MapStatus
-import spark.storage.BlockManagerId
-import spark.util.{MetadataCleaner, TimeStampedHashMap}
-
-
-private[spark] sealed trait MapOutputTrackerMessage
-private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String)
-  extends MapOutputTrackerMessage
-private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
-
-private[spark] class MapOutputTrackerActor(tracker: MapOutputTracker) extends Actor with Logging {
-  def receive = {
-    case GetMapOutputStatuses(shuffleId: Int, requester: String) =>
-      logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + requester)
-      sender ! tracker.getSerializedLocations(shuffleId)
-
-    case StopMapOutputTracker =>
-      logInfo("MapOutputTrackerActor stopped!")
-      sender ! true
-      context.stop(self)
-  }
-}
-
-private[spark] class MapOutputTracker extends Logging {
-
-  private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
-  
-  // Set to the MapOutputTrackerActor living on the driver
-  var trackerActor: ActorRef = _
-
-  private var mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]
-
-  // Incremented every time a fetch fails so that client nodes know to clear
-  // their cache of map output locations if this happens.
-  private var epoch: Long = 0
-  private val epochLock = new java.lang.Object
-
-  // Cache a serialized version of the output statuses for each shuffle to send them out faster
-  var cacheEpoch = epoch
-  private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]
-
-  val metadataCleaner = new MetadataCleaner("MapOutputTracker", this.cleanup)
-
-  // Send a message to the trackerActor and get its result within a default timeout, or
-  // throw a SparkException if this fails.
-  def askTracker(message: Any): Any = {
-    try {
-      val future = trackerActor.ask(message)(timeout)
-      return Await.result(future, timeout)
-    } catch {
-      case e: Exception =>
-        throw new SparkException("Error communicating with MapOutputTracker", e)
-    }
-  }
-
-  // Send a one-way message to the trackerActor, to which we expect it to reply with true.
-  def communicate(message: Any) {
-    if (askTracker(message) != true) {
-      throw new SparkException("Error reply received from MapOutputTracker")
-    }
-  }
-
-  def registerShuffle(shuffleId: Int, numMaps: Int) {
-    if (mapStatuses.putIfAbsent(shuffleId, new Array[MapStatus](numMaps)).isDefined) {
-      throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice")
-    }
-  }
-
-  def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus) {
-    var array = mapStatuses(shuffleId)
-    array.synchronized {
-      array(mapId) = status
-    }
-  }
-
-  def registerMapOutputs(
-      shuffleId: Int,
-      statuses: Array[MapStatus],
-      changeEpoch: Boolean = false) {
-    mapStatuses.put(shuffleId, Array[MapStatus]() ++ statuses)
-    if (changeEpoch) {
-      incrementEpoch()
-    }
-  }
-
-  def unregisterMapOutput(shuffleId: Int, mapId: Int, bmAddress: BlockManagerId) {
-    var arrayOpt = mapStatuses.get(shuffleId)
-    if (arrayOpt.isDefined && arrayOpt.get != null) {
-      var array = arrayOpt.get
-      array.synchronized {
-        if (array(mapId) != null && array(mapId).location == bmAddress) {
-          array(mapId) = null
-        }
-      }
-      incrementEpoch()
-    } else {
-      throw new SparkException("unregisterMapOutput called for nonexistent shuffle ID")
-    }
-  }
-
-  // Remembers which map output locations are currently being fetched on a worker
-  private val fetching = new HashSet[Int]
-
-  // Called on possibly remote nodes to get the server URIs and output sizes for a given shuffle
-  def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
-    val statuses = mapStatuses.get(shuffleId).orNull
-    if (statuses == null) {
-      logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
-      var fetchedStatuses: Array[MapStatus] = null
-      fetching.synchronized {
-        if (fetching.contains(shuffleId)) {
-          // Someone else is fetching it; wait for them to be done
-          while (fetching.contains(shuffleId)) {
-            try {
-              fetching.wait()
-            } catch {
-              case e: InterruptedException =>
-            }
-          }
-        }
-
-        // Either while we waited the fetch happened successfully, or
-        // someone fetched it in between the get and the fetching.synchronized.
-        fetchedStatuses = mapStatuses.get(shuffleId).orNull
-        if (fetchedStatuses == null) {
-          // We have to do the fetch, get others to wait for us.
-          fetching += shuffleId
-        }
-      }
-      
-      if (fetchedStatuses == null) {
-        // We won the race to fetch the output locs; do so
-        logInfo("Doing the fetch; tracker actor = " + trackerActor)
-        val hostPort = Utils.localHostPort()
-        // This try-finally prevents hangs due to timeouts:
-        try {
-          val fetchedBytes =
-            askTracker(GetMapOutputStatuses(shuffleId, hostPort)).asInstanceOf[Array[Byte]]
-          fetchedStatuses = deserializeStatuses(fetchedBytes)
-          logInfo("Got the output locations")
-          mapStatuses.put(shuffleId, fetchedStatuses)
-        } finally {
-          fetching.synchronized {
-            fetching -= shuffleId
-            fetching.notifyAll()
-          }
-        }
-      }
-      if (fetchedStatuses != null) {
-        fetchedStatuses.synchronized {
-          return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
-        }
-      }
-      else{
-        throw new FetchFailedException(null, shuffleId, -1, reduceId,
-          new Exception("Missing all output locations for shuffle " + shuffleId))
-      }      
-    } else {
-      statuses.synchronized {
-        return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
-      }
-    }
-  }
-
-  private def cleanup(cleanupTime: Long) {
-    mapStatuses.clearOldValues(cleanupTime)
-    cachedSerializedStatuses.clearOldValues(cleanupTime)
-  }
-
-  def stop() {
-    communicate(StopMapOutputTracker)
-    mapStatuses.clear()
-    metadataCleaner.cancel()
-    trackerActor = null
-  }
-
-  // Called on master to increment the epoch number
-  def incrementEpoch() {
-    epochLock.synchronized {
-      epoch += 1
-      logDebug("Increasing epoch to " + epoch)
-    }
-  }
-
-  // Called on master or workers to get current epoch number
-  def getEpoch: Long = {
-    epochLock.synchronized {
-      return epoch
-    }
-  }
-
-  // Called on workers to update the epoch number, potentially clearing old outputs
-  // because of a fetch failure. (Each worker task calls this with the latest epoch
-  // number on the master at the time it was created.)
-  def updateEpoch(newEpoch: Long) {
-    epochLock.synchronized {
-      if (newEpoch > epoch) {
-        logInfo("Updating epoch to " + newEpoch + " and clearing cache")
-        // mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]
-        mapStatuses.clear()
-        epoch = newEpoch
-      }
-    }
-  }
-
-  def getSerializedLocations(shuffleId: Int): Array[Byte] = {
-    var statuses: Array[MapStatus] = null
-    var epochGotten: Long = -1
-    epochLock.synchronized {
-      if (epoch > cacheEpoch) {
-        cachedSerializedStatuses.clear()
-        cacheEpoch = epoch
-      }
-      cachedSerializedStatuses.get(shuffleId) match {
-        case Some(bytes) =>
-          return bytes
-        case None =>
-          statuses = mapStatuses(shuffleId)
-          epochGotten = epoch
-      }
-    }
-    // If we got here, we failed to find the serialized locations in the cache, so we pulled
-    // out a snapshot of the locations as "locs"; let's serialize and return that
-    val bytes = serializeStatuses(statuses)
-    logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
-    // Add them into the table only if the epoch hasn't changed while we were working
-    epochLock.synchronized {
-      if (epoch == epochGotten) {
-        cachedSerializedStatuses(shuffleId) = bytes
-      }
-    }
-    return bytes
-  }
-
-  // Serialize an array of map output locations into an efficient byte format so that we can send
-  // it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will
-  // generally be pretty compressible because many map outputs will be on the same hostname.
-  private def serializeStatuses(statuses: Array[MapStatus]): Array[Byte] = {
-    val out = new ByteArrayOutputStream
-    val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
-    // Since statuses can be modified in parallel, sync on it
-    statuses.synchronized {
-      objOut.writeObject(statuses)
-    }
-    objOut.close()
-    out.toByteArray
-  }
-
-  // Opposite of serializeStatuses.
-  def deserializeStatuses(bytes: Array[Byte]): Array[MapStatus] = {
-    val objIn = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes)))
-    objIn.readObject().
-      // // drop all null's from status - not sure why they are occuring though. Causes NPE downstream in slave if present
-      // comment this out - nulls could be due to missing location ? 
-      asInstanceOf[Array[MapStatus]] // .filter( _ != null )
-  }
-}
-
-private[spark] object MapOutputTracker {
-  private val LOG_BASE = 1.1
-
-  // Convert an array of MapStatuses to locations and sizes for a given reduce ID. If
-  // any of the statuses is null (indicating a missing location due to a failed mapper),
-  // throw a FetchFailedException.
-  private def convertMapStatuses(
-        shuffleId: Int,
-        reduceId: Int,
-        statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
-    assert (statuses != null)
-    statuses.map {
-      status => 
-        if (status == null) {
-          throw new FetchFailedException(null, shuffleId, -1, reduceId,
-            new Exception("Missing an output location for shuffle " + shuffleId))
-        } else {
-          (status.location, decompressSize(status.compressedSizes(reduceId)))
-        }
-    }
-  }
-
-  /**
-   * Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
-   * We do this by encoding the log base 1.1 of the size as an integer, which can support
-   * sizes up to 35 GB with at most 10% error.
-   */
-  def compressSize(size: Long): Byte = {
-    if (size == 0) {
-      0
-    } else if (size <= 1L) {
-      1
-    } else {
-      math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
-    }
-  }
-
-  /**
-   * Decompress an 8-bit encoded block size, using the reverse operation of compressSize.
-   */
-  def decompressSize(compressedSize: Byte): Long = {
-    if (compressedSize == 0) {
-      0
-    } else {
-      math.pow(LOG_BASE, (compressedSize & 0xFF)).toLong
-    }
-  }
-}


Mime
View raw message