Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D4327200AF5 for ; Thu, 2 Jun 2016 18:07:22 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id D2B67160A51; Thu, 2 Jun 2016 16:07:22 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 263F8160A3E for ; Thu, 2 Jun 2016 18:07:22 +0200 (CEST) Received: (qmail 20442 invoked by uid 500); 2 Jun 2016 16:07:21 -0000 Mailing-List: contact commits-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list commits@spark.apache.org Received: (qmail 20433 invoked by uid 99); 2 Jun 2016 16:07:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 02 Jun 2016 16:07:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1E465DFE65; Thu, 2 Jun 2016 16:07:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: srowen@apache.org To: commits@spark.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: spark git commit: [SPARK-15208][WIP][CORE][STREAMING][DOCS] Update Spark examples with AccumulatorV2 Date: Thu, 2 Jun 2016 16:07:21 +0000 (UTC) archived-at: Thu, 02 Jun 2016 16:07:23 -0000 Repository: spark Updated Branches: refs/heads/master 5eea33230 -> a0eec8e8f [SPARK-15208][WIP][CORE][STREAMING][DOCS] Update Spark examples with AccumulatorV2 ## What changes were proposed in this pull request? 
The patch updates the code & docs in the example module as well as the related doc module: - [ ] [docs] `streaming-programming-guide.md` - [x] scala code part - [ ] java code part - [ ] python code part - [x] [examples] `RecoverableNetworkWordCount.scala` - [ ] [examples] `JavaRecoverableNetworkWordCount.java` - [ ] [examples] `recoverable_network_wordcount.py` ## How was this patch tested? Ran the examples and verified results manually. Author: Liwei Lin Closes #12981 from lw-lin/accumulatorV2-examples. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a0eec8e8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a0eec8e8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a0eec8e8 Branch: refs/heads/master Commit: a0eec8e8ffd5a43cae67aa0f5dbcf7ca19a4f3aa Parents: 5eea332 Author: Liwei Lin Authored: Thu Jun 2 11:07:15 2016 -0500 Committer: Sean Owen Committed: Thu Jun 2 11:07:15 2016 -0500 ---------------------------------------------------------------------- docs/streaming-programming-guide.md | 12 ++++++------ .../streaming/RecoverableNetworkWordCount.scala | 3 +-- 2 files changed, 7 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a0eec8e8/docs/streaming-programming-guide.md ---------------------------------------------------------------------- diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 6550fcc..78ae6a7 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -1395,13 +1395,13 @@ object WordBlacklist { object DroppedWordsCounter { - @volatile private var instance: Accumulator[Long] = null + @volatile private var instance: LongAccumulator = null - def getInstance(sc: SparkContext): Accumulator[Long] = { + def getInstance(sc: SparkContext): LongAccumulator = { if (instance == null) { synchronized { if 
(instance == null) { - instance = sc.accumulator(0L, "WordsInBlacklistCounter") + instance = sc.longAccumulator("WordsInBlacklistCounter") } } } @@ -1409,7 +1409,7 @@ object DroppedWordsCounter { } } -wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => { +wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) => // Get or register the blacklist Broadcast val blacklist = WordBlacklist.getInstance(rdd.sparkContext) // Get or register the droppedWordsCounter Accumulator @@ -1417,12 +1417,12 @@ wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => { // Use blacklist to drop words and use droppedWordsCounter to count them val counts = rdd.filter { case (word, count) => if (blacklist.value.contains(word)) { - droppedWordsCounter += count + droppedWordsCounter.add(count) false } else { true } - }.collect() + }.collect().mkString("[", ", ", "]") val output = "Counts at time " + time + " " + counts }) http://git-wip-us.apache.org/repos/asf/spark/blob/a0eec8e8/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala index acbcb0c..49c0427 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala @@ -27,8 +27,7 @@ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Seconds, StreamingContext, Time} -import org.apache.spark.util.IntParam -import org.apache.spark.util.LongAccumulator +import org.apache.spark.util.{IntParam, LongAccumulator} /** * Use this singleton to get or register a 
Broadcast variable. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org