spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject [1/3] git commit: Minor update for clone writables and more documentation.
Date Sun, 12 Jan 2014 05:53:28 GMT
Updated Branches:
  refs/heads/master dbc11df41 -> 288a87899


Minor update for clone writables and more documentation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/b0fbfcca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/b0fbfcca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/b0fbfcca

Branch: refs/heads/master
Commit: b0fbfccadc2bb308c3cbcd5a55157e63cc8916f6
Parents: ee6e7f9
Author: Reynold Xin <rxin@apache.org>
Authored: Sat Jan 11 12:35:10 2014 -0800
Committer: Reynold Xin <rxin@apache.org>
Committed: Sat Jan 11 12:35:10 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 17 ++++++++++++++---
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 16 ++++++++++------
 .../org/apache/spark/rdd/NewHadoopRDD.scala     | 20 +++++++++++++++++---
 3 files changed, 41 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b0fbfcca/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d7e681d..1656461 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -345,9 +345,20 @@ class SparkContext(
   }
 
   /**
-   * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and any
-   * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
-   * etc).
+   * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
+   * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
+   * using the older MapReduce API (`org.apache.hadoop.mapred`).
+   *
+   * @param conf JobConf for setting up the dataset
+   * @param inputFormatClass Class of the [[InputFormat]]
+   * @param keyClass Class of the keys
+   * @param valueClass Class of the values
+   * @param minSplits Minimum number of Hadoop Splits to generate.
+   * @param cloneKeyValues If true, explicitly clone the records produced by Hadoop RecordReader.
+   *                       Most RecordReader implementations reuse wrapper objects across multiple
+   *                       records, and can cause problems in RDD collect or aggregation operations.
+   *                       By default the records are cloned in Spark. However, application
+   *                       programmers can explicitly disable the cloning for better performance.
    */
   def hadoopRDD[K: ClassTag, V: ClassTag](
       conf: JobConf,

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b0fbfcca/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 2da4611..9a220d1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -45,14 +45,14 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
 
   val inputSplit = new SerializableWritable[InputSplit](s)
 
-  override def hashCode(): Int = (41 * (41 + rddId) + idx).toInt
+  override def hashCode(): Int = 41 * (41 + rddId) + idx
 
   override val index: Int = idx
 }
 
 /**
 * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
- * sources in HBase, or S3).
+ * sources in HBase, or S3), using the older MapReduce API (`org.apache.hadoop.mapred`).
  *
  * @param sc The SparkContext to associate the RDD with.
  * @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
@@ -64,6 +64,11 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
  * @param keyClass Class of the key associated with the inputFormatClass.
  * @param valueClass Class of the value associated with the inputFormatClass.
  * @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
+ * @param cloneKeyValues If true, explicitly clone the records produced by Hadoop RecordReader.
+ *                       Most RecordReader implementations reuse wrapper objects across multiple
+ *                       records, and can cause problems in RDD collect or aggregation operations.
+ *                       By default the records are cloned in Spark. However, application
+ *                       programmers can explicitly disable the cloning for better performance.
  */
 class HadoopRDD[K: ClassTag, V: ClassTag](
     sc: SparkContext,
@@ -165,9 +170,9 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
       // Register an on-task-completion callback to close the input stream.
       context.addOnCompleteCallback{ () => closeIfNeeded() }
       val key: K = reader.createKey()
-      val keyCloneFunc = cloneWritables[K](getConf)
+      val keyCloneFunc = cloneWritables[K](jobConf)
       val value: V = reader.createValue()
-      val valueCloneFunc = cloneWritables[V](getConf)
+      val valueCloneFunc = cloneWritables[V](jobConf)
       override def getNext() = {
         try {
           finished = !reader.next(key, value)
@@ -176,8 +181,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
             finished = true
         }
         if (cloneKeyValues) {
-          (keyCloneFunc(key.asInstanceOf[Writable]),
-            valueCloneFunc(value.asInstanceOf[Writable]))
+          (keyCloneFunc(key.asInstanceOf[Writable]), valueCloneFunc(value.asInstanceOf[Writable]))
         } else {
           (key, value)
         }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b0fbfcca/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index a347864..2f2d011 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -36,9 +36,24 @@ class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputS
 
   val serializableHadoopSplit = new SerializableWritable(rawSplit)
 
-  override def hashCode(): Int = (41 * (41 + rddId) + index)
+  override def hashCode(): Int = 41 * (41 + rddId) + index
 }
 
+/**
+ * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
+ * sources in HBase, or S3), using the new MapReduce API (`org.apache.hadoop.mapreduce`).
+ *
+ * @param sc The SparkContext to associate the RDD with.
+ * @param inputFormatClass Storage format of the data to be read.
+ * @param keyClass Class of the key associated with the inputFormatClass.
+ * @param valueClass Class of the value associated with the inputFormatClass.
+ * @param conf The Hadoop configuration.
+ * @param cloneKeyValues If true, explicitly clone the records produced by Hadoop RecordReader.
+ *                       Most RecordReader implementations reuse wrapper objects across multiple
+ *                       records, and can cause problems in RDD collect or aggregation operations.
+ *                       By default the records are cloned in Spark. However, application
+ *                       programmers can explicitly disable the cloning for better performance.
+ */
 class NewHadoopRDD[K: ClassTag, V: ClassTag](
     sc : SparkContext,
     inputFormatClass: Class[_ <: InputFormat[K, V]],
@@ -113,8 +128,7 @@ class NewHadoopRDD[K: ClassTag, V: ClassTag](
         val key = reader.getCurrentKey
         val value = reader.getCurrentValue
         if (cloneKeyValues) {
-          (keyCloneFunc(key.asInstanceOf[Writable]),
-            valueCloneFunc(value.asInstanceOf[Writable]))
+          (keyCloneFunc(key.asInstanceOf[Writable]), valueCloneFunc(value.asInstanceOf[Writable]))
         } else {
           (key, value)
         }


Mime
View raw message