From: rxin@apache.org
To: commits@spark.incubator.apache.org
Date: Sun, 12 Jan 2014 05:53:28 -0000
Subject: [1/3] git commit: Minor update for clone writables and more documentation.

Updated Branches:
  refs/heads/master dbc11df41 -> 288a87899


Minor update for clone writables and more documentation.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/b0fbfcca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/b0fbfcca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/b0fbfcca

Branch: refs/heads/master
Commit: b0fbfccadc2bb308c3cbcd5a55157e63cc8916f6
Parents: ee6e7f9
Author: Reynold Xin
Authored: Sat Jan 11 12:35:10 2014 -0800
Committer: Reynold Xin
Committed: Sat Jan 11 12:35:10 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala  | 17 ++++++++++++++---
 .../scala/org/apache/spark/rdd/HadoopRDD.scala | 16 ++++++++++------
 .../org/apache/spark/rdd/NewHadoopRDD.scala    | 20 +++++++++++++++++---
 3 files changed, 41 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b0fbfcca/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d7e681d..1656461 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -345,9 +345,20 @@ class SparkContext(
   }
 
   /**
-   * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and any
-   * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
-   * etc).
+   * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
+   * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
+   * using the older MapReduce API (`org.apache.hadoop.mapred`).
+   *
+   * @param conf JobConf for setting up the dataset
+   * @param inputFormatClass Class of the [[InputFormat]]
+   * @param keyClass Class of the keys
+   * @param valueClass Class of the values
+   * @param minSplits Minimum number of Hadoop Splits to generate.
+   * @param cloneKeyValues If true, explicitly clone the records produced by Hadoop RecordReader.
+   *                       Most RecordReader implementations reuse wrapper objects across multiple
+   *                       records, and can cause problems in RDD collect or aggregation operations.
+   *                       By default the records are cloned in Spark. However, application
+   *                       programmers can explicitly disable the cloning for better performance.
    */
   def hadoopRDD[K: ClassTag, V: ClassTag](
       conf: JobConf,


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b0fbfcca/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 2da4611..9a220d1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -45,14 +45,14 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
 
   val inputSplit = new SerializableWritable[InputSplit](s)
 
-  override def hashCode(): Int = (41 * (41 + rddId) + idx).toInt
+  override def hashCode(): Int = 41 * (41 + rddId) + idx
 
   override val index: Int = idx
 }
 
 /**
  * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
- * sources in HBase, or S3).
+ * sources in HBase, or S3), using the older MapReduce API (`org.apache.hadoop.mapred`).
  *
  * @param sc The SparkContext to associate the RDD with.
  * @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
@@ -64,6 +64,11 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
  * @param keyClass Class of the key associated with the inputFormatClass.
  * @param valueClass Class of the value associated with the inputFormatClass.
  * @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
+ * @param cloneKeyValues If true, explicitly clone the records produced by Hadoop RecordReader.
+ *                       Most RecordReader implementations reuse wrapper objects across multiple
+ *                       records, and can cause problems in RDD collect or aggregation operations.
+ *                       By default the records are cloned in Spark. However, application
+ *                       programmers can explicitly disable the cloning for better performance.
  */
 class HadoopRDD[K: ClassTag, V: ClassTag](
     sc: SparkContext,
@@ -165,9 +170,9 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
       // Register an on-task-completion callback to close the input stream.
       context.addOnCompleteCallback{ () => closeIfNeeded() }
       val key: K = reader.createKey()
-      val keyCloneFunc = cloneWritables[K](getConf)
+      val keyCloneFunc = cloneWritables[K](jobConf)
       val value: V = reader.createValue()
-      val valueCloneFunc = cloneWritables[V](getConf)
+      val valueCloneFunc = cloneWritables[V](jobConf)
       override def getNext() = {
         try {
           finished = !reader.next(key, value)
@@ -176,8 +181,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
             finished = true
         }
         if (cloneKeyValues) {
-          (keyCloneFunc(key.asInstanceOf[Writable]),
-            valueCloneFunc(value.asInstanceOf[Writable]))
+          (keyCloneFunc(key.asInstanceOf[Writable]), valueCloneFunc(value.asInstanceOf[Writable]))
         } else {
           (key, value)
         }


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b0fbfcca/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index a347864..2f2d011 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -36,9 +36,24 @@ class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputS
 
   val serializableHadoopSplit = new SerializableWritable(rawSplit)
 
-  override def hashCode(): Int = (41 * (41 + rddId) + index)
+  override def hashCode(): Int = 41 * (41 + rddId) + index
 }
 
+/**
+ * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
+ * sources in HBase, or S3), using the new MapReduce API (`org.apache.hadoop.mapreduce`).
+ *
+ * @param sc The SparkContext to associate the RDD with.
+ * @param inputFormatClass Storage format of the data to be read.
+ * @param keyClass Class of the key associated with the inputFormatClass.
+ * @param valueClass Class of the value associated with the inputFormatClass.
+ * @param conf The Hadoop configuration.
+ * @param cloneKeyValues If true, explicitly clone the records produced by Hadoop RecordReader.
+ *                       Most RecordReader implementations reuse wrapper objects across multiple
+ *                       records, and can cause problems in RDD collect or aggregation operations.
+ *                       By default the records are cloned in Spark. However, application
+ *                       programmers can explicitly disable the cloning for better performance.
+ */
 class NewHadoopRDD[K: ClassTag, V: ClassTag](
     sc : SparkContext,
     inputFormatClass: Class[_ <: InputFormat[K, V]],
@@ -113,8 +128,7 @@ class NewHadoopRDD[K: ClassTag, V: ClassTag](
         val key = reader.getCurrentKey
         val value = reader.getCurrentValue
         if (cloneKeyValues) {
-          (keyCloneFunc(key.asInstanceOf[Writable]),
-            valueCloneFunc(value.asInstanceOf[Writable]))
+          (keyCloneFunc(key.asInstanceOf[Writable]), valueCloneFunc(value.asInstanceOf[Writable]))
         } else {
           (key, value)
         }
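
[Editor's note, not part of the commit] The `cloneKeyValues` documentation added above describes a well-known Hadoop pitfall: most RecordReader implementations hand back the same Writable instances for every record in a split, so any code that retains references across records (collect, cache, groupBy and similar aggregations) can silently end up holding many references to one mutated object. The sketch below is a minimal illustration of that reuse problem and of the manual per-record copy (via Hadoop's `WritableUtils.clone`) that applications would otherwise do themselves when cloning is disabled; it is not code from this commit. The object name, application name, and input path are hypothetical, and it assumes a 0.9-era SparkContext running in local mode; the actual flag for disabling cloning is the `cloneKeyValues` parameter documented in the diff.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text, WritableUtils}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkContext

object CloneWritablesDemo {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "clone-writables-demo")

    // TextInputFormat's RecordReader (old `mapred` API) reuses a single
    // LongWritable/Text pair for every record it returns within a split.
    val raw = sc.hadoopFile[LongWritable, Text, TextInputFormat]("/tmp/lines.txt")

    // Risky if record cloning is disabled: buffering the raw Text objects keeps
    // N references to what may be one reused instance, so every "line" printed
    // below could show the last line of its split. With Spark's default cloning
    // (per the documentation added in this commit) each element is a distinct copy.
    raw.mapPartitions { iter =>
      val buffered = iter.map(_._2).toArray   // holds references, not copies
      Iterator(buffered.mkString(" | "))
    }.collect().foreach(println)

    // Safe regardless of the cloneKeyValues setting: copy each record before
    // retaining it. The Configuration is created inside the partition function
    // so the task closure stays serializable.
    val lines = raw.mapPartitions { iter =>
      val hadoopConf = new Configuration()
      iter.map { case (_, text) => WritableUtils.clone(text, hadoopConf).toString }
    }
    lines.collect().foreach(println)

    sc.stop()
  }
}

The usual application-side pattern is either to convert records to immutable values (e.g. `Text.toString`) or to clone them up front; this commit makes defensive cloning Spark's documented default while letting performance-sensitive jobs opt out.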