Return-Path: 
X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io
Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io
Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183])
    by cust-asf2.ponee.io (Postfix) with ESMTP id C4340200B2A
    for ; Sat, 11 Jun 2016 00:40:34 +0200 (CEST)
Received: by cust-asf.ponee.io (Postfix)
    id C2B2B160A5A; Fri, 10 Jun 2016 22:40:34 +0000 (UTC)
Delivered-To: archive-asf-public@cust-asf.ponee.io
Received: from mail.apache.org (hermes.apache.org [140.211.11.3])
    by cust-asf.ponee.io (Postfix) with SMTP id BD1FD160A38
    for ; Sat, 11 Jun 2016 00:40:33 +0200 (CEST)
Received: (qmail 27046 invoked by uid 500); 10 Jun 2016 22:40:33 -0000
Mailing-List: contact commits-help@spark.apache.org; run by ezmlm
Precedence: bulk
List-Help: 
List-Unsubscribe: 
List-Post: 
List-Id: 
Delivered-To: mailing list commits@spark.apache.org
Received: (qmail 27036 invoked by uid 99); 10 Jun 2016 22:40:32 -0000
Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23)
    by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Jun 2016 22:40:32 +0000
Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33)
    id D0A28DFC4F; Fri, 10 Jun 2016 22:40:32 +0000 (UTC)
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
From: rxin@apache.org
To: commits@spark.apache.org
Message-Id: 
X-Mailer: ASF-Git Admin Mailer
Subject: spark git commit: [SPARK-15773][CORE][EXAMPLE] Avoid creating local variable `sc` in examples if possible
Date: Fri, 10 Jun 2016 22:40:32 +0000 (UTC)
archived-at: Fri, 10 Jun 2016 22:40:34 -0000

Repository: spark
Updated Branches:
  refs/heads/master 127a6678d -> 2022afe57


[SPARK-15773][CORE][EXAMPLE] Avoid creating local variable `sc` in examples if possible

## What changes were proposed in this pull request?

Instead of binding a local variable `sc`, as in the following example, this PR uses `spark.sparkContext` directly. This makes the examples more concise and also removes a misleading pattern: the code no longer looks as if a SparkContext were being created from a SparkSession.

```
-    println("Creating SparkContext")
-    val sc = spark.sparkContext
-
     println("Writing local file to DFS")
     val dfsFilename = dfsDirPath + "/dfs_read_write_test"
-    val fileRDD = sc.parallelize(fileContents)
+    val fileRDD = spark.sparkContext.parallelize(fileContents)
```

This changes 12 files (+30 lines, -52 lines).

## How was this patch tested?

Manual.

Author: Dongjoon Hyun

Closes #13520 from dongjoon-hyun/SPARK-15773.
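For readers following the thread, here is a minimal, self-contained sketch of the pattern this commit applies. The object name `SparkContextUsageSketch` and its tiny job are hypothetical (not part of the patch); only the `spark.sparkContext` usage mirrors the change.

```
import org.apache.spark.sql.SparkSession

// Hypothetical example object; only the spark.sparkContext usage mirrors the commit.
object SparkContextUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("SparkContextUsageSketch")
      .getOrCreate()

    // Before this commit the examples did:
    //   val sc = spark.sparkContext
    //   val rdd = sc.parallelize(1 to 100, 2)
    // After, the session's existing SparkContext is used directly:
    val rdd = spark.sparkContext.parallelize(1 to 100, 2)
    println(s"sum = ${rdd.reduce(_ + _)}")

    spark.stop()
  }
}
```

As the diff below shows, the same one-line substitution is applied across the Python and Scala examples.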
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2022afe5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2022afe5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2022afe5

Branch: refs/heads/master
Commit: 2022afe57dbf8cb0c9909399962c4a3649e0601c
Parents: 127a667
Author: Dongjoon Hyun
Authored: Fri Jun 10 15:40:29 2016 -0700
Committer: Reynold Xin
Committed: Fri Jun 10 15:40:29 2016 -0700

----------------------------------------------------------------------
 examples/src/main/python/pi.py                      |  4 +---
 examples/src/main/python/transitive_closure.py      |  4 +---
 .../apache/spark/examples/DFSReadWriteTest.scala    |  7 ++-----
 .../spark/examples/ExceptionHandlingTest.scala      |  3 +--
 .../org/apache/spark/examples/GroupByTest.scala     | 14 ++++++--------
 .../apache/spark/examples/MultiBroadcastTest.scala  |  8 +++-----
 .../spark/examples/SimpleSkewedGroupByTest.scala    | 16 +++++++---------
 .../apache/spark/examples/SkewedGroupByTest.scala   | 13 +++++--------
 .../scala/org/apache/spark/examples/SparkLR.scala   |  4 +---
 .../scala/org/apache/spark/examples/SparkPi.scala   |  3 +--
 .../scala/org/apache/spark/examples/SparkTC.scala   |  3 +--
 .../spark/examples/sql/hive/HiveFromSpark.scala     |  3 +--
 12 files changed, 30 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2022afe5/examples/src/main/python/pi.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/pi.py b/examples/src/main/python/pi.py
index b39d710..e3f0c4a 100755
--- a/examples/src/main/python/pi.py
+++ b/examples/src/main/python/pi.py
@@ -32,8 +32,6 @@ if __name__ == "__main__":
         .appName("PythonPi")\
         .getOrCreate()
 
-    sc = spark.sparkContext
-
     partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
     n = 100000 * partitions
 
@@ -42,7 +40,7 @@ if __name__ == "__main__":
         y = random() * 2 - 1
         return 1 if x ** 2 + y ** 2 < 1 else 0
 
-    count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
+    count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
     print("Pi is roughly %f" % (4.0 * count / n))
 
     spark.stop()


http://git-wip-us.apache.org/repos/asf/spark/blob/2022afe5/examples/src/main/python/transitive_closure.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/transitive_closure.py b/examples/src/main/python/transitive_closure.py
index d88ea94..49551d4 100755
--- a/examples/src/main/python/transitive_closure.py
+++ b/examples/src/main/python/transitive_closure.py
@@ -46,10 +46,8 @@ if __name__ == "__main__":
         .appName("PythonTransitiveClosure")\
         .getOrCreate()
 
-    sc = spark.sparkContext
-
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
-    tc = sc.parallelize(generateGraph(), partitions).cache()
+    tc = spark.sparkContext.parallelize(generateGraph(), partitions).cache()
 
     # Linear transitive closure: each round grows paths by one edge,
     # by joining the graph's edges with the already-discovered paths.

http://git-wip-us.apache.org/repos/asf/spark/blob/2022afe5/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
index 4b5e36c..3bff7ce 100644
--- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
@@ -107,16 +107,13 @@ object DFSReadWriteTest {
       .appName("DFS Read Write Test")
       .getOrCreate()
 
-    println("Creating SparkContext")
-    val sc = spark.sparkContext
-
     println("Writing local file to DFS")
     val dfsFilename = dfsDirPath + "/dfs_read_write_test"
-    val fileRDD = sc.parallelize(fileContents)
+    val fileRDD = spark.sparkContext.parallelize(fileContents)
     fileRDD.saveAsTextFile(dfsFilename)
 
     println("Reading file from DFS and running Word Count")
-    val readFileRDD = sc.textFile(dfsFilename)
+    val readFileRDD = spark.sparkContext.textFile(dfsFilename)
 
     val dfsWordCount = readFileRDD
       .flatMap(_.split(" "))


http://git-wip-us.apache.org/repos/asf/spark/blob/2022afe5/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala b/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala
index 6a1bbed..45c4953 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala
@@ -25,9 +25,8 @@ object ExceptionHandlingTest {
       .builder
       .appName("ExceptionHandlingTest")
       .getOrCreate()
-    val sc = spark.sparkContext
 
-    sc.parallelize(0 until sc.defaultParallelism).foreach { i =>
+    spark.sparkContext.parallelize(0 until spark.sparkContext.defaultParallelism).foreach { i =>
       if (math.random > 0.75) {
         throw new Exception("Testing exception handling")
       }


http://git-wip-us.apache.org/repos/asf/spark/blob/2022afe5/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
index 0cb61d7..2f2bbb1 100644
--- a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
@@ -32,16 +32,14 @@ object GroupByTest {
       .appName("GroupBy Test")
       .getOrCreate()
 
-    var numMappers = if (args.length > 0) args(0).toInt else 2
-    var numKVPairs = if (args.length > 1) args(1).toInt else 1000
-    var valSize = if (args.length > 2) args(2).toInt else 1000
-    var numReducers = if (args.length > 3) args(3).toInt else numMappers
+    val numMappers = if (args.length > 0) args(0).toInt else 2
+    val numKVPairs = if (args.length > 1) args(1).toInt else 1000
+    val valSize = if (args.length > 2) args(2).toInt else 1000
+    val numReducers = if (args.length > 3) args(3).toInt else numMappers
 
-    val sc = spark.sparkContext
-
-    val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
+    val pairs1 = spark.sparkContext.parallelize(0 until numMappers, numMappers).flatMap { p =>
       val ranGen = new Random
-      var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
+      val arr1 = new Array[(Int, Array[Byte])](numKVPairs)
       for (i <- 0 until numKVPairs) {
         val byteArr = new Array[Byte](valSize)
         ranGen.nextBytes(byteArr)


http://git-wip-us.apache.org/repos/asf/spark/blob/2022afe5/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala
index 961ab99..6495a86 100644
--- a/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala
@@ -33,8 +33,6 @@ object MultiBroadcastTest {
       .appName("Multi-Broadcast Test")
       .getOrCreate()
 
-    val sc = spark.sparkContext
-
     val slices = if (args.length > 0) args(0).toInt else 2
     val num = if (args.length > 1) args(1).toInt else 1000000
 
@@ -48,9 +46,9 @@ object MultiBroadcastTest {
       arr2(i) = i
     }
 
-    val barr1 = sc.broadcast(arr1)
-    val barr2 = sc.broadcast(arr2)
-    val observedSizes: RDD[(Int, Int)] = sc.parallelize(1 to 10, slices).map { _ =>
+    val barr1 = spark.sparkContext.broadcast(arr1)
+    val barr2 = spark.sparkContext.broadcast(arr2)
+    val observedSizes: RDD[(Int, Int)] = spark.sparkContext.parallelize(1 to 10, slices).map { _ =>
       (barr1.value.length, barr2.value.length)
     }
     // Collect the small RDD so we can print the observed sizes locally.


http://git-wip-us.apache.org/repos/asf/spark/blob/2022afe5/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala
index 255c2bf..8e1a574 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala
@@ -32,17 +32,15 @@ object SimpleSkewedGroupByTest {
       .appName("SimpleSkewedGroupByTest")
       .getOrCreate()
 
-    val sc = spark.sparkContext
+    val numMappers = if (args.length > 0) args(0).toInt else 2
+    val numKVPairs = if (args.length > 1) args(1).toInt else 1000
+    val valSize = if (args.length > 2) args(2).toInt else 1000
+    val numReducers = if (args.length > 3) args(3).toInt else numMappers
+    val ratio = if (args.length > 4) args(4).toInt else 5.0
 
-    var numMappers = if (args.length > 0) args(0).toInt else 2
-    var numKVPairs = if (args.length > 1) args(1).toInt else 1000
-    var valSize = if (args.length > 2) args(2).toInt else 1000
-    var numReducers = if (args.length > 3) args(3).toInt else numMappers
-    var ratio = if (args.length > 4) args(4).toInt else 5.0
-
-    val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
+    val pairs1 = spark.sparkContext.parallelize(0 until numMappers, numMappers).flatMap { p =>
       val ranGen = new Random
-      var result = new Array[(Int, Array[Byte])](numKVPairs)
+      val result = new Array[(Int, Array[Byte])](numKVPairs)
       for (i <- 0 until numKVPairs) {
         val byteArr = new Array[Byte](valSize)
         ranGen.nextBytes(byteArr)


http://git-wip-us.apache.org/repos/asf/spark/blob/2022afe5/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala
index efd4014..4d3c340 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala
@@ -32,21 +32,18 @@ object SkewedGroupByTest {
       .appName("GroupBy Test")
       .getOrCreate()
 
-    val sc = spark.sparkContext
-
-    var numMappers = if (args.length > 0) args(0).toInt else 2
+    val numMappers = if (args.length > 0) args(0).toInt else 2
     var numKVPairs = if (args.length > 1) args(1).toInt else 1000
-    var valSize = if (args.length > 2) args(2).toInt else 1000
-    var numReducers = if (args.length > 3) args(3).toInt else numMappers
-
+    val valSize = if (args.length > 2) args(2).toInt else 1000
+    val numReducers = if (args.length > 3) args(3).toInt else numMappers
 
-    val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
+    val pairs1 = spark.sparkContext.parallelize(0 until numMappers, numMappers).flatMap { p =>
       val ranGen = new Random
       // map output sizes linearly increase from the 1st to the last
       numKVPairs = (1.0 * (p + 1) / numMappers * numKVPairs).toInt
-      var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
+      val arr1 = new Array[(Int, Array[Byte])](numKVPairs)
       for (i <- 0 until numKVPairs) {
         val byteArr = new Array[Byte](valSize)
         ranGen.nextBytes(byteArr)


http://git-wip-us.apache.org/repos/asf/spark/blob/2022afe5/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
index 8ef3aab..afa8f58 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
@@ -68,10 +68,8 @@ object SparkLR {
       .appName("SparkLR")
       .getOrCreate()
 
-    val sc = spark.sparkContext
-
     val numSlices = if (args.length > 0) args(0).toInt else 2
-    val points = sc.parallelize(generateData, numSlices).cache()
+    val points = spark.sparkContext.parallelize(generateData, numSlices).cache()
 
     // Initialize w to a random value
     var w = DenseVector.fill(D) {2 * rand.nextDouble - 1}


http://git-wip-us.apache.org/repos/asf/spark/blob/2022afe5/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
index 5be8f3b..42f6cef 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
@@ -29,10 +29,9 @@ object SparkPi {
       .builder
       .appName("Spark Pi")
       .getOrCreate()
-    val sc = spark.sparkContext
     val slices = if (args.length > 0) args(0).toInt else 2
     val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
-    val count = sc.parallelize(1 until n, slices).map { i =>
+    val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
       val x = random * 2 - 1
       val y = random * 2 - 1
       if (x*x + y*y < 1) 1 else 0


http://git-wip-us.apache.org/repos/asf/spark/blob/2022afe5/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala
index 46aa68b..558295a 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala
@@ -46,9 +46,8 @@ object SparkTC {
       .builder
       .appName("SparkTC")
       .getOrCreate()
-    val sc = spark.sparkContext
     val slices = if (args.length > 0) args(0).toInt else 2
-    var tc = sc.parallelize(generateGraph, slices).cache()
+    var tc = spark.sparkContext.parallelize(generateGraph, slices).cache()
 
     // Linear transitive closure: each round grows paths by one edge,
     // by joining the graph's edges with the already-discovered paths.


http://git-wip-us.apache.org/repos/asf/spark/blob/2022afe5/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
index 2d7a01a..2343f98 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
@@ -45,7 +45,6 @@ object HiveFromSpark {
       .appName("HiveFromSpark")
       .enableHiveSupport()
       .getOrCreate()
-    val sc = spark.sparkContext
 
     import spark.implicits._
     import spark.sql
@@ -71,7 +70,7 @@ object HiveFromSpark {
     }
 
     // You can also use RDDs to create temporary views within a HiveContext.
-    val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
+    val rdd = spark.sparkContext.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
     rdd.toDF().createOrReplaceTempView("records")
 
     // Queries can then join RDD data with data stored in Hive.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org