spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sro...@apache.org
Subject [1/2] spark git commit: [SPARK-10547] [TEST] Streamline / improve style of Java API tests
Date Sat, 12 Sep 2015 09:40:15 GMT
Repository: spark
Updated Branches:
  refs/heads/master 8285e3b0d -> 22730ad54


http://git-wip-us.apache.org/repos/asf/spark/blob/22730ad5/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index e0718f7..c521714 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -18,24 +18,22 @@
 package org.apache.spark.streaming;
 
 import java.io.*;
-import java.lang.Iterable;
 import java.nio.charset.Charset;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import scala.Tuple2;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 
-import scala.Tuple2;
-
 import org.junit.Assert;
-import static org.junit.Assert.*;
 import org.junit.Test;
 
 import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 import com.google.common.collect.Sets;
 
@@ -54,14 +52,14 @@ import org.apache.spark.SparkConf;
 // see http://stackoverflow.com/questions/758570/.
 public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable {
 
-  public void equalIterator(Iterator<?> a, Iterator<?> b) {
+  public static void equalIterator(Iterator<?> a, Iterator<?> b) {
     while (a.hasNext() && b.hasNext()) {
       Assert.assertEquals(a.next(), b.next());
     }
     Assert.assertEquals(a.hasNext(), b.hasNext());
   }
 
-  public void equalIterable(Iterable<?> a, Iterable<?> b) {
+  public static void equalIterable(Iterable<?> a, Iterable<?> b) {
       equalIterator(a.iterator(), b.iterator());
   }
 
@@ -74,14 +72,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
   @Test
   public void testContextState() {
     List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1, 2, 3, 4));
-    Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED);
+    Assert.assertEquals(StreamingContextState.INITIALIZED, ssc.getState());
     JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaTestUtils.attachTestOutputStream(stream);
-    Assert.assertTrue(ssc.getState() == StreamingContextState.INITIALIZED);
+    Assert.assertEquals(StreamingContextState.INITIALIZED, ssc.getState());
     ssc.start();
-    Assert.assertTrue(ssc.getState() == StreamingContextState.ACTIVE);
+    Assert.assertEquals(StreamingContextState.ACTIVE, ssc.getState());
     ssc.stop();
-    Assert.assertTrue(ssc.getState() == StreamingContextState.STOPPED);
+    Assert.assertEquals(StreamingContextState.STOPPED, ssc.getState());
   }
 
   @SuppressWarnings("unchecked")
@@ -118,7 +116,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() {
         @Override
-        public Integer call(String s) throws Exception {
+        public Integer call(String s) {
           return s.length();
         }
     });
@@ -180,7 +178,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
   public void testFilter() {
     List<List<String>> inputData = Arrays.asList(
         Arrays.asList("giants", "dodgers"),
-        Arrays.asList("yankees", "red socks"));
+        Arrays.asList("yankees", "red sox"));
 
     List<List<String>> expected = Arrays.asList(
         Arrays.asList("giants"),
@@ -189,7 +187,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaDStream<String> filtered = stream.filter(new Function<String, Boolean>() {
       @Override
-      public Boolean call(String s) throws Exception {
+      public Boolean call(String s) {
         return s.contains("a");
       }
     });
@@ -243,11 +241,11 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
   public void testGlom() {
     List<List<String>> inputData = Arrays.asList(
         Arrays.asList("giants", "dodgers"),
-        Arrays.asList("yankees", "red socks"));
+        Arrays.asList("yankees", "red sox"));
 
     List<List<List<String>>> expected = Arrays.asList(
         Arrays.asList(Arrays.asList("giants", "dodgers")),
-        Arrays.asList(Arrays.asList("yankees", "red socks")));
+        Arrays.asList(Arrays.asList("yankees", "red sox")));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaDStream<List<String>> glommed = stream.glom();
@@ -262,22 +260,22 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
   public void testMapPartitions() {
     List<List<String>> inputData = Arrays.asList(
         Arrays.asList("giants", "dodgers"),
-        Arrays.asList("yankees", "red socks"));
+        Arrays.asList("yankees", "red sox"));
 
     List<List<String>> expected = Arrays.asList(
         Arrays.asList("GIANTSDODGERS"),
-        Arrays.asList("YANKEESRED SOCKS"));
+        Arrays.asList("YANKEESRED SOX"));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaDStream<String> mapped = stream.mapPartitions(
         new FlatMapFunction<Iterator<String>, String>() {
           @Override
           public Iterable<String> call(Iterator<String> in) {
-            String out = "";
+            StringBuilder out = new StringBuilder();
             while (in.hasNext()) {
-              out = out + in.next().toUpperCase();
+              out.append(in.next().toUpperCase(Locale.ENGLISH));
             }
-            return Lists.newArrayList(out);
+            return Arrays.asList(out.toString());
           }
         });
     JavaTestUtils.attachTestOutputStream(mapped);
@@ -286,16 +284,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
-  private class IntegerSum implements Function2<Integer, Integer, Integer> {
+  private static class IntegerSum implements Function2<Integer, Integer, Integer> {
     @Override
-    public Integer call(Integer i1, Integer i2) throws Exception {
+    public Integer call(Integer i1, Integer i2) {
       return i1 + i2;
     }
   }
 
-  private class IntegerDifference implements Function2<Integer, Integer, Integer> {
+  private static class IntegerDifference implements Function2<Integer, Integer, Integer> {
     @Override
-    public Integer call(Integer i1, Integer i2) throws Exception {
+    public Integer call(Integer i1, Integer i2) {
       return i1 - i2;
     }
   }
@@ -347,13 +345,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         Arrays.asList(24));
 
     JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream<Integer> reducedWindowed = null;
+    JavaDStream<Integer> reducedWindowed;
     if (withInverse) {
       reducedWindowed = stream.reduceByWindow(new IntegerSum(),
-        new IntegerDifference(), new Duration(2000), new Duration(1000));
+                                              new IntegerDifference(), new Duration(2000), new Duration(1000));
     } else {
       reducedWindowed = stream.reduceByWindow(new IntegerSum(),
-        new Duration(2000), new Duration(1000));
+                                              new Duration(2000), new Duration(1000));
     }
     JavaTestUtils.attachTestOutputStream(reducedWindowed);
     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
@@ -378,11 +376,11 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         Arrays.asList(7,8,9));
 
     JavaSparkContext jsc = new JavaSparkContext(ssc.ssc().sc());
-    JavaRDD<Integer> rdd1 = ssc.sparkContext().parallelize(Arrays.asList(1, 2, 3));
-    JavaRDD<Integer> rdd2 = ssc.sparkContext().parallelize(Arrays.asList(4, 5, 6));
-    JavaRDD<Integer> rdd3 = ssc.sparkContext().parallelize(Arrays.asList(7,8,9));
+    JavaRDD<Integer> rdd1 = jsc.parallelize(Arrays.asList(1, 2, 3));
+    JavaRDD<Integer> rdd2 = jsc.parallelize(Arrays.asList(4, 5, 6));
+    JavaRDD<Integer> rdd3 = jsc.parallelize(Arrays.asList(7,8,9));
 
-    LinkedList<JavaRDD<Integer>> rdds = Lists.newLinkedList();
+    Queue<JavaRDD<Integer>> rdds = new LinkedList<>();
     rdds.add(rdd1);
     rdds.add(rdd2);
     rdds.add(rdd3);
@@ -410,10 +408,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaDStream<Integer> transformed = stream.transform(
       new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
         @Override
-        public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
+        public JavaRDD<Integer> call(JavaRDD<Integer> in) {
           return in.map(new Function<Integer, Integer>() {
             @Override
-            public Integer call(Integer i) throws Exception {
+            public Integer call(Integer i) {
               return i + 2;
             }
           });
@@ -435,70 +433,70 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
 
     List<List<Tuple2<String, Integer>>> pairInputData =
-        Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1)));
+        Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(
         JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1));
 
-    JavaDStream<Integer> transformed1 = stream.transform(
+    stream.transform(
         new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
           @Override
-          public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
+          public JavaRDD<Integer> call(JavaRDD<Integer> in) {
             return null;
           }
         }
     );
 
-    JavaDStream<Integer> transformed2 = stream.transform(
+    stream.transform(
       new Function2<JavaRDD<Integer>, Time, JavaRDD<Integer>>() {
-        @Override public JavaRDD<Integer> call(JavaRDD<Integer> in, Time time) throws Exception {
+        @Override public JavaRDD<Integer> call(JavaRDD<Integer> in, Time time) {
           return null;
         }
       }
     );
 
-    JavaPairDStream<String, Integer> transformed3 = stream.transformToPair(
+    stream.transformToPair(
         new Function<JavaRDD<Integer>, JavaPairRDD<String, Integer>>() {
-          @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) throws Exception {
+          @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) {
             return null;
           }
         }
     );
 
-    JavaPairDStream<String, Integer> transformed4 = stream.transformToPair(
+    stream.transformToPair(
         new Function2<JavaRDD<Integer>, Time, JavaPairRDD<String, Integer>>() {
-          @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) throws Exception {
+          @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) {
             return null;
           }
         }
     );
 
-    JavaDStream<Integer> pairTransformed1 = pairStream.transform(
+    pairStream.transform(
         new Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>>() {
-          @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in) throws Exception {
+          @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in) {
             return null;
           }
         }
     );
 
-    JavaDStream<Integer> pairTransformed2 = pairStream.transform(
+    pairStream.transform(
         new Function2<JavaPairRDD<String, Integer>, Time, JavaRDD<Integer>>() {
-          @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in, Time time) throws Exception {
+          @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in, Time time) {
             return null;
           }
         }
     );
 
-    JavaPairDStream<String, String> pairTransformed3 = pairStream.transformToPair(
+    pairStream.transformToPair(
         new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, String>>() {
-          @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) throws Exception {
+          @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) {
             return null;
           }
         }
     );
 
-    JavaPairDStream<String, String> pairTransformed4 = pairStream.transformToPair(
+    pairStream.transformToPair(
         new Function2<JavaPairRDD<String, Integer>, Time, JavaPairRDD<String, String>>() {
-          @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in, Time time) throws Exception {
+          @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in, Time time) {
             return null;
           }
         }
@@ -511,32 +509,32 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
   public void testTransformWith() {
     List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
         Arrays.asList(
-            new Tuple2<String, String>("california", "dodgers"),
-            new Tuple2<String, String>("new york", "yankees")),
+            new Tuple2<>("california", "dodgers"),
+            new Tuple2<>("new york", "yankees")),
         Arrays.asList(
-            new Tuple2<String, String>("california", "sharks"),
-            new Tuple2<String, String>("new york", "rangers")));
+            new Tuple2<>("california", "sharks"),
+            new Tuple2<>("new york", "rangers")));
 
     List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
         Arrays.asList(
-            new Tuple2<String, String>("california", "giants"),
-            new Tuple2<String, String>("new york", "mets")),
+            new Tuple2<>("california", "giants"),
+            new Tuple2<>("new york", "mets")),
         Arrays.asList(
-            new Tuple2<String, String>("california", "ducks"),
-            new Tuple2<String, String>("new york", "islanders")));
+            new Tuple2<>("california", "ducks"),
+            new Tuple2<>("new york", "islanders")));
 
 
     List<HashSet<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
         Sets.newHashSet(
-            new Tuple2<String, Tuple2<String, String>>("california",
-                new Tuple2<String, String>("dodgers", "giants")),
-            new Tuple2<String, Tuple2<String, String>>("new york",
-                new Tuple2<String, String>("yankees", "mets"))),
+            new Tuple2<>("california",
+                         new Tuple2<>("dodgers", "giants")),
+            new Tuple2<>("new york",
+                         new Tuple2<>("yankees", "mets"))),
         Sets.newHashSet(
-            new Tuple2<String, Tuple2<String, String>>("california",
-                new Tuple2<String, String>("sharks", "ducks")),
-            new Tuple2<String, Tuple2<String, String>>("new york",
-                new Tuple2<String, String>("rangers", "islanders"))));
+            new Tuple2<>("california",
+                         new Tuple2<>("sharks", "ducks")),
+            new Tuple2<>("new york",
+                         new Tuple2<>("rangers", "islanders"))));
 
     JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
         ssc, stringStringKVStream1, 1);
@@ -552,14 +550,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
             JavaPairRDD<String, String>,
             JavaPairRDD<String, String>,
             Time,
-            JavaPairRDD<String, Tuple2<String, String>>
-          >() {
+            JavaPairRDD<String, Tuple2<String, String>>>() {
           @Override
           public JavaPairRDD<String, Tuple2<String, String>> call(
               JavaPairRDD<String, String> rdd1,
               JavaPairRDD<String, String> rdd2,
-              Time time
-          ) throws Exception {
+              Time time) {
             return rdd1.join(rdd2);
           }
         }
@@ -567,9 +563,9 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
     JavaTestUtils.attachTestOutputStream(joined);
     List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-    List<HashSet<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList();
+    List<HashSet<Tuple2<String, Tuple2<String, String>>>> unorderedResult = new ArrayList<>();
     for (List<Tuple2<String, Tuple2<String, String>>> res: result) {
-        unorderedResult.add(Sets.newHashSet(res));
+      unorderedResult.add(Sets.newHashSet(res));
     }
 
     Assert.assertEquals(expected, unorderedResult);
@@ -587,89 +583,89 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1);
 
     List<List<Tuple2<String, Integer>>> pairInputData1 =
-        Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1)));
+        Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
     List<List<Tuple2<Double, Character>>> pairInputData2 =
-        Arrays.asList(Arrays.asList(new Tuple2<Double, Character>(1.0, 'x')));
+        Arrays.asList(Arrays.asList(new Tuple2<>(1.0, 'x')));
     JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream(
         JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1));
     JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream(
         JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1));
 
-    JavaDStream<Double> transformed1 = stream1.transformWith(
+    stream1.transformWith(
         stream2,
         new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() {
           @Override
-          public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception {
+          public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) {
             return null;
           }
         }
     );
 
-    JavaDStream<Double> transformed2 = stream1.transformWith(
+    stream1.transformWith(
         pairStream1,
         new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() {
           @Override
-          public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception {
+          public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) {
             return null;
           }
         }
     );
 
-    JavaPairDStream<Double, Double> transformed3 = stream1.transformWithToPair(
+    stream1.transformWithToPair(
         stream2,
         new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() {
           @Override
-          public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception {
+          public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) {
             return null;
           }
         }
     );
 
-    JavaPairDStream<Double, Double> transformed4 = stream1.transformWithToPair(
+    stream1.transformWithToPair(
         pairStream1,
         new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaPairRDD<Double, Double>>() {
           @Override
-          public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception {
+          public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) {
             return null;
           }
         }
     );
 
-    JavaDStream<Double> pairTransformed1 = pairStream1.transformWith(
+    pairStream1.transformWith(
         stream2,
         new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() {
           @Override
-          public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception {
+          public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) {
             return null;
           }
         }
     );
 
-    JavaDStream<Double> pairTransformed2_ = pairStream1.transformWith(
+    pairStream1.transformWith(
         pairStream1,
         new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() {
           @Override
-          public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception {
+          public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) {
             return null;
           }
         }
     );
 
-    JavaPairDStream<Double, Double> pairTransformed3 = pairStream1.transformWithToPair(
+    pairStream1.transformWithToPair(
         stream2,
         new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() {
           @Override
-          public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception {
+          public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) {
             return null;
           }
         }
     );
 
-    JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWithToPair(
+    pairStream1.transformWithToPair(
         pairStream2,
         new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time, JavaPairRDD<Double, Double>>() {
           @Override
-          public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<Double, Character> rdd2, Time time) throws Exception {
+          public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<Double, Character> rdd2, Time time) {
             return null;
           }
         }
@@ -690,13 +686,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     );
 
     List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList(
-        Arrays.asList(new Tuple2<Integer, String>(1, "x")),
-        Arrays.asList(new Tuple2<Integer, String>(2, "y"))
+        Arrays.asList(new Tuple2<>(1, "x")),
+        Arrays.asList(new Tuple2<>(2, "y"))
     );
 
     List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList(
-        Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(1, new Tuple2<Integer, String>(1, "x"))),
-        Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(2, new Tuple2<Integer, String>(2, "y")))
+        Arrays.asList(new Tuple2<>(1, new Tuple2<>(1, "x"))),
+        Arrays.asList(new Tuple2<>(2, new Tuple2<>(2, "y")))
     );
 
     JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1);
@@ -707,7 +703,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2);
 
     // This is just to test whether this transform to JavaStream compiles
-    JavaDStream<Long> transformed1 = ssc.transform(
+    ssc.transform(
       listOfDStreams1,
       new Function2<List<JavaRDD<?>>, Time, JavaRDD<Long>>() {
         @Override
@@ -733,8 +729,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
           JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
           PairFunction<Integer, Integer, Integer> mapToTuple = new PairFunction<Integer, Integer, Integer>() {
             @Override
-            public Tuple2<Integer, Integer> call(Integer i) throws Exception {
-              return new Tuple2<Integer, Integer>(i, i);
+            public Tuple2<Integer, Integer> call(Integer i) {
+              return new Tuple2<>(i, i);
             }
           };
           return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
@@ -763,7 +759,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaDStream<String> flatMapped = stream.flatMap(new FlatMapFunction<String, String>() {
       @Override
       public Iterable<String> call(String x) {
-        return Lists.newArrayList(x.split("(?!^)"));
+        return Arrays.asList(x.split("(?!^)"));
       }
     });
     JavaTestUtils.attachTestOutputStream(flatMapped);
@@ -782,39 +778,39 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
     List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
         Arrays.asList(
-            new Tuple2<Integer, String>(6, "g"),
-            new Tuple2<Integer, String>(6, "i"),
-            new Tuple2<Integer, String>(6, "a"),
-            new Tuple2<Integer, String>(6, "n"),
-            new Tuple2<Integer, String>(6, "t"),
-            new Tuple2<Integer, String>(6, "s")),
+            new Tuple2<>(6, "g"),
+            new Tuple2<>(6, "i"),
+            new Tuple2<>(6, "a"),
+            new Tuple2<>(6, "n"),
+            new Tuple2<>(6, "t"),
+            new Tuple2<>(6, "s")),
         Arrays.asList(
-            new Tuple2<Integer, String>(7, "d"),
-            new Tuple2<Integer, String>(7, "o"),
-            new Tuple2<Integer, String>(7, "d"),
-            new Tuple2<Integer, String>(7, "g"),
-            new Tuple2<Integer, String>(7, "e"),
-            new Tuple2<Integer, String>(7, "r"),
-            new Tuple2<Integer, String>(7, "s")),
+            new Tuple2<>(7, "d"),
+            new Tuple2<>(7, "o"),
+            new Tuple2<>(7, "d"),
+            new Tuple2<>(7, "g"),
+            new Tuple2<>(7, "e"),
+            new Tuple2<>(7, "r"),
+            new Tuple2<>(7, "s")),
         Arrays.asList(
-            new Tuple2<Integer, String>(9, "a"),
-            new Tuple2<Integer, String>(9, "t"),
-            new Tuple2<Integer, String>(9, "h"),
-            new Tuple2<Integer, String>(9, "l"),
-            new Tuple2<Integer, String>(9, "e"),
-            new Tuple2<Integer, String>(9, "t"),
-            new Tuple2<Integer, String>(9, "i"),
-            new Tuple2<Integer, String>(9, "c"),
-            new Tuple2<Integer, String>(9, "s")));
+            new Tuple2<>(9, "a"),
+            new Tuple2<>(9, "t"),
+            new Tuple2<>(9, "h"),
+            new Tuple2<>(9, "l"),
+            new Tuple2<>(9, "e"),
+            new Tuple2<>(9, "t"),
+            new Tuple2<>(9, "i"),
+            new Tuple2<>(9, "c"),
+            new Tuple2<>(9, "s")));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(
       new PairFlatMapFunction<String, Integer, String>() {
         @Override
-        public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
-          List<Tuple2<Integer, String>> out = Lists.newArrayList();
+        public Iterable<Tuple2<Integer, String>> call(String in) {
+          List<Tuple2<Integer, String>> out = new ArrayList<>();
           for (String letter: in.split("(?!^)")) {
-            out.add(new Tuple2<Integer, String>(in.length(), letter));
+            out.add(new Tuple2<>(in.length(), letter));
           }
           return out;
         }
@@ -859,13 +855,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
    */
   public static <T> void assertOrderInvariantEquals(
       List<List<T>> expected, List<List<T>> actual) {
-    List<Set<T>> expectedSets = new ArrayList<Set<T>>();
+    List<Set<T>> expectedSets = new ArrayList<>();
     for (List<T> list: expected) {
-      expectedSets.add(Collections.unmodifiableSet(new HashSet<T>(list)));
+      expectedSets.add(Collections.unmodifiableSet(new HashSet<>(list)));
     }
-    List<Set<T>> actualSets = new ArrayList<Set<T>>();
+    List<Set<T>> actualSets = new ArrayList<>();
     for (List<T> list: actual) {
-      actualSets.add(Collections.unmodifiableSet(new HashSet<T>(list)));
+      actualSets.add(Collections.unmodifiableSet(new HashSet<>(list)));
     }
     Assert.assertEquals(expectedSets, actualSets);
   }
@@ -877,25 +873,25 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
   public void testPairFilter() {
     List<List<String>> inputData = Arrays.asList(
         Arrays.asList("giants", "dodgers"),
-        Arrays.asList("yankees", "red socks"));
+        Arrays.asList("yankees", "red sox"));
 
     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
-        Arrays.asList(new Tuple2<String, Integer>("giants", 6)),
-        Arrays.asList(new Tuple2<String, Integer>("yankees", 7)));
+        Arrays.asList(new Tuple2<>("giants", 6)),
+        Arrays.asList(new Tuple2<>("yankees", 7)));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = stream.mapToPair(
         new PairFunction<String, String, Integer>() {
           @Override
-          public Tuple2<String, Integer> call(String in) throws Exception {
-            return new Tuple2<String, Integer>(in, in.length());
+          public Tuple2<String, Integer> call(String in) {
+            return new Tuple2<>(in, in.length());
           }
         });
 
     JavaPairDStream<String, Integer> filtered = pairStream.filter(
         new Function<Tuple2<String, Integer>, Boolean>() {
       @Override
-      public Boolean call(Tuple2<String, Integer> in) throws Exception {
+      public Boolean call(Tuple2<String, Integer> in) {
         return in._1().contains("a");
       }
     });
@@ -906,28 +902,28 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
   }
 
   @SuppressWarnings("unchecked")
-  private List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
-      Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
-          new Tuple2<String, String>("california", "giants"),
-          new Tuple2<String, String>("new york", "yankees"),
-          new Tuple2<String, String>("new york", "mets")),
-      Arrays.asList(new Tuple2<String, String>("california", "sharks"),
-          new Tuple2<String, String>("california", "ducks"),
-          new Tuple2<String, String>("new york", "rangers"),
-          new Tuple2<String, String>("new york", "islanders")));
+  private final List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
+      Arrays.asList(new Tuple2<>("california", "dodgers"),
+                    new Tuple2<>("california", "giants"),
+                    new Tuple2<>("new york", "yankees"),
+                    new Tuple2<>("new york", "mets")),
+      Arrays.asList(new Tuple2<>("california", "sharks"),
+                    new Tuple2<>("california", "ducks"),
+                    new Tuple2<>("new york", "rangers"),
+                    new Tuple2<>("new york", "islanders")));
 
   @SuppressWarnings("unchecked")
-  private List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
+  private final List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
       Arrays.asList(
-          new Tuple2<String, Integer>("california", 1),
-          new Tuple2<String, Integer>("california", 3),
-          new Tuple2<String, Integer>("new york", 4),
-          new Tuple2<String, Integer>("new york", 1)),
+          new Tuple2<>("california", 1),
+          new Tuple2<>("california", 3),
+          new Tuple2<>("new york", 4),
+          new Tuple2<>("new york", 1)),
       Arrays.asList(
-          new Tuple2<String, Integer>("california", 5),
-          new Tuple2<String, Integer>("california", 5),
-          new Tuple2<String, Integer>("new york", 3),
-          new Tuple2<String, Integer>("new york", 1)));
+          new Tuple2<>("california", 5),
+          new Tuple2<>("california", 5),
+          new Tuple2<>("new york", 3),
+          new Tuple2<>("new york", 1)));
 
   @SuppressWarnings("unchecked")
   @Test
@@ -936,22 +932,22 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
     List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
         Arrays.asList(
-                new Tuple2<Integer, String>(1, "california"),
-                new Tuple2<Integer, String>(3, "california"),
-                new Tuple2<Integer, String>(4, "new york"),
-                new Tuple2<Integer, String>(1, "new york")),
+            new Tuple2<>(1, "california"),
+            new Tuple2<>(3, "california"),
+            new Tuple2<>(4, "new york"),
+            new Tuple2<>(1, "new york")),
         Arrays.asList(
-                new Tuple2<Integer, String>(5, "california"),
-                new Tuple2<Integer, String>(5, "california"),
-                new Tuple2<Integer, String>(3, "new york"),
-                new Tuple2<Integer, String>(1, "new york")));
+            new Tuple2<>(5, "california"),
+            new Tuple2<>(5, "california"),
+            new Tuple2<>(3, "new york"),
+            new Tuple2<>(1, "new york")));
 
     JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
     JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(
         new PairFunction<Tuple2<String, Integer>, Integer, String>() {
           @Override
-          public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception {
+          public Tuple2<Integer, String> call(Tuple2<String, Integer> in) {
             return in.swap();
           }
         });
@@ -969,23 +965,23 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
     List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
         Arrays.asList(
-            new Tuple2<Integer, String>(1, "california"),
-            new Tuple2<Integer, String>(3, "california"),
-            new Tuple2<Integer, String>(4, "new york"),
-            new Tuple2<Integer, String>(1, "new york")),
+            new Tuple2<>(1, "california"),
+            new Tuple2<>(3, "california"),
+            new Tuple2<>(4, "new york"),
+            new Tuple2<>(1, "new york")),
         Arrays.asList(
-            new Tuple2<Integer, String>(5, "california"),
-            new Tuple2<Integer, String>(5, "california"),
-            new Tuple2<Integer, String>(3, "new york"),
-            new Tuple2<Integer, String>(1, "new york")));
+            new Tuple2<>(5, "california"),
+            new Tuple2<>(5, "california"),
+            new Tuple2<>(3, "new york"),
+            new Tuple2<>(1, "new york")));
 
     JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
     JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(
         new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() {
           @Override
-          public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) throws Exception {
-            LinkedList<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+          public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) {
+            List<Tuple2<Integer, String>> out = new LinkedList<>();
             while (in.hasNext()) {
               Tuple2<String, Integer> next = in.next();
               out.add(next.swap());
@@ -1014,7 +1010,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaDStream<Integer> reversed = pairStream.map(
         new Function<Tuple2<String, Integer>, Integer>() {
           @Override
-          public Integer call(Tuple2<String, Integer> in) throws Exception {
+          public Integer call(Tuple2<String, Integer> in) {
             return in._2();
           }
         });
@@ -1030,23 +1026,23 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
   public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair
     List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
         Arrays.asList(
-            new Tuple2<String, Integer>("hi", 1),
-            new Tuple2<String, Integer>("ho", 2)),
+            new Tuple2<>("hi", 1),
+            new Tuple2<>("ho", 2)),
         Arrays.asList(
-            new Tuple2<String, Integer>("hi", 1),
-            new Tuple2<String, Integer>("ho", 2)));
+            new Tuple2<>("hi", 1),
+            new Tuple2<>("ho", 2)));
 
     List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
         Arrays.asList(
-            new Tuple2<Integer, String>(1, "h"),
-            new Tuple2<Integer, String>(1, "i"),
-            new Tuple2<Integer, String>(2, "h"),
-            new Tuple2<Integer, String>(2, "o")),
+            new Tuple2<>(1, "h"),
+            new Tuple2<>(1, "i"),
+            new Tuple2<>(2, "h"),
+            new Tuple2<>(2, "o")),
         Arrays.asList(
-            new Tuple2<Integer, String>(1, "h"),
-            new Tuple2<Integer, String>(1, "i"),
-            new Tuple2<Integer, String>(2, "h"),
-            new Tuple2<Integer, String>(2, "o")));
+            new Tuple2<>(1, "h"),
+            new Tuple2<>(1, "i"),
+            new Tuple2<>(2, "h"),
+            new Tuple2<>(2, "o")));
 
     JavaDStream<Tuple2<String, Integer>> stream =
         JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
@@ -1054,10 +1050,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(
         new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() {
           @Override
-          public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) throws Exception {
-            List<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+          public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) {
+            List<Tuple2<Integer, String>> out = new LinkedList<>();
             for (Character s : in._1().toCharArray()) {
-              out.add(new Tuple2<Integer, String>(in._2(), s.toString()));
+              out.add(new Tuple2<>(in._2(), s.toString()));
             }
             return out;
           }
@@ -1075,11 +1071,11 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
     List<List<Tuple2<String, List<String>>>> expected = Arrays.asList(
         Arrays.asList(
-            new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")),
-            new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))),
+            new Tuple2<>("california", Arrays.asList("dodgers", "giants")),
+            new Tuple2<>("new york", Arrays.asList("yankees", "mets"))),
         Arrays.asList(
-            new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")),
-            new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders"))));
+            new Tuple2<>("california", Arrays.asList("sharks", "ducks")),
+            new Tuple2<>("new york", Arrays.asList("rangers", "islanders"))));
 
     JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
@@ -1111,11 +1107,11 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
         Arrays.asList(
-            new Tuple2<String, Integer>("california", 4),
-            new Tuple2<String, Integer>("new york", 5)),
+            new Tuple2<>("california", 4),
+            new Tuple2<>("new york", 5)),
         Arrays.asList(
-            new Tuple2<String, Integer>("california", 10),
-            new Tuple2<String, Integer>("new york", 4)));
+            new Tuple2<>("california", 10),
+            new Tuple2<>("new york", 4)));
 
     JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
         ssc, inputData, 1);
@@ -1136,20 +1132,20 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
         Arrays.asList(
-            new Tuple2<String, Integer>("california", 4),
-            new Tuple2<String, Integer>("new york", 5)),
+            new Tuple2<>("california", 4),
+            new Tuple2<>("new york", 5)),
         Arrays.asList(
-            new Tuple2<String, Integer>("california", 10),
-            new Tuple2<String, Integer>("new york", 4)));
+            new Tuple2<>("california", 10),
+            new Tuple2<>("new york", 4)));
 
     JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
         ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
-    JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(
+    JavaPairDStream<String, Integer> combined = pairStream.combineByKey(
         new Function<Integer, Integer>() {
           @Override
-          public Integer call(Integer i) throws Exception {
+          public Integer call(Integer i) {
             return i;
           }
         }, new IntegerSum(), new IntegerSum(), new HashPartitioner(2));
@@ -1170,13 +1166,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
     List<List<Tuple2<String, Long>>> expected = Arrays.asList(
         Arrays.asList(
-            new Tuple2<String, Long>("hello", 1L),
-            new Tuple2<String, Long>("world", 1L)),
+            new Tuple2<>("hello", 1L),
+            new Tuple2<>("world", 1L)),
         Arrays.asList(
-            new Tuple2<String, Long>("hello", 1L),
-            new Tuple2<String, Long>("moon", 1L)),
+            new Tuple2<>("hello", 1L),
+            new Tuple2<>("moon", 1L)),
         Arrays.asList(
-            new Tuple2<String, Long>("hello", 1L)));
+            new Tuple2<>("hello", 1L)));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Long> counted = stream.countByValue();
@@ -1193,16 +1189,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
     List<List<Tuple2<String, List<Integer>>>> expected = Arrays.asList(
       Arrays.asList(
-        new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3)),
-        new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 4))
+        new Tuple2<>("california", Arrays.asList(1, 3)),
+        new Tuple2<>("new york", Arrays.asList(1, 4))
       ),
       Arrays.asList(
-        new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3, 5, 5)),
-        new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 1, 3, 4))
+        new Tuple2<>("california", Arrays.asList(1, 3, 5, 5)),
+        new Tuple2<>("new york", Arrays.asList(1, 1, 3, 4))
       ),
       Arrays.asList(
-        new Tuple2<String, List<Integer>>("california", Arrays.asList(5, 5)),
-        new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 3))
+        new Tuple2<>("california", Arrays.asList(5, 5)),
+        new Tuple2<>("new york", Arrays.asList(1, 3))
       )
     );
 
@@ -1220,16 +1216,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     }
   }
 
-  private HashSet<Tuple2<String, HashSet<Integer>>> convert(List<Tuple2<String, List<Integer>>> listOfTuples) {
-    List<Tuple2<String, HashSet<Integer>>> newListOfTuples = new ArrayList<Tuple2<String, HashSet<Integer>>>();
+  private static Set<Tuple2<String, HashSet<Integer>>> convert(List<Tuple2<String, List<Integer>>> listOfTuples) {
+    List<Tuple2<String, HashSet<Integer>>> newListOfTuples = new ArrayList<>();
     for (Tuple2<String, List<Integer>> tuple: listOfTuples) {
       newListOfTuples.add(convert(tuple));
     }
-    return new HashSet<Tuple2<String, HashSet<Integer>>>(newListOfTuples);
+    return new HashSet<>(newListOfTuples);
   }
 
-  private Tuple2<String, HashSet<Integer>> convert(Tuple2<String, List<Integer>> tuple) {
-    return new Tuple2<String, HashSet<Integer>>(tuple._1(), new HashSet<Integer>(tuple._2()));
+  private static Tuple2<String, HashSet<Integer>> convert(Tuple2<String, List<Integer>> tuple) {
+    return new Tuple2<>(tuple._1(), new HashSet<>(tuple._2()));
   }
 
   @SuppressWarnings("unchecked")
@@ -1238,12 +1234,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
 
     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
-        Arrays.asList(new Tuple2<String, Integer>("california", 4),
-            new Tuple2<String, Integer>("new york", 5)),
-        Arrays.asList(new Tuple2<String, Integer>("california", 14),
-            new Tuple2<String, Integer>("new york", 9)),
-        Arrays.asList(new Tuple2<String, Integer>("california", 10),
-            new Tuple2<String, Integer>("new york", 4)));
+        Arrays.asList(new Tuple2<>("california", 4),
+                      new Tuple2<>("new york", 5)),
+        Arrays.asList(new Tuple2<>("california", 14),
+                      new Tuple2<>("new york", 9)),
+        Arrays.asList(new Tuple2<>("california", 10),
+                      new Tuple2<>("new york", 4)));
 
     JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
@@ -1262,12 +1258,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
 
     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
-        Arrays.asList(new Tuple2<String, Integer>("california", 4),
-            new Tuple2<String, Integer>("new york", 5)),
-        Arrays.asList(new Tuple2<String, Integer>("california", 14),
-            new Tuple2<String, Integer>("new york", 9)),
-        Arrays.asList(new Tuple2<String, Integer>("california", 14),
-            new Tuple2<String, Integer>("new york", 9)));
+        Arrays.asList(new Tuple2<>("california", 4),
+                      new Tuple2<>("new york", 5)),
+        Arrays.asList(new Tuple2<>("california", 14),
+                      new Tuple2<>("new york", 9)),
+        Arrays.asList(new Tuple2<>("california", 14),
+                      new Tuple2<>("new york", 9)));
 
     JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
@@ -1278,10 +1274,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
           public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
             int out = 0;
             if (state.isPresent()) {
-              out = out + state.get();
+              out += state.get();
             }
             for (Integer v : values) {
-              out = out + v;
+              out += v;
             }
             return Optional.of(out);
           }
@@ -1298,19 +1294,19 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
 
     List<Tuple2<String, Integer>> initial = Arrays.asList (
-            new Tuple2<String, Integer> ("california", 1),
-            new Tuple2<String, Integer> ("new york", 2));
+        new Tuple2<>("california", 1),
+            new Tuple2<>("new york", 2));
 
     JavaRDD<Tuple2<String, Integer>> tmpRDD = ssc.sparkContext().parallelize(initial);
     JavaPairRDD<String, Integer> initialRDD = JavaPairRDD.fromJavaRDD (tmpRDD);
 
     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
-        Arrays.asList(new Tuple2<String, Integer>("california", 5),
-            new Tuple2<String, Integer>("new york", 7)),
-        Arrays.asList(new Tuple2<String, Integer>("california", 15),
-            new Tuple2<String, Integer>("new york", 11)),
-        Arrays.asList(new Tuple2<String, Integer>("california", 15),
-            new Tuple2<String, Integer>("new york", 11)));
+        Arrays.asList(new Tuple2<>("california", 5),
+                      new Tuple2<>("new york", 7)),
+        Arrays.asList(new Tuple2<>("california", 15),
+                      new Tuple2<>("new york", 11)),
+        Arrays.asList(new Tuple2<>("california", 15),
+                      new Tuple2<>("new york", 11)));
 
     JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
@@ -1321,10 +1317,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
           public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
             int out = 0;
             if (state.isPresent()) {
-              out = out + state.get();
+              out += state.get();
             }
             for (Integer v : values) {
-              out = out + v;
+              out += v;
             }
             return Optional.of(out);
           }
@@ -1341,19 +1337,19 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
 
     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
-        Arrays.asList(new Tuple2<String, Integer>("california", 4),
-            new Tuple2<String, Integer>("new york", 5)),
-        Arrays.asList(new Tuple2<String, Integer>("california", 14),
-            new Tuple2<String, Integer>("new york", 9)),
-        Arrays.asList(new Tuple2<String, Integer>("california", 10),
-            new Tuple2<String, Integer>("new york", 4)));
+        Arrays.asList(new Tuple2<>("california", 4),
+                      new Tuple2<>("new york", 5)),
+        Arrays.asList(new Tuple2<>("california", 14),
+                      new Tuple2<>("new york", 9)),
+        Arrays.asList(new Tuple2<>("california", 10),
+                      new Tuple2<>("new york", 4)));
 
     JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
     JavaPairDStream<String, Integer> reduceWindowed =
         pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(),
-            new Duration(2000), new Duration(1000));
+                                        new Duration(2000), new Duration(1000));
     JavaTestUtils.attachTestOutputStream(reduceWindowed);
     List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
 
@@ -1370,15 +1366,15 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
     List<HashSet<Tuple2<String, Long>>> expected = Arrays.asList(
         Sets.newHashSet(
-            new Tuple2<String, Long>("hello", 1L),
-            new Tuple2<String, Long>("world", 1L)),
+            new Tuple2<>("hello", 1L),
+            new Tuple2<>("world", 1L)),
         Sets.newHashSet(
-            new Tuple2<String, Long>("hello", 2L),
-            new Tuple2<String, Long>("world", 1L),
-            new Tuple2<String, Long>("moon", 1L)),
+            new Tuple2<>("hello", 2L),
+            new Tuple2<>("world", 1L),
+            new Tuple2<>("moon", 1L)),
         Sets.newHashSet(
-            new Tuple2<String, Long>("hello", 2L),
-            new Tuple2<String, Long>("moon", 1L)));
+            new Tuple2<>("hello", 2L),
+            new Tuple2<>("moon", 1L)));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(
         ssc, inputData, 1);
@@ -1386,7 +1382,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
       stream.countByValueAndWindow(new Duration(2000), new Duration(1000));
     JavaTestUtils.attachTestOutputStream(counted);
     List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-    List<HashSet<Tuple2<String, Long>>> unorderedResult = Lists.newArrayList();
+    List<Set<Tuple2<String, Long>>> unorderedResult = new ArrayList<>();
     for (List<Tuple2<String, Long>> res: result) {
       unorderedResult.add(Sets.newHashSet(res));
     }
@@ -1399,27 +1395,27 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
   public void testPairTransform() {
     List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
         Arrays.asList(
-            new Tuple2<Integer, Integer>(3, 5),
-            new Tuple2<Integer, Integer>(1, 5),
-            new Tuple2<Integer, Integer>(4, 5),
-            new Tuple2<Integer, Integer>(2, 5)),
+            new Tuple2<>(3, 5),
+            new Tuple2<>(1, 5),
+            new Tuple2<>(4, 5),
+            new Tuple2<>(2, 5)),
         Arrays.asList(
-            new Tuple2<Integer, Integer>(2, 5),
-            new Tuple2<Integer, Integer>(3, 5),
-            new Tuple2<Integer, Integer>(4, 5),
-            new Tuple2<Integer, Integer>(1, 5)));
+            new Tuple2<>(2, 5),
+            new Tuple2<>(3, 5),
+            new Tuple2<>(4, 5),
+            new Tuple2<>(1, 5)));
 
     List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList(
         Arrays.asList(
-            new Tuple2<Integer, Integer>(1, 5),
-            new Tuple2<Integer, Integer>(2, 5),
-            new Tuple2<Integer, Integer>(3, 5),
-            new Tuple2<Integer, Integer>(4, 5)),
+            new Tuple2<>(1, 5),
+            new Tuple2<>(2, 5),
+            new Tuple2<>(3, 5),
+            new Tuple2<>(4, 5)),
         Arrays.asList(
-            new Tuple2<Integer, Integer>(1, 5),
-            new Tuple2<Integer, Integer>(2, 5),
-            new Tuple2<Integer, Integer>(3, 5),
-            new Tuple2<Integer, Integer>(4, 5)));
+            new Tuple2<>(1, 5),
+            new Tuple2<>(2, 5),
+            new Tuple2<>(3, 5),
+            new Tuple2<>(4, 5)));
 
     JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
         ssc, inputData, 1);
@@ -1428,7 +1424,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(
         new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, Integer>>() {
           @Override
-          public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception {
+          public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) {
             return in.sortByKey();
           }
         });
@@ -1444,15 +1440,15 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
   public void testPairToNormalRDDTransform() {
     List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
         Arrays.asList(
-            new Tuple2<Integer, Integer>(3, 5),
-            new Tuple2<Integer, Integer>(1, 5),
-            new Tuple2<Integer, Integer>(4, 5),
-            new Tuple2<Integer, Integer>(2, 5)),
+            new Tuple2<>(3, 5),
+            new Tuple2<>(1, 5),
+            new Tuple2<>(4, 5),
+            new Tuple2<>(2, 5)),
         Arrays.asList(
-            new Tuple2<Integer, Integer>(2, 5),
-            new Tuple2<Integer, Integer>(3, 5),
-            new Tuple2<Integer, Integer>(4, 5),
-            new Tuple2<Integer, Integer>(1, 5)));
+            new Tuple2<>(2, 5),
+            new Tuple2<>(3, 5),
+            new Tuple2<>(4, 5),
+            new Tuple2<>(1, 5)));
 
     List<List<Integer>> expected = Arrays.asList(
         Arrays.asList(3,1,4,2),
@@ -1465,11 +1461,11 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaDStream<Integer> firstParts = pairStream.transform(
         new Function<JavaPairRDD<Integer, Integer>, JavaRDD<Integer>>() {
           @Override
-          public JavaRDD<Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception {
+          public JavaRDD<Integer> call(JavaPairRDD<Integer, Integer> in) {
             return in.map(new Function<Tuple2<Integer, Integer>, Integer>() {
               @Override
-              public Integer call(Tuple2<Integer, Integer> in) {
-                return in._1();
+              public Integer call(Tuple2<Integer, Integer> in2) {
+                return in2._1();
               }
             });
           }
@@ -1487,14 +1483,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
 
     List<List<Tuple2<String, String>>> expected = Arrays.asList(
-        Arrays.asList(new Tuple2<String, String>("california", "DODGERS"),
-            new Tuple2<String, String>("california", "GIANTS"),
-            new Tuple2<String, String>("new york", "YANKEES"),
-            new Tuple2<String, String>("new york", "METS")),
-        Arrays.asList(new Tuple2<String, String>("california", "SHARKS"),
-            new Tuple2<String, String>("california", "DUCKS"),
-            new Tuple2<String, String>("new york", "RANGERS"),
-            new Tuple2<String, String>("new york", "ISLANDERS")));
+        Arrays.asList(new Tuple2<>("california", "DODGERS"),
+                      new Tuple2<>("california", "GIANTS"),
+                      new Tuple2<>("new york", "YANKEES"),
+                      new Tuple2<>("new york", "METS")),
+        Arrays.asList(new Tuple2<>("california", "SHARKS"),
+                      new Tuple2<>("california", "DUCKS"),
+                      new Tuple2<>("new york", "RANGERS"),
+                      new Tuple2<>("new york", "ISLANDERS")));
 
     JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
         ssc, inputData, 1);
@@ -1502,8 +1498,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
     JavaPairDStream<String, String> mapped = pairStream.mapValues(new Function<String, String>() {
       @Override
-      public String call(String s) throws Exception {
-        return s.toUpperCase();
+      public String call(String s) {
+        return s.toUpperCase(Locale.ENGLISH);
       }
     });
 
@@ -1519,22 +1515,22 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
 
     List<List<Tuple2<String, String>>> expected = Arrays.asList(
-        Arrays.asList(new Tuple2<String, String>("california", "dodgers1"),
-            new Tuple2<String, String>("california", "dodgers2"),
-            new Tuple2<String, String>("california", "giants1"),
-            new Tuple2<String, String>("california", "giants2"),
-            new Tuple2<String, String>("new york", "yankees1"),
-            new Tuple2<String, String>("new york", "yankees2"),
-            new Tuple2<String, String>("new york", "mets1"),
-            new Tuple2<String, String>("new york", "mets2")),
-        Arrays.asList(new Tuple2<String, String>("california", "sharks1"),
-            new Tuple2<String, String>("california", "sharks2"),
-            new Tuple2<String, String>("california", "ducks1"),
-            new Tuple2<String, String>("california", "ducks2"),
-            new Tuple2<String, String>("new york", "rangers1"),
-            new Tuple2<String, String>("new york", "rangers2"),
-            new Tuple2<String, String>("new york", "islanders1"),
-            new Tuple2<String, String>("new york", "islanders2")));
+        Arrays.asList(new Tuple2<>("california", "dodgers1"),
+                      new Tuple2<>("california", "dodgers2"),
+                      new Tuple2<>("california", "giants1"),
+                      new Tuple2<>("california", "giants2"),
+                      new Tuple2<>("new york", "yankees1"),
+                      new Tuple2<>("new york", "yankees2"),
+                      new Tuple2<>("new york", "mets1"),
+                      new Tuple2<>("new york", "mets2")),
+        Arrays.asList(new Tuple2<>("california", "sharks1"),
+                      new Tuple2<>("california", "sharks2"),
+                      new Tuple2<>("california", "ducks1"),
+                      new Tuple2<>("california", "ducks2"),
+                      new Tuple2<>("new york", "rangers1"),
+                      new Tuple2<>("new york", "rangers2"),
+                      new Tuple2<>("new york", "islanders1"),
+                      new Tuple2<>("new york", "islanders2")));
 
     JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
         ssc, inputData, 1);
@@ -1545,7 +1541,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         new Function<String, Iterable<String>>() {
           @Override
           public Iterable<String> call(String in) {
-            List<String> out = new ArrayList<String>();
+            List<String> out = new ArrayList<>();
             out.add(in + "1");
             out.add(in + "2");
             return out;
@@ -1562,29 +1558,29 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
   @Test
   public void testCoGroup() {
     List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
-        Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
-            new Tuple2<String, String>("new york", "yankees")),
-        Arrays.asList(new Tuple2<String, String>("california", "sharks"),
-            new Tuple2<String, String>("new york", "rangers")));
+        Arrays.asList(new Tuple2<>("california", "dodgers"),
+                      new Tuple2<>("new york", "yankees")),
+        Arrays.asList(new Tuple2<>("california", "sharks"),
+                      new Tuple2<>("new york", "rangers")));
 
     List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
-        Arrays.asList(new Tuple2<String, String>("california", "giants"),
-            new Tuple2<String, String>("new york", "mets")),
-        Arrays.asList(new Tuple2<String, String>("california", "ducks"),
-            new Tuple2<String, String>("new york", "islanders")));
+        Arrays.asList(new Tuple2<>("california", "giants"),
+                      new Tuple2<>("new york", "mets")),
+        Arrays.asList(new Tuple2<>("california", "ducks"),
+                      new Tuple2<>("new york", "islanders")));
 
 
     List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> expected = Arrays.asList(
         Arrays.asList(
-            new Tuple2<String, Tuple2<List<String>, List<String>>>("california",
-                new Tuple2<List<String>, List<String>>(Arrays.asList("dodgers"), Arrays.asList("giants"))),
-            new Tuple2<String, Tuple2<List<String>, List<String>>>("new york",
-                new Tuple2<List<String>, List<String>>(Arrays.asList("yankees"), Arrays.asList("mets")))),
+            new Tuple2<>("california",
+                         new Tuple2<>(Arrays.asList("dodgers"), Arrays.asList("giants"))),
+            new Tuple2<>("new york",
+                         new Tuple2<>(Arrays.asList("yankees"), Arrays.asList("mets")))),
         Arrays.asList(
-            new Tuple2<String, Tuple2<List<String>, List<String>>>("california",
-                new Tuple2<List<String>, List<String>>(Arrays.asList("sharks"), Arrays.asList("ducks"))),
-            new Tuple2<String, Tuple2<List<String>, List<String>>>("new york",
-                new Tuple2<List<String>, List<String>>(Arrays.asList("rangers"), Arrays.asList("islanders")))));
+            new Tuple2<>("california",
+                         new Tuple2<>(Arrays.asList("sharks"), Arrays.asList("ducks"))),
+            new Tuple2<>("new york",
+                         new Tuple2<>(Arrays.asList("rangers"), Arrays.asList("islanders")))));
 
 
     JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
@@ -1620,29 +1616,29 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
   @Test
   public void testJoin() {
     List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
-        Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
-            new Tuple2<String, String>("new york", "yankees")),
-        Arrays.asList(new Tuple2<String, String>("california", "sharks"),
-            new Tuple2<String, String>("new york", "rangers")));
+        Arrays.asList(new Tuple2<>("california", "dodgers"),
+                      new Tuple2<>("new york", "yankees")),
+        Arrays.asList(new Tuple2<>("california", "sharks"),
+                      new Tuple2<>("new york", "rangers")));
 
     List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
-        Arrays.asList(new Tuple2<String, String>("california", "giants"),
-            new Tuple2<String, String>("new york", "mets")),
-        Arrays.asList(new Tuple2<String, String>("california", "ducks"),
-            new Tuple2<String, String>("new york", "islanders")));
+        Arrays.asList(new Tuple2<>("california", "giants"),
+                      new Tuple2<>("new york", "mets")),
+        Arrays.asList(new Tuple2<>("california", "ducks"),
+                      new Tuple2<>("new york", "islanders")));
 
 
     List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
         Arrays.asList(
-            new Tuple2<String, Tuple2<String, String>>("california",
-                new Tuple2<String, String>("dodgers", "giants")),
-            new Tuple2<String, Tuple2<String, String>>("new york",
-                new Tuple2<String, String>("yankees", "mets"))),
+            new Tuple2<>("california",
+                         new Tuple2<>("dodgers", "giants")),
+            new Tuple2<>("new york",
+                         new Tuple2<>("yankees", "mets"))),
         Arrays.asList(
-            new Tuple2<String, Tuple2<String, String>>("california",
-                new Tuple2<String, String>("sharks", "ducks")),
-            new Tuple2<String, Tuple2<String, String>>("new york",
-                new Tuple2<String, String>("rangers", "islanders"))));
+            new Tuple2<>("california",
+                         new Tuple2<>("sharks", "ducks")),
+            new Tuple2<>("new york",
+                         new Tuple2<>("rangers", "islanders"))));
 
 
     JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
@@ -1664,13 +1660,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
   @Test
   public void testLeftOuterJoin() {
     List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
-        Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
-            new Tuple2<String, String>("new york", "yankees")),
-        Arrays.asList(new Tuple2<String, String>("california", "sharks") ));
+        Arrays.asList(new Tuple2<>("california", "dodgers"),
+                      new Tuple2<>("new york", "yankees")),
+        Arrays.asList(new Tuple2<>("california", "sharks") ));
 
     List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
-        Arrays.asList(new Tuple2<String, String>("california", "giants") ),
-        Arrays.asList(new Tuple2<String, String>("new york", "islanders") )
+        Arrays.asList(new Tuple2<>("california", "giants") ),
+        Arrays.asList(new Tuple2<>("new york", "islanders") )
 
     );
 
@@ -1713,7 +1709,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaDStream<String> stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() {
       @Override
-      public Integer call(String s) throws Exception {
+      public Integer call(String s) {
         return s.length();
       }
     });
@@ -1752,6 +1748,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     // (used to detect the new context)
     final AtomicBoolean newContextCreated = new AtomicBoolean(false);
     Function0<JavaStreamingContext> creatingFunc = new Function0<JavaStreamingContext>() {
+      @Override
       public JavaStreamingContext call() {
         newContextCreated.set(true);
         return new JavaStreamingContext(conf, Seconds.apply(1));
@@ -1765,20 +1762,20 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
     newContextCreated.set(false);
     ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc,
-        new org.apache.hadoop.conf.Configuration(), true);
+        new Configuration(), true);
     Assert.assertTrue("new context not created", newContextCreated.get());
     ssc.stop();
 
     newContextCreated.set(false);
     ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc,
-        new org.apache.hadoop.conf.Configuration());
+        new Configuration());
     Assert.assertTrue("old context not recovered", !newContextCreated.get());
     ssc.stop();
 
     newContextCreated.set(false);
     JavaSparkContext sc = new JavaSparkContext(conf);
     ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc,
-        new org.apache.hadoop.conf.Configuration());
+        new Configuration());
     Assert.assertTrue("old context not recovered", !newContextCreated.get());
     ssc.stop();
   }
@@ -1800,7 +1797,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaDStream letterCount = stream.map(new Function<String, Integer>() {
       @Override
-      public Integer call(String s) throws Exception {
+      public Integer call(String s) {
         return s.length();
       }
     });
@@ -1818,29 +1815,26 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
   // InputStream functionality is deferred to the existing Scala tests.
   @Test
   public void testSocketTextStream() {
-      JavaReceiverInputDStream<String> test = ssc.socketTextStream("localhost", 12345);
+    ssc.socketTextStream("localhost", 12345);
   }
 
   @Test
   public void testSocketString() {
-
-    class Converter implements Function<InputStream, Iterable<String>> {
-      public Iterable<String> call(InputStream in) throws IOException {
-        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
-        List<String> out = new ArrayList<String>();
-        while (true) {
-          String line = reader.readLine();
-          if (line == null) { break; }
-          out.add(line);
-        }
-        return out;
-      }
-    }
-
-    JavaDStream<String> test = ssc.socketStream(
+    ssc.socketStream(
       "localhost",
       12345,
-      new Converter(),
+      new Function<InputStream, Iterable<String>>() {
+        @Override
+        public Iterable<String> call(InputStream in) throws IOException {
+          List<String> out = new ArrayList<>();
+          try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
+            for (String line; (line = reader.readLine()) != null;) {
+              out.add(line);
+            }
+          }
+          return out;
+        }
+      },
       StorageLevel.MEMORY_ONLY());
   }
 
@@ -1870,7 +1864,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
       TextInputFormat.class,
       new Function<Path, Boolean>() {
         @Override
-        public Boolean call(Path v1) throws Exception {
+        public Boolean call(Path v1) {
           return Boolean.TRUE;
         }
       },
@@ -1879,7 +1873,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaDStream<String> test = inputStream.map(
       new Function<Tuple2<LongWritable, Text>, String>() {
         @Override
-        public String call(Tuple2<LongWritable, Text> v1) throws Exception {
+        public String call(Tuple2<LongWritable, Text> v1) {
           return v1._2().toString();
         }
     });
@@ -1892,19 +1886,15 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
   @Test
   public void testRawSocketStream() {
-    JavaReceiverInputDStream<String> test = ssc.rawSocketStream("localhost", 12345);
+    ssc.rawSocketStream("localhost", 12345);
   }
 
-  private List<List<String>> fileTestPrepare(File testDir) throws IOException {
+  private static List<List<String>> fileTestPrepare(File testDir) throws IOException {
     File existingFile = new File(testDir, "0");
     Files.write("0\n", existingFile, Charset.forName("UTF-8"));
-    assertTrue(existingFile.setLastModified(1000) && existingFile.lastModified() == 1000);
-
-    List<List<String>> expected = Arrays.asList(
-      Arrays.asList("0")
-    );
-
-    return expected;
+    Assert.assertTrue(existingFile.setLastModified(1000));
+    Assert.assertEquals(1000, existingFile.lastModified());
+    return Arrays.asList(Arrays.asList("0"));
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/spark/blob/22730ad5/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
index 1b0787f..ec2bffd 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
@@ -36,7 +36,6 @@ import java.io.InputStreamReader;
 import java.io.Serializable;
 import java.net.ConnectException;
 import java.net.Socket;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class JavaReceiverAPISuite implements Serializable {
@@ -64,16 +63,16 @@ public class JavaReceiverAPISuite implements Serializable {
         ssc.receiverStream(new JavaSocketReceiver("localhost", server.port()));
       JavaDStream<String> mapped = input.map(new Function<String, String>() {
         @Override
-        public String call(String v1) throws Exception {
+        public String call(String v1) {
           return v1 + ".";
         }
       });
       mapped.foreachRDD(new Function<JavaRDD<String>, Void>() {
         @Override
-        public Void call(JavaRDD<String> rdd) throws Exception {
-        long count = rdd.count();
-        dataCounter.addAndGet(count);
-        return null;
+        public Void call(JavaRDD<String> rdd) {
+          long count = rdd.count();
+          dataCounter.addAndGet(count);
+          return null;
         }
       });
 
@@ -83,7 +82,7 @@ public class JavaReceiverAPISuite implements Serializable {
 
       Thread.sleep(200);
       for (int i = 0; i < 6; i++) {
-        server.send("" + i + "\n"); // \n to make sure these are separate lines
+        server.send(i + "\n"); // \n to make sure these are separate lines
         Thread.sleep(100);
       }
       while (dataCounter.get() == 0 && System.currentTimeMillis() - startTime < timeout) {
@@ -95,50 +94,49 @@ public class JavaReceiverAPISuite implements Serializable {
       server.stop();
     }
   }
-}
 
-class JavaSocketReceiver extends Receiver<String> {
+  private static class JavaSocketReceiver extends Receiver<String> {
 
-  String host = null;
-  int port = -1;
+    String host = null;
+    int port = -1;
 
-  public JavaSocketReceiver(String host_ , int port_) {
-    super(StorageLevel.MEMORY_AND_DISK());
-    host = host_;
-    port = port_;
-  }
+    JavaSocketReceiver(String host_ , int port_) {
+      super(StorageLevel.MEMORY_AND_DISK());
+      host = host_;
+      port = port_;
+    }
 
-  @Override
-  public void onStart() {
-    new Thread()  {
-      @Override public void run() {
-        receive();
-      }
-    }.start();
-  }
+    @Override
+    public void onStart() {
+      new Thread()  {
+        @Override public void run() {
+          receive();
+        }
+      }.start();
+    }
 
-  @Override
-  public void onStop() {
-  }
+    @Override
+    public void onStop() {
+    }
 
-  private void receive() {
-    Socket socket = null;
-    try {
-      socket = new Socket(host, port);
-      BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
-      String userInput;
-      while ((userInput = in.readLine()) != null) {
-        store(userInput);
+    private void receive() {
+      try {
+        Socket socket = new Socket(host, port);
+        BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+        String userInput;
+        while ((userInput = in.readLine()) != null) {
+          store(userInput);
+        }
+        in.close();
+        socket.close();
+      } catch(ConnectException ce) {
+        ce.printStackTrace();
+        restart("Could not connect", ce);
+      } catch(Throwable t) {
+        t.printStackTrace();
+        restart("Error receiving data", t);
       }
-      in.close();
-      socket.close();
-    } catch(ConnectException ce) {
-      ce.printStackTrace();
-      restart("Could not connect", ce);
-    } catch(Throwable t) {
-      t.printStackTrace();
-      restart("Error receiving data", t);
     }
   }
-}
 
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message