spark-commits mailing list archives

From pwend...@apache.org
Subject git commit: SPARK-2038: rename "conf" parameters in the saveAsHadoop functions
Date Tue, 17 Jun 2014 19:17:56 GMT
Repository: spark
Updated Branches:
  refs/heads/master 2794990e9 -> 443f5e1bb


SPARK-2038: rename "conf" parameters in the saveAsHadoop functions

to distinguish them from the SparkConf object

https://issues.apache.org/jira/browse/SPARK-2038
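
A minimal usage sketch of the old-API path touched by this change (the output path, app
name, and Writable classes below are illustrative, not taken from the commit): the renamed
hadoopConf parameter is a Hadoop JobConf/Configuration, a separate object from the SparkConf
used to configure Spark itself.

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // brings in the PairRDDFunctions implicits (Spark 1.x)

val sparkConf = new SparkConf().setAppName("save-as-hadoop-example")  // Spark-side configuration
val sc = new SparkContext(sparkConf)

// Hadoop-side configuration: this is the object the renamed hadoopConf parameter expects.
val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.setOutputKeyClass(classOf[Text])
jobConf.setOutputValueClass(classOf[IntWritable])
jobConf.setOutputFormat(classOf[TextOutputFormat[Text, IntWritable]])
FileOutputFormat.setOutputPath(jobConf, new Path("/tmp/save-as-hadoop-example"))

sc.parallelize(Seq(("a", 1), ("b", 2)))
  .map { case (k, v) => (new Text(k), new IntWritable(v)) }
  .saveAsHadoopDataset(jobConf)

sc.stop()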

Author: CodingCat <zhunansjtu@gmail.com>

Closes #1087 from CodingCat/SPARK-2038 and squashes the following commits:

763975f [CodingCat] style fix
d91288d [CodingCat] rename "conf" parameters in the saveAsHadoop functions


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/443f5e1b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/443f5e1b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/443f5e1b

Branch: refs/heads/master
Commit: 443f5e1bbcf9ec55e5ce6e4f738a002a47818100
Parents: 2794990
Author: CodingCat <zhunansjtu@gmail.com>
Authored: Tue Jun 17 12:17:48 2014 -0700
Committer: Patrick Wendell <pwendell@gmail.com>
Committed: Tue Jun 17 12:17:48 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/rdd/PairRDDFunctions.scala | 49 ++++++++++----------
 1 file changed, 25 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/443f5e1b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index fe36c80..bff77b4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -719,9 +719,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
-      conf: Configuration = self.context.hadoopConfiguration)
+      hadoopConf: Configuration = self.context.hadoopConfiguration)
   {
-    val job = new NewAPIHadoopJob(conf)
+    val job = new NewAPIHadoopJob(hadoopConf)
     job.setOutputKeyClass(keyClass)
     job.setOutputValueClass(valueClass)
     job.setOutputFormatClass(outputFormatClass)
@@ -752,24 +752,25 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
-      conf: JobConf = new JobConf(self.context.hadoopConfiguration),
+      hadoopConf: JobConf = new JobConf(self.context.hadoopConfiguration),
       codec: Option[Class[_ <: CompressionCodec]] = None) {
-    conf.setOutputKeyClass(keyClass)
-    conf.setOutputValueClass(valueClass)
+    hadoopConf.setOutputKeyClass(keyClass)
+    hadoopConf.setOutputValueClass(valueClass)
     // Doesn't work in Scala 2.9 due to what may be a generics bug
     // TODO: Should we uncomment this for Scala 2.10?
     // conf.setOutputFormat(outputFormatClass)
-    conf.set("mapred.output.format.class", outputFormatClass.getName)
+    hadoopConf.set("mapred.output.format.class", outputFormatClass.getName)
     for (c <- codec) {
-      conf.setCompressMapOutput(true)
-      conf.set("mapred.output.compress", "true")
-      conf.setMapOutputCompressorClass(c)
-      conf.set("mapred.output.compression.codec", c.getCanonicalName)
-      conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
+      hadoopConf.setCompressMapOutput(true)
+      hadoopConf.set("mapred.output.compress", "true")
+      hadoopConf.setMapOutputCompressorClass(c)
+      hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
+      hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
     }
-    conf.setOutputCommitter(classOf[FileOutputCommitter])
-    FileOutputFormat.setOutputPath(conf, SparkHadoopWriter.createPathFromString(path, conf))
-    saveAsHadoopDataset(conf)
+    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
+    FileOutputFormat.setOutputPath(hadoopConf,
+      SparkHadoopWriter.createPathFromString(path, hadoopConf))
+    saveAsHadoopDataset(hadoopConf)
   }
 
   /**
@@ -778,8 +779,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * output paths required (e.g. a table name to write to) in the same way as it would be
    * configured for a Hadoop MapReduce job.
    */
-  def saveAsNewAPIHadoopDataset(conf: Configuration) {
-    val job = new NewAPIHadoopJob(conf)
+  def saveAsNewAPIHadoopDataset(hadoopConf: Configuration) {
+    val job = new NewAPIHadoopJob(hadoopConf)
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     val jobtrackerID = formatter.format(new Date())
     val stageId = self.id
@@ -835,10 +836,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
    * MapReduce job.
    */
-  def saveAsHadoopDataset(conf: JobConf) {
-    val outputFormatInstance = conf.getOutputFormat
-    val keyClass = conf.getOutputKeyClass
-    val valueClass = conf.getOutputValueClass
+  def saveAsHadoopDataset(hadoopConf: JobConf) {
+    val outputFormatInstance = hadoopConf.getOutputFormat
+    val keyClass = hadoopConf.getOutputKeyClass
+    val valueClass = hadoopConf.getOutputValueClass
     if (outputFormatInstance == null) {
       throw new SparkException("Output format class not set")
     }
@@ -848,18 +849,18 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     if (valueClass == null) {
       throw new SparkException("Output value class not set")
     }
-    SparkHadoopUtil.get.addCredentials(conf)
+    SparkHadoopUtil.get.addCredentials(hadoopConf)
 
     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
       valueClass.getSimpleName + ")")
 
     if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
       // FileOutputFormat ignores the filesystem parameter
-      val ignoredFs = FileSystem.get(conf)
-      conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
+      val ignoredFs = FileSystem.get(hadoopConf)
+      hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
     }
 
-    val writer = new SparkHadoopWriter(conf)
+    val writer = new SparkHadoopWriter(hadoopConf)
     writer.preSetup()
 
     def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {


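The same distinction applies on the new-API side shown in the first hunk above. A hypothetical
sketch (output path, app name, and classes are illustrative), mirroring the
new NewAPIHadoopJob(hadoopConf) call in the diff:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // PairRDDFunctions implicits (Spark 1.x)

val sc = new SparkContext(new SparkConf().setAppName("new-api-save-example"))

// The Job wraps a Hadoop Configuration; that configuration, not a SparkConf,
// is what saveAsNewAPIHadoopDataset's hadoopConf parameter receives.
val job = new Job(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[Text])
job.setOutputValueClass(classOf[IntWritable])
job.setOutputFormatClass(classOf[TextOutputFormat[Text, IntWritable]])
FileOutputFormat.setOutputPath(job, new Path("/tmp/new-api-save-example"))

sc.parallelize(Seq(("a", 1), ("b", 2)))
  .map { case (k, v) => (new Text(k), new IntWritable(v)) }
  .saveAsNewAPIHadoopDataset(job.getConfiguration)

sc.stop()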