spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject spark git commit: [SPARK-15086][CORE][STREAMING] Deprecate old Java accumulator API
Date Sun, 12 Jun 2016 18:44:36 GMT
Repository: spark
Updated Branches:
  refs/heads/master 50248dcff -> f51dfe616


[SPARK-15086][CORE][STREAMING] Deprecate old Java accumulator API

## What changes were proposed in this pull request?

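- Deprecate the old Java accumulator API in `JavaSparkContext`; callers should use the Scala `SparkContext` accumulator methods (e.g. `sc().longAccumulator()`) instead
- Update Java tests and examples accordingly
- Stop testing the old accumulator API in the Java 8 test suites as well
- Fix a misspelling along the way ("accumuable" -> "accumulable")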

## How was this patch tested?

Jenkins tests

Author: Sean Owen <sowen@cloudera.com>

Closes #13606 from srowen/SPARK-15086.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f51dfe61
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f51dfe61
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f51dfe61

Branch: refs/heads/master
Commit: f51dfe616b24b4234199c98ea857a586a93a889f
Parents: 50248dc
Author: Sean Owen <sowen@cloudera.com>
Authored: Sun Jun 12 11:44:33 2016 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Sun Jun 12 11:44:33 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  6 +--
 .../spark/api/java/JavaSparkContext.scala       | 12 +++++-
 .../java/org/apache/spark/JavaAPISuite.java     |  6 ++-
 docs/programming-guide.md                       |  4 +-
 docs/streaming-programming-guide.md             |  8 ++--
 .../JavaRecoverableNetworkWordCount.java        | 10 ++---
 .../apache/spark/java8/Java8RDDAPISuite.java    | 39 --------------------
 .../spark/java8/dstream/Java8APISuite.java      | 28 --------------
 .../org/apache/spark/sql/JavaDatasetSuite.java  |  4 +-
 .../apache/spark/streaming/JavaAPISuite.java    |  6 +--
 10 files changed, 33 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f51dfe61/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 230fabd..8bfe7ec 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1245,7 +1245,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
   /**
    * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
-   * with `+=`. Only the driver can access the accumuable's `value`.
+   * with `+=`. Only the driver can access the accumulable's `value`.
    * @tparam R accumulator result type
    * @tparam T type that can be added to the accumulator
    */
@@ -1259,8 +1259,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
   /**
    * Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the
-   * Spark UI. Tasks can add values to the accumuable using the `+=` operator. Only the driver can
-   * access the accumuable's `value`.
+   * Spark UI. Tasks can add values to the accumulable using the `+=` operator. Only the driver can
+   * access the accumulable's `value`.
    * @tparam R accumulator result type
    * @tparam T type that can be added to the accumulator
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/f51dfe61/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 485a8b4..131f36f 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -530,6 +530,7 @@ class JavaSparkContext(val sc: SparkContext)
    * Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
    * to using the `add` method. Only the master can access the accumulator's `value`.
    */
+  @deprecated("use sc().longAccumulator()", "2.0.0")
   def intAccumulator(initialValue: Int): Accumulator[java.lang.Integer] =
     sc.accumulator(initialValue)(IntAccumulatorParam).asInstanceOf[Accumulator[java.lang.Integer]]
 
@@ -539,6 +540,7 @@ class JavaSparkContext(val sc: SparkContext)
    *
    * This version supports naming the accumulator for display in Spark's web UI.
    */
+  @deprecated("use sc().longAccumulator(String)", "2.0.0")
   def intAccumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] =
     sc.accumulator(initialValue, name)(IntAccumulatorParam)
       .asInstanceOf[Accumulator[java.lang.Integer]]
@@ -547,6 +549,7 @@ class JavaSparkContext(val sc: SparkContext)
    * Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
    * to using the `add` method. Only the master can access the accumulator's `value`.
    */
+  @deprecated("use sc().doubleAccumulator()", "2.0.0")
   def doubleAccumulator(initialValue: Double): Accumulator[java.lang.Double] =
     sc.accumulator(initialValue)(DoubleAccumulatorParam).asInstanceOf[Accumulator[java.lang.Double]]
 
@@ -556,6 +559,7 @@ class JavaSparkContext(val sc: SparkContext)
    *
    * This version supports naming the accumulator for display in Spark's web UI.
    */
+  @deprecated("use sc().doubleAccumulator(String)", "2.0.0")
   def doubleAccumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] =
     sc.accumulator(initialValue, name)(DoubleAccumulatorParam)
       .asInstanceOf[Accumulator[java.lang.Double]]
@@ -564,6 +568,7 @@ class JavaSparkContext(val sc: SparkContext)
    * Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
    * to using the `add` method. Only the master can access the accumulator's `value`.
    */
+  @deprecated("use sc().longAccumulator()", "2.0.0")
   def accumulator(initialValue: Int): Accumulator[java.lang.Integer] = intAccumulator(initialValue)
 
   /**
@@ -572,6 +577,7 @@ class JavaSparkContext(val sc: SparkContext)
    *
    * This version supports naming the accumulator for display in Spark's web UI.
    */
+  @deprecated("use sc().longAccumulator(String)", "2.0.0")
   def accumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] =
     intAccumulator(initialValue, name)
 
@@ -579,6 +585,7 @@ class JavaSparkContext(val sc: SparkContext)
    * Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
    * to using the `add` method. Only the master can access the accumulator's `value`.
    */
+  @deprecated("use sc().doubleAccumulator()", "2.0.0")
   def accumulator(initialValue: Double): Accumulator[java.lang.Double] =
     doubleAccumulator(initialValue)
 
@@ -589,6 +596,7 @@ class JavaSparkContext(val sc: SparkContext)
    *
    * This version supports naming the accumulator for display in Spark's web UI.
    */
+  @deprecated("use sc().doubleAccumulator(String)", "2.0.0")
   def accumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] =
     doubleAccumulator(initialValue, name)
 
@@ -613,7 +621,7 @@ class JavaSparkContext(val sc: SparkContext)
 
   /**
    * Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks
-   * can "add" values with `add`. Only the master can access the accumuable's `value`.
+   * can "add" values with `add`. Only the master can access the accumulable's `value`.
    */
   @deprecated("use AccumulatorV2", "2.0.0")
   def accumulable[T, R](initialValue: T, param: AccumulableParam[T, R]): Accumulable[T, R] =
@@ -621,7 +629,7 @@ class JavaSparkContext(val sc: SparkContext)
 
   /**
    * Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks
-   * can "add" values with `add`. Only the master can access the accumuable's `value`.
+   * can "add" values with `add`. Only the master can access the accumulable's `value`.
    *
    * This version supports naming the accumulator for display in Spark's web UI.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/f51dfe61/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 04f92d6..7bac068 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -70,6 +70,7 @@ import org.apache.spark.partial.PartialResult;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.serializer.KryoSerializer;
 import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.util.LongAccumulator;
 import org.apache.spark.util.StatCounter;
 
 // The test suite itself is Serializable so that anonymous Function implementations can be
@@ -287,7 +288,7 @@ public class JavaAPISuite implements Serializable {
 
   @Test
   public void foreach() {
-    final Accumulator<Integer> accum = sc.accumulator(0);
+    final LongAccumulator accum = sc.sc().longAccumulator();
     JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
     rdd.foreach(new VoidFunction<String>() {
       @Override
@@ -300,7 +301,7 @@ public class JavaAPISuite implements Serializable {
 
   @Test
   public void foreachPartition() {
-    final Accumulator<Integer> accum = sc.accumulator(0);
+    final LongAccumulator accum = sc.sc().longAccumulator();
     JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
     rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
       @Override
@@ -1377,6 +1378,7 @@ public class JavaAPISuite implements Serializable {
     assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
   }
 
+  @SuppressWarnings("deprecation")
   @Test
   public void accumulators() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

http://git-wip-us.apache.org/repos/asf/spark/blob/f51dfe61/docs/programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 70fd627..3f081a0 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -1394,7 +1394,7 @@ Note that, when programmers define their own type of AccumulatorV2, the resultin
 <div data-lang="java"  markdown="1">
 
 {% highlight java %}
-Accumulator<Integer> accum = sc.accumulator(0);
+LongAccumulator accum = sc.sc().longAccumulator();
 
 sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
 // ...
@@ -1485,7 +1485,7 @@ data.map { x => accum += x; x }
 
 <div data-lang="java"  markdown="1">
 {% highlight java %}
-Accumulator<Integer> accum = sc.accumulator(0);
+LongAccumulator accum = sc.sc().longAccumulator();
 data.map(x -> { accum.add(x); return f(x); });
 // Here, accum is still 0 because no actions have caused the `map` to be computed.
 {% endhighlight %}

http://git-wip-us.apache.org/repos/asf/spark/blob/f51dfe61/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 0a6a039..4ea3b60 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -1452,13 +1452,13 @@ class JavaWordBlacklist {
 
 class JavaDroppedWordsCounter {
 
-  private static volatile Accumulator<Integer> instance = null;
+  private static volatile LongAccumulator instance = null;
 
-  public static Accumulator<Integer> getInstance(JavaSparkContext jsc) {
+  public static LongAccumulator getInstance(JavaSparkContext jsc) {
     if (instance == null) {
       synchronized (JavaDroppedWordsCounter.class) {
         if (instance == null) {
-          instance = jsc.accumulator(0, "WordsInBlacklistCounter");
+          instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
         }
       }
     }
@@ -1472,7 +1472,7 @@ wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>()
     // Get or register the blacklist Broadcast
     final Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
     // Get or register the droppedWordsCounter Accumulator
-    final Accumulator<Integer> droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
+    final LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
     // Use blacklist to drop words and use droppedWordsCounter to count them
     String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
       @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/f51dfe61/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
index 0563149..acbc345 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
@@ -29,7 +29,6 @@ import scala.Tuple2;
 
 import com.google.common.io.Files;
 
-import org.apache.spark.Accumulator;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -41,6 +40,7 @@ import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.util.LongAccumulator;
 
 /**
  * Use this singleton to get or register a Broadcast variable.
@@ -67,13 +67,13 @@ class JavaWordBlacklist {
  */
 class JavaDroppedWordsCounter {
 
-  private static volatile Accumulator<Integer> instance = null;
+  private static volatile LongAccumulator instance = null;
 
-  public static Accumulator<Integer> getInstance(JavaSparkContext jsc) {
+  public static LongAccumulator getInstance(JavaSparkContext jsc) {
     if (instance == null) {
       synchronized (JavaDroppedWordsCounter.class) {
         if (instance == null) {
-          instance = jsc.accumulator(0, "WordsInBlacklistCounter");
+          instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
         }
       }
     }
@@ -158,7 +158,7 @@ public final class JavaRecoverableNetworkWordCount {
         final Broadcast<List<String>> blacklist =
             JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
         // Get or register the droppedWordsCounter Accumulator
-        final Accumulator<Integer> droppedWordsCounter =
+        final LongAccumulator droppedWordsCounter =
             JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
         // Use blacklist to drop words and use droppedWordsCounter to count them
         String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {

http://git-wip-us.apache.org/repos/asf/spark/blob/f51dfe61/external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java
----------------------------------------------------------------------
diff --git a/external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java b/external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java
index 8ee0e7e..fa3a66e 100644
--- a/external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java
+++ b/external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java
@@ -33,8 +33,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.spark.Accumulator;
-import org.apache.spark.AccumulatorParam;
 import org.apache.spark.api.java.JavaDoubleRDD;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
@@ -303,43 +301,6 @@ public class Java8RDDAPISuite implements Serializable {
   }
 
   @Test
-  public void accumulators() {
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-
-    Accumulator<Integer> intAccum = sc.intAccumulator(10);
-    rdd.foreach(intAccum::add);
-    Assert.assertEquals((Integer) 25, intAccum.value());
-
-    Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
-    rdd.foreach(x -> doubleAccum.add((double) x));
-    Assert.assertEquals((Double) 25.0, doubleAccum.value());
-
-    // Try a custom accumulator type
-    AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() {
-      @Override
-      public Float addInPlace(Float r, Float t) {
-        return r + t;
-      }
-      @Override
-      public Float addAccumulator(Float r, Float t) {
-        return r + t;
-      }
-      @Override
-      public Float zero(Float initialValue) {
-        return 0.0f;
-      }
-    };
-
-    Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
-    rdd.foreach(x -> floatAccum.add((float) x));
-    Assert.assertEquals((Float) 25.0f, floatAccum.value());
-
-    // Test the setValue method
-    floatAccum.setValue(5.0f);
-    Assert.assertEquals((Float) 5.0f, floatAccum.value());
-  }
-
-  @Test
   public void keyBy() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
     List<Tuple2<String, Integer>> s = rdd.keyBy(Object::toString).collect();

http://git-wip-us.apache.org/repos/asf/spark/blob/f51dfe61/external/java8-tests/src/test/java/test/org/apache/spark/java8/dstream/Java8APISuite.java
----------------------------------------------------------------------
diff --git a/external/java8-tests/src/test/java/test/org/apache/spark/java8/dstream/Java8APISuite.java b/external/java8-tests/src/test/java/test/org/apache/spark/java8/dstream/Java8APISuite.java
index cf5607f..338ca54 100644
--- a/external/java8-tests/src/test/java/test/org/apache/spark/java8/dstream/Java8APISuite.java
+++ b/external/java8-tests/src/test/java/test/org/apache/spark/java8/dstream/Java8APISuite.java
@@ -27,7 +27,6 @@ import com.google.common.collect.Sets;
 import org.junit.Assert;
 import org.junit.Test;
 
-import org.apache.spark.Accumulator;
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.api.java.Optional;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -362,33 +361,6 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
   }
 
   @Test
-  public void testForeachRDD() {
-    final Accumulator<Integer> accumRdd = ssc.sparkContext().accumulator(0);
-    final Accumulator<Integer> accumEle = ssc.sparkContext().accumulator(0);
-    List<List<Integer>> inputData = Arrays.asList(
-        Arrays.asList(1,1,1),
-        Arrays.asList(1,1,1));
-
-    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaTestUtils.attachTestOutputStream(stream.count()); // dummy output
-
-    stream.foreachRDD(rdd -> {
-      accumRdd.add(1);
-      rdd.foreach(x -> accumEle.add(1));
-    });
-
-    // This is a test to make sure foreachRDD(VoidFunction2) can be called from Java
-    stream.foreachRDD((rdd, time) -> {
-      return;
-    });
-
-    JavaTestUtils.runStreams(ssc, 2, 2);
-
-    Assert.assertEquals(2, accumRdd.value().intValue());
-    Assert.assertEquals(6, accumEle.value().intValue());
-  }
-
-  @Test
   public void testPairFlatMap() {
     List<List<String>> inputData = Arrays.asList(
       Arrays.asList("giants"),

http://git-wip-us.apache.org/repos/asf/spark/blob/f51dfe61/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index 37577ac..a711811 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -32,7 +32,6 @@ import com.google.common.base.Objects;
 import org.junit.*;
 import org.junit.rules.ExpectedException;
 
-import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.*;
 import org.apache.spark.sql.*;
@@ -40,6 +39,7 @@ import org.apache.spark.sql.catalyst.encoders.OuterScopes;
 import org.apache.spark.sql.catalyst.expressions.GenericRow;
 import org.apache.spark.sql.test.TestSparkSession;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.LongAccumulator;
 import static org.apache.spark.sql.functions.col;
 import static org.apache.spark.sql.functions.expr;
 import static org.apache.spark.sql.types.DataTypes.*;
@@ -157,7 +157,7 @@ public class JavaDatasetSuite implements Serializable {
 
   @Test
   public void testForeach() {
-    final Accumulator<Integer> accum = jsc.accumulator(0);
+    final LongAccumulator accum = jsc.sc().longAccumulator();
     List<String> data = Arrays.asList("a", "b", "c");
     Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f51dfe61/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 01f0c4d..3d54abd 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -36,7 +36,6 @@ import org.junit.Test;
 import com.google.common.io.Files;
 import com.google.common.collect.Sets;
 
-import org.apache.spark.Accumulator;
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -46,6 +45,7 @@ import org.apache.spark.api.java.Optional;
 import org.apache.spark.api.java.function.*;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.api.java.*;
+import org.apache.spark.util.LongAccumulator;
 import org.apache.spark.util.Utils;
 
 // The test suite itself is Serializable so that anonymous Function implementations can be
@@ -794,8 +794,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
   @SuppressWarnings("unchecked")
   @Test
   public void testForeachRDD() {
-    final Accumulator<Integer> accumRdd = ssc.sparkContext().accumulator(0);
-    final Accumulator<Integer> accumEle = ssc.sparkContext().accumulator(0);
+    final LongAccumulator accumRdd = ssc.sparkContext().sc().longAccumulator();
+    final LongAccumulator accumEle = ssc.sparkContext().sc().longAccumulator();
     List<List<Integer>> inputData = Arrays.asList(
         Arrays.asList(1,1,1),
         Arrays.asList(1,1,1));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message