spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject [2/3] git commit: Renamed cloneKeyValues to cloneRecords; updated docs.
Date Sun, 12 Jan 2014 05:53:29 GMT
Renamed cloneKeyValues to cloneRecords; updated docs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/362cda18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/362cda18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/362cda18

Branch: refs/heads/master
Commit: 362cda18bcb08f12ec680f10e190dd93418c998e
Parents: b0fbfcc
Author: Reynold Xin <rxin@apache.org>
Authored: Sat Jan 11 18:01:29 2014 -0800
Committer: Reynold Xin <rxin@apache.org>
Committed: Sat Jan 11 18:01:29 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 57 ++++++++++----------
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 18 +++----
 .../org/apache/spark/rdd/NewHadoopRDD.scala     | 14 ++---
 3 files changed, 45 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/362cda18/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 1656461..9a3d36b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -354,11 +354,11 @@ class SparkContext(
    * @param keyClass Class of the keys
    * @param valueClass Class of the values
    * @param minSplits Minimum number of Hadoop Splits to generate.
-   * @param cloneKeyValues If true, explicitly clone the records produced by Hadoop RecordReader.
-   *                       Most RecordReader implementations reuse wrapper objects across multiple
-   *                       records, and can cause problems in RDD collect or aggregation operations.
-   *                       By default the records are cloned in Spark. However, application
-   *                       programmers can explicitly disable the cloning for better performance.
+   * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
+   *                     Most RecordReader implementations reuse wrapper objects across multiple
+   *                     records, and can cause problems in RDD collect or aggregation operations.
+   *                     By default the records are cloned in Spark. However, application
+   *                     programmers can explicitly disable the cloning for better performance.
    */
   def hadoopRDD[K: ClassTag, V: ClassTag](
       conf: JobConf,
@@ -366,11 +366,11 @@ class SparkContext(
       keyClass: Class[K],
       valueClass: Class[V],
       minSplits: Int = defaultMinSplits,
-      cloneKeyValues: Boolean = true
+      cloneRecords: Boolean = true
       ): RDD[(K, V)] = {
     // Add necessary security credentials to the JobConf before broadcasting it.
     SparkHadoopUtil.get.addCredentials(conf)
-    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits, cloneKeyValues)
+    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits, cloneRecords)
   }
 
   /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
@@ -380,7 +380,7 @@ class SparkContext(
       keyClass: Class[K],
       valueClass: Class[V],
       minSplits: Int = defaultMinSplits,
-      cloneKeyValues: Boolean = true
+      cloneRecords: Boolean = true
       ): RDD[(K, V)] = {
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
     val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -393,7 +393,7 @@ class SparkContext(
       keyClass,
       valueClass,
       minSplits,
-      cloneKeyValues)
+      cloneRecords)
   }
 
   /**
@@ -405,14 +405,14 @@ class SparkContext(
    * }}}
    */
   def hadoopFile[K, V, F <: InputFormat[K, V]]
-      (path: String, minSplits: Int, cloneKeyValues: Boolean = true)
+      (path: String, minSplits: Int, cloneRecords: Boolean = true)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
     hadoopFile(path,
       fm.runtimeClass.asInstanceOf[Class[F]],
       km.runtimeClass.asInstanceOf[Class[K]],
       vm.runtimeClass.asInstanceOf[Class[V]],
       minSplits,
-      cloneKeyValues = cloneKeyValues)
+      cloneRecords)
   }
 
   /**
@@ -423,20 +423,20 @@ class SparkContext(
    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
    * }}}
    */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, cloneKeyValues: Boolean = true)
+  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, cloneRecords: Boolean = true)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
-    hadoopFile[K, V, F](path, defaultMinSplits, cloneKeyValues)
+    hadoopFile[K, V, F](path, defaultMinSplits, cloneRecords)
 
   /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
   def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
-      (path: String, cloneKeyValues: Boolean = true)
+      (path: String, cloneRecords: Boolean = true)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
     newAPIHadoopFile(
       path,
       fm.runtimeClass.asInstanceOf[Class[F]],
       km.runtimeClass.asInstanceOf[Class[K]],
       vm.runtimeClass.asInstanceOf[Class[V]],
-      cloneKeyValues = cloneKeyValues)
+      cloneRecords = cloneRecords)
   }
 
   /**
@@ -449,11 +449,11 @@ class SparkContext(
       kClass: Class[K],
       vClass: Class[V],
       conf: Configuration = hadoopConfiguration,
-      cloneKeyValues: Boolean = true): RDD[(K, V)] = {
+      cloneRecords: Boolean = true): RDD[(K, V)] = {
     val job = new NewHadoopJob(conf)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updatedConf = job.getConfiguration
-    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf, cloneKeyValues)
+    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf, cloneRecords)
   }
 
   /**
@@ -465,8 +465,8 @@ class SparkContext(
       fClass: Class[F],
       kClass: Class[K],
       vClass: Class[V],
-      cloneKeyValues: Boolean = true): RDD[(K, V)] = {
-    new NewHadoopRDD(this, fClass, kClass, vClass, conf, cloneKeyValues)
+      cloneRecords: Boolean = true): RDD[(K, V)] = {
+    new NewHadoopRDD(this, fClass, kClass, vClass, conf, cloneRecords)
   }
 
   /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
@@ -474,16 +474,16 @@ class SparkContext(
       keyClass: Class[K],
       valueClass: Class[V],
       minSplits: Int,
-      cloneKeyValues: Boolean = true
+      cloneRecords: Boolean = true
       ): RDD[(K, V)] = {
     val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
-    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits, cloneKeyValues)
+    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits, cloneRecords)
   }
 
   /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
   def sequenceFile[K: ClassTag, V: ClassTag](path: String, keyClass: Class[K], valueClass: Class[V],
-      cloneKeyValues: Boolean = true): RDD[(K, V)] =
-    sequenceFile(path, keyClass, valueClass, defaultMinSplits, cloneKeyValues)
+      cloneRecords: Boolean = true): RDD[(K, V)] =
+    sequenceFile(path, keyClass, valueClass, defaultMinSplits, cloneRecords)
 
   /**
    * Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -501,17 +501,18 @@ class SparkContext(
    * for the appropriate type. In addition, we pass the converter a ClassTag of its type to
    * allow it to figure out the Writable class to use in the subclass case.
    */
-   def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits,
-       cloneKeyValues: Boolean = true) (implicit km: ClassTag[K], vm: ClassTag[V],
-          kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
+   def sequenceFile[K, V]
+       (path: String, minSplits: Int = defaultMinSplits, cloneRecords: Boolean = true)
+       (implicit km: ClassTag[K], vm: ClassTag[V],
+        kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
       : RDD[(K, V)] = {
     val kc = kcf()
     val vc = vcf()
     val format = classOf[SequenceFileInputFormat[Writable, Writable]]
     val writables = hadoopFile(path, format,
         kc.writableClass(km).asInstanceOf[Class[Writable]],
-        vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits, cloneKeyValues)
-    writables.map{case (k,v) => (kc.convert(k), vc.convert(v))}
+        vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits, cloneRecords)
+    writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/362cda18/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 9a220d1..902083c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -64,11 +64,11 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
  * @param keyClass Class of the key associated with the inputFormatClass.
  * @param valueClass Class of the value associated with the inputFormatClass.
  * @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
- * @param cloneKeyValues If true, explicitly clone the records produced by Hadoop RecordReader.
- *                       Most RecordReader implementations reuse wrapper objects across multiple
- *                       records, and can cause problems in RDD collect or aggregation operations.
- *                       By default the records are cloned in Spark. However, application
- *                       programmers can explicitly disable the cloning for better performance.
+ * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
+ *                     Most RecordReader implementations reuse wrapper objects across multiple
+ *                     records, and can cause problems in RDD collect or aggregation operations.
+ *                     By default the records are cloned in Spark. However, application
+ *                     programmers can explicitly disable the cloning for better performance.
  */
 class HadoopRDD[K: ClassTag, V: ClassTag](
     sc: SparkContext,
@@ -78,7 +78,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
     keyClass: Class[K],
     valueClass: Class[V],
     minSplits: Int,
-    cloneKeyValues: Boolean)
+    cloneRecords: Boolean)
   extends RDD[(K, V)](sc, Nil) with Logging {
 
   def this(
@@ -88,7 +88,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
       keyClass: Class[K],
       valueClass: Class[V],
       minSplits: Int,
-      cloneKeyValues: Boolean) = {
+      cloneRecords: Boolean) = {
     this(
       sc,
       sc.broadcast(new SerializableWritable(conf))
@@ -98,7 +98,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
       keyClass,
       valueClass,
       minSplits,
-      cloneKeyValues)
+      cloneRecords)
   }
 
   protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
@@ -180,7 +180,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
           case eof: EOFException =>
             finished = true
         }
-        if (cloneKeyValues) {
+        if (cloneRecords) {
           (keyCloneFunc(key.asInstanceOf[Writable]), valueCloneFunc(value.asInstanceOf[Writable]))
         } else {
           (key, value)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/362cda18/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 2f2d011..992bd4a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -48,11 +48,11 @@ class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputS
  * @param keyClass Class of the key associated with the inputFormatClass.
  * @param valueClass Class of the value associated with the inputFormatClass.
  * @param conf The Hadoop configuration.
- * @param cloneKeyValues If true, explicitly clone the records produced by Hadoop RecordReader.
- *                       Most RecordReader implementations reuse wrapper objects across multiple
- *                       records, and can cause problems in RDD collect or aggregation operations.
- *                       By default the records are cloned in Spark. However, application
- *                       programmers can explicitly disable the cloning for better performance.
+ * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
+ *                     Most RecordReader implementations reuse wrapper objects across multiple
+ *                     records, and can cause problems in RDD collect or aggregation operations.
+ *                     By default the records are cloned in Spark. However, application
+ *                     programmers can explicitly disable the cloning for better performance.
  */
 class NewHadoopRDD[K: ClassTag, V: ClassTag](
     sc : SparkContext,
@@ -60,7 +60,7 @@ class NewHadoopRDD[K: ClassTag, V: ClassTag](
     keyClass: Class[K],
     valueClass: Class[V],
     @transient conf: Configuration,
-    cloneKeyValues: Boolean)
+    cloneRecords: Boolean)
   extends RDD[(K, V)](sc, Nil)
   with SparkHadoopMapReduceUtil
   with Logging {
@@ -127,7 +127,7 @@ class NewHadoopRDD[K: ClassTag, V: ClassTag](
         havePair = false
         val key = reader.getCurrentKey
         val value = reader.getCurrentValue
-        if (cloneKeyValues) {
+        if (cloneRecords) {
           (keyCloneFunc(key.asInstanceOf[Writable]), valueCloneFunc(value.asInstanceOf[Writable]))
         } else {
           (key, value)


Mime
View raw message