beam-commits mailing list archives

From amits...@apache.org
Subject [43/50] [abbrv] incubator-beam git commit: Try to clean up some build warnings related to generics, and further untangle some generics issues. Update plugins. Fix some minor code issues from inspection.
Date Thu, 10 Mar 2016 20:59:08 GMT
Try to clean up some build warnings related to generics, and further untangle some generics issues. Update plugins. Fix some minor code issues from inspection.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/89a21ca9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/89a21ca9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/89a21ca9

Branch: refs/heads/master
Commit: 89a21ca9ed04f625e4ccbd2f2142f395f7f9979a
Parents: a9168bf
Author: Sean Owen <sowen@cloudera.com>
Authored: Thu Jan 21 15:25:49 2016 +0100
Committer: Tom White <tom@cloudera.com>
Committed: Thu Mar 10 11:15:17 2016 +0000

----------------------------------------------------------------------
 runners/spark/pom.xml                           | 12 ++--
 .../com/cloudera/dataflow/hadoop/HadoopIO.java  |  2 +-
 .../cloudera/dataflow/hadoop/WritableCoder.java |  3 +-
 .../dataflow/spark/BroadcastHelper.java         |  8 +--
 .../cloudera/dataflow/spark/CoderHelpers.java   |  2 +-
 .../cloudera/dataflow/spark/DoFnFunction.java   |  1 +
 .../dataflow/spark/EvaluationContext.java       | 11 ++-
 .../dataflow/spark/EvaluationResult.java        |  4 +-
 .../dataflow/spark/MultiDoFnFunction.java       |  1 +
 .../dataflow/spark/SparkContextFactory.java     | 10 +--
 .../dataflow/spark/SparkPipelineRunner.java     | 28 ++++----
 .../dataflow/spark/SparkPipelineTranslator.java |  2 +-
 .../dataflow/spark/SparkProcessContext.java     |  4 +-
 .../dataflow/spark/TransformTranslator.java     | 20 +++---
 .../streaming/StreamingEvaluationContext.java   | 49 +++++++-------
 .../streaming/StreamingTransformTranslator.java | 71 +++++++++-----------
 .../StreamingWindowPipelineDetector.java        |  3 +-
 .../dataflow/spark/CombineGloballyTest.java     |  6 +-
 .../spark/HadoopFileFormatPipelineTest.java     | 20 ++----
 .../dataflow/spark/SerializationTest.java       |  5 +-
 .../dataflow/spark/SimpleWordCountTest.java     |  5 +-
 .../dataflow/spark/TransformTranslatorTest.java |  2 +-
 .../spark/streaming/FlattenStreamingTest.java   |  4 +-
 .../spark/streaming/KafkaStreamingTest.java     | 14 ++--
 .../streaming/SimpleStreamingWordCountTest.java |  2 +-
 .../streaming/utils/EmbeddedKafkaCluster.java   | 55 ++++++++-------
 26 files changed, 162 insertions(+), 182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89a21ca9/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 115ec71..399e9e7 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -32,7 +32,7 @@ License.
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-compiler-plugin</artifactId>
-                    <version>3.3</version>
+                    <version>3.5</version>
                     <configuration>
                         <source>${java.version}</source>
                         <target>${java.version}</target>
@@ -44,7 +44,7 @@ License.
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-surefire-plugin</artifactId>
-                    <version>2.18.1</version>
+                    <version>2.19.1</version>
                     <configuration>
                         <forkCount>1</forkCount>
                         <reuseForks>false</reuseForks>
@@ -53,7 +53,7 @@ License.
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-checkstyle-plugin</artifactId>
-                    <version>2.15</version>
+                    <version>2.17</version>
                     <configuration>
                         <headerLocation>build-resources/header-file.txt</headerLocation>
                         <configLocation>build-resources/checkstyle.xml</configLocation>
@@ -100,7 +100,7 @@ License.
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-clean-plugin</artifactId>
-                    <version>2.6.1</version>
+                    <version>3.0.0</version>
                 </plugin>
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
@@ -133,7 +133,7 @@ License.
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-release-plugin</artifactId>
-                    <version>2.5.2</version>
+                    <version>2.5.3</version>
                     <dependencies>
                         <dependency>
                             <groupId>org.apache.maven.scm</groupId>
@@ -224,7 +224,7 @@ License.
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-shade-plugin</artifactId>
-                    <version>2.4.1</version>
+                    <version>2.4.3</version>
                     <executions>
                         <execution>
                             <phase>package</phase>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89a21ca9/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java
index 6389db3..c79f211 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java
@@ -99,7 +99,7 @@ public final class HadoopIO {
     private Write() {
     }
 
-    public static <K, V> Bound to(String filenamePrefix,
+    public static <K, V> Bound<K, V> to(String filenamePrefix,
         Class<? extends FileOutputFormat<K, V>> format, Class<K> key, Class<V> value) {
       return new Bound<>(filenamePrefix, format, key, value);
     }

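The HadoopIO change above replaces a raw Bound return type with Bound<K, V>, so the type arguments flow through to call sites. A minimal sketch of the before/after shapes, with hypothetical Box/rawTo/to names standing in for the real classes:

    public final class RawReturnDemo {

      static final class Box<K, V> {
        private final K key;
        private final V value;
        Box(K key, V value) { this.key = key; this.value = value; }
        K key() { return key; }
        V value() { return value; }
      }

      // Before: raw return type. Every assignment to a parameterized Box
      // is an unchecked conversion -- one of the build warnings this
      // commit is cleaning up.
      @SuppressWarnings("rawtypes")
      static Box rawTo(String k, Integer v) { return new Box<>(k, v); }

      // After: the method's type parameters carry through, so call sites
      // compile without warnings.
      static <K, V> Box<K, V> to(K k, V v) { return new Box<>(k, v); }

      public static void main(String[] args) {
        Box<String, Integer> before = rawTo("x", 1); // unchecked-conversion warning
        Box<String, Integer> after = to("x", 1);     // clean
        System.out.println(before.key() + "=" + after.value());
      }
    }
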
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89a21ca9/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java
index 759fb58..324b203 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java
@@ -33,8 +33,7 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 
 /**
- * A {@code WritableCoder} is a {@link com.google.cloud.dataflow.sdk.coders.Coder} for a
- * Java class that implements {@link org.apache.hadoop.io.Writable}.
+ * A {@code WritableCoder} is a {@link Coder} for a Java class that implements {@link Writable}.
  *
  * <p> To use, specify the coder type on a PCollection:
  * <pre>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89a21ca9/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java
index 6ef70f3..8dca939 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java
@@ -28,10 +28,10 @@ import org.slf4j.LoggerFactory;
 abstract class BroadcastHelper<T> implements Serializable {
 
   /**
-   * If the property <code>dataflow.spark.directBroadcast</code> is set to
-   * <code>true</code> then Spark serialization (Kryo) will be used to broadcast values
+   * If the property {@code dataflow.spark.directBroadcast} is set to
+   * {@code true} then Spark serialization (Kryo) will be used to broadcast values
    * in View objects. By default this property is not set, and values are coded using
-   * the appropriate {@link com.google.cloud.dataflow.sdk.coders.Coder}.
+   * the appropriate {@link Coder}.
    */
   public static final String DIRECT_BROADCAST = "dataflow.spark.directBroadcast";
 
@@ -78,7 +78,7 @@ abstract class BroadcastHelper<T> implements Serializable {
 
   /**
    * A {@link com.cloudera.dataflow.spark.BroadcastHelper} that uses a
-   * {@link com.google.cloud.dataflow.sdk.coders.Coder} to encode values as byte arrays
+   * {@link Coder} to encode values as byte arrays
    * before broadcasting.
    * @param <T>
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89a21ca9/runners/spark/src/main/java/com/cloudera/dataflow/spark/CoderHelpers.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/CoderHelpers.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/CoderHelpers.java
index b5e86b6..0ae06c1 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/CoderHelpers.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/CoderHelpers.java
@@ -159,7 +159,7 @@ public final class CoderHelpers {
 
   /**
    * A function wrapper for converting a byte array pair to a key-value pair, where
-   * values are <code>Iterable</code>.
+   * values are {@link Iterable}.
    *
    * @param keyCoder Coder to deserialize keys.
    * @param valueCoder Coder to deserialize values.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89a21ca9/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java
index 6617c56..2bcfec3 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java
@@ -84,6 +84,7 @@ public class DoFnFunction<I, O> implements FlatMapFunction<Iterator<WindowedValu
       outputs.clear();
     }
 
+    @Override
     protected Iterator<WindowedValue<O>> getOutputIterator() {
       return outputs.iterator();
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89a21ca9/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
index 68e9d27..356acab 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
@@ -25,7 +25,6 @@ import java.util.Set;
 
 import com.google.cloud.dataflow.sdk.Pipeline;
 import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
 import com.google.cloud.dataflow.sdk.runners.AggregatorRetrievalException;
 import com.google.cloud.dataflow.sdk.runners.AggregatorValues;
 import com.google.cloud.dataflow.sdk.transforms.Aggregator;
@@ -50,7 +49,7 @@ public class EvaluationContext implements EvaluationResult {
   private final JavaSparkContext jsc;
   private final Pipeline pipeline;
   private final SparkRuntimeContext runtime;
-  private final CoderRegistry registry;
+  //private final CoderRegistry registry;
   private final Map<PValue, RDDHolder<?>> pcollections = new LinkedHashMap<>();
   private final Set<RDDHolder<?>> leafRdds = new LinkedHashSet<>();
   private final Set<PValue> multireads = new LinkedHashSet<>();
@@ -61,7 +60,7 @@ public class EvaluationContext implements EvaluationResult {
   public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline) {
     this.jsc = jsc;
     this.pipeline = pipeline;
-    this.registry = pipeline.getCoderRegistry();
+    //this.registry = pipeline.getCoderRegistry();
     this.runtime = new SparkRuntimeContext(jsc, pipeline);
   }
 
@@ -87,7 +86,7 @@ public class EvaluationContext implements EvaluationResult {
       this.rdd = rdd;
     }
 
-    public JavaRDDLike<WindowedValue<T>, ?> getRDD() {
+    JavaRDDLike<WindowedValue<T>, ?> getRDD() {
       if (rdd == null) {
         Iterable<WindowedValue<T>> windowedValues = Iterables.transform(values,
             new Function<T, WindowedValue<T>>() {
@@ -105,7 +104,7 @@ public class EvaluationContext implements EvaluationResult {
       return rdd;
     }
 
-    public Iterable<T> getValues(PCollection<T> pcollection) {
+    Iterable<T> getValues(PCollection<T> pcollection) {
       if (values == null) {
         coder = pcollection.getCoder();
         JavaRDDLike<byte[], ?> bytesRDD = rdd.map(WindowingHelpers.<T>unwindowFunction())
@@ -121,7 +120,7 @@ public class EvaluationContext implements EvaluationResult {
       return values;
     }
 
-    public Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
+    Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
       return Iterables.transform(get(pcollection), new Function<T, WindowedValue<T>>() {
         @Override
         public WindowedValue<T> apply(T t) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89a21ca9/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationResult.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationResult.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationResult.java
index b40d449..aad029a 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationResult.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationResult.java
@@ -36,7 +36,7 @@ public interface EvaluationResult extends PipelineResult {
   /**
    * Retrieve an object of Type T associated with the PValue passed in.
    *
-   * @param pval PValue to retireve associated data for.
+   * @param pval PValue to retrieve associated data for.
    * @param <T>  Type of object to return.
    * @return Native object.
    */
@@ -55,7 +55,7 @@ public interface EvaluationResult extends PipelineResult {
   /**
    * Releases any runtime resources, including distributed-execution contexts currently held by
    * this EvaluationResult; once close() has been called,
-   * {@link com.cloudera.dataflow.spark.EvaluationResult#get(PCollection)} might
+   * {@link EvaluationResult#get(PCollection)} might
    * not work for subsequent calls.
    */
   void close();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89a21ca9/runners/spark/src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java
index 17daff0..d269788 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java
@@ -98,6 +98,7 @@ class MultiDoFnFunction<I, O>
       outputs.clear();
     }
 
+    @Override
     protected Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> getOutputIterator() {
       return Iterators.transform(outputs.entries().iterator(),
           new Function<Map.Entry<TupleTag<?>, WindowedValue<?>>,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89a21ca9/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkContextFactory.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkContextFactory.java
index 97cbc20..d3e8c9b 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkContextFactory.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkContextFactory.java
@@ -22,11 +22,11 @@ import org.apache.spark.serializer.KryoSerializer;
 final class SparkContextFactory {
 
   /**
-   * If the property <code>dataflow.spark.test.reuseSparkContext</code> is set to
-   * <code>true</code> then the Spark context will be reused for dataflow pipelines.
+   * If the property {@code dataflow.spark.test.reuseSparkContext} is set to
+   * {@code true} then the Spark context will be reused for dataflow pipelines.
    * This property should only be enabled for tests.
    */
-  public static final String TEST_REUSE_SPARK_CONTEXT =
+  static final String TEST_REUSE_SPARK_CONTEXT =
       "dataflow.spark.test.reuseSparkContext";
   private static JavaSparkContext sparkContext;
   private static String sparkMaster;
@@ -34,7 +34,7 @@ final class SparkContextFactory {
   private SparkContextFactory() {
   }
 
-  public static synchronized JavaSparkContext getSparkContext(String master, String appName) {
+  static synchronized JavaSparkContext getSparkContext(String master, String appName) {
     if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
       if (sparkContext == null) {
         sparkContext = createSparkContext(master, appName);
@@ -50,7 +50,7 @@ final class SparkContextFactory {
     }
   }
 
-  public static synchronized void stopSparkContext(JavaSparkContext context) {
+  static synchronized void stopSparkContext(JavaSparkContext context) {
     if (!Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
       context.stop();
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89a21ca9/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java
index e980ae3..5287f20 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java
@@ -120,7 +120,7 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult>
             mOptions.getClass().getSimpleName());
       }
       LOG.info("Executing pipeline using the SparkPipelineRunner.");
-      final JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions
+      JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions
               .getSparkMaster(), mOptions.getAppName());
 
       if (mOptions.isStreaming()) {
@@ -135,8 +135,7 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult>
         }
 
         Duration batchInterval = streamingWindowPipelineDetector.getBatchDuration();
-        LOG.info("Setting Spark streaming batchInterval to " +
-            batchInterval.milliseconds() + "msec");
+        LOG.info("Setting Spark streaming batchInterval to {} msec", batchInterval.milliseconds());
         EvaluationContext ctxt = createStreamingEvaluationContext(jsc, pipeline, batchInterval);
 
         pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, translator));
@@ -179,7 +178,7 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult>
       createStreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline,
       Duration batchDuration) {
     SparkStreamingPipelineOptions streamingOptions = (SparkStreamingPipelineOptions) mOptions;
-    final JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
+    JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
     return new StreamingEvaluationContext(jsc, pipeline, jssc, streamingOptions.getTimeout());
   }
 
@@ -208,18 +207,15 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult>
 
     @Override
     public void enterCompositeTransform(TransformTreeNode node) {
-      if (inTranslatedCompositeNode()) {
-        return;
-      }
-
-      //noinspection unchecked
-      if (node.getTransform() != null
-              && translator.hasTranslation(
-              (Class<? extends PTransform<?, ?>>) node.getTransform().getClass())) {
-        LOG.info("Entering directly-translatable composite transform: '{}'",
-                node.getFullName());
-        LOG.debug("Composite transform class: '{}'", node.getTransform().getClass());
-        currentTranslatedCompositeNode = node;
+      if (!inTranslatedCompositeNode() && node.getTransform() != null) {
+        @SuppressWarnings("unchecked")
+        Class<PTransform<?, ?>> transformClass =
+            (Class<PTransform<?, ?>>) node.getTransform().getClass();
+        if (translator.hasTranslation(transformClass)) {
+          LOG.info("Entering directly-translatable composite transform: '{}'", node.getFullName());
+          LOG.debug("Composite transform class: '{}'", node.getTransform().getClass());
+          currentTranslatedCompositeNode = node;
+        }
       }
     }
 

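Two things happen in the enterCompositeTransform rewrite above: the IDE-only //noinspection unchecked comment becomes a @SuppressWarnings("unchecked") annotation scoped to a single local variable, and the early return is folded into one guarded block. A minimal sketch of the narrow-scope suppression idiom, with hypothetical names:

    import java.util.Arrays;
    import java.util.List;

    public final class NarrowSuppressDemo {

      // The annotation covers only this one declaration, so javac still
      // flags any other unchecked operation elsewhere in the method.
      static Class<List<String>> listClass(Object value) {
        @SuppressWarnings("unchecked")
        Class<List<String>> clazz = (Class<List<String>>) value.getClass();
        return clazz;
      }

      public static void main(String[] args) {
        System.out.println(listClass(Arrays.asList("a", "b")));
      }
    }
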
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89a21ca9/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineTranslator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineTranslator.java
index ff49317..d90363f 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineTranslator.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineTranslator.java
@@ -23,5 +23,5 @@ public interface SparkPipelineTranslator {
 
   boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz);
 
-  TransformEvaluator<? extends PTransform<?, ?>> translate(Class<? extends PTransform<?, ?>> clazz);
+  <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(Class<PT> clazz);
 }

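Generifying translate is the core generics untangling in this commit: the evaluator's type parameter is now inferred from the Class<PT> token, so implementations and callers stop juggling TransformEvaluator<? extends PTransform<?, ?>> and its casts. A minimal, self-contained sketch of the same signature shape, with hypothetical Evaluator/Registry names:

    import java.util.HashMap;
    import java.util.Map;

    public final class GenericLookupDemo {

      interface Evaluator<T> {
        void evaluate(T transform);
      }

      static final class Registry {
        private final Map<Class<?>, Evaluator<?>> evaluators = new HashMap<>();

        <T> void register(Class<T> clazz, Evaluator<T> evaluator) {
          evaluators.put(clazz, evaluator);
        }

        // The class token ties the wildcard map entry back to a concrete T,
        // so the single unchecked cast lives here instead of at every call site.
        @SuppressWarnings("unchecked")
        <T> Evaluator<T> translate(Class<T> clazz) {
          return (Evaluator<T>) evaluators.get(clazz);
        }
      }

      public static void main(String[] args) {
        Registry registry = new Registry();
        registry.register(String.class, new Evaluator<String>() {
          @Override
          public void evaluate(String transform) {
            System.out.println("len=" + transform.length());
          }
        });
        Evaluator<String> e = registry.translate(String.class); // no caller-side cast
        e.evaluate("hello");
      }
    }
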
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89a21ca9/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
index f68efb4..73cec25 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
@@ -195,7 +195,7 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
     private final Iterator<WindowedValue<I>> inputIterator;
     private final DoFn<I, O> doFn;
     private Iterator<V> outputIterator;
-    private boolean calledFinish = false;
+    private boolean calledFinish;
 
     ProcCtxtIterator(Iterator<WindowedValue<I>> iterator, DoFn<I, O> doFn) {
       this.inputIterator = iterator;
@@ -242,7 +242,7 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
   }
 
   static class SparkProcessException extends RuntimeException {
-    public SparkProcessException(Throwable t) {
+    SparkProcessException(Throwable t) {
       super(t);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89a21ca9/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
index 560d62f..db335ed 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
@@ -247,8 +247,8 @@ public final class TransformTranslator {
         } catch (CannotProvideCoderException e) {
           throw new IllegalStateException("Could not determine coder for accumulator", e);
         }
-        final Coder<KV<K, VI>> kviCoder = KvCoder.of(keyCoder, viCoder);
-        final Coder<KV<K, VA>> kvaCoder = KvCoder.of(keyCoder, vaCoder);
+        Coder<KV<K, VI>> kviCoder = KvCoder.of(keyCoder, viCoder);
+        Coder<KV<K, VA>> kvaCoder = KvCoder.of(keyCoder, vaCoder);
 
         // We need to duplicate K as both the key of the JavaPairRDD as well as inside the value,
         // since the functions passed to combineByKey don't receive the associated key of each
@@ -437,7 +437,8 @@ public final class TransformTranslator {
               all.filter(new TupleTagFilter(e.getKey()));
           @SuppressWarnings("unchecked")
           // Object is the best we can do since different outputs can have different tags
-          JavaRDD<WindowedValue<Object>> values = (JavaRDD) filtered.values();
+          JavaRDD<WindowedValue<Object>> values =
+              (JavaRDD<WindowedValue<Object>>) (JavaRDD<?>) filtered.values();
           context.setRDD(e.getValue(), values);
         }
       }
@@ -602,19 +603,19 @@ public final class TransformTranslator {
       this.filenameSuffix = filenameSuffix;
     }
 
-    public int getNumShards() {
+    int getNumShards() {
       return numShards;
     }
 
-    public String getShardTemplate() {
+    String getShardTemplate() {
       return shardTemplate;
     }
 
-    public String getFilenamePrefix() {
+    String getFilenamePrefix() {
       return filenamePrefix;
     }
 
-    public String getFilenameSuffix() {
+    String getFilenameSuffix() {
       return filenameSuffix;
     }
   }
@@ -642,7 +643,7 @@ public final class TransformTranslator {
     rdd.saveAsNewAPIHadoopFile(outputDir, keyClass, valueClass, formatClass, conf);
   }
 
-  static final FieldGetter WINDOW_FG = new FieldGetter(Window.Bound.class);
+  private static final FieldGetter WINDOW_FG = new FieldGetter(Window.Bound.class);
 
   private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() {
     return new TransformEvaluator<Window.Bound<T>>() {
@@ -796,8 +797,7 @@ public final class TransformTranslator {
     }
 
     @Override
-    public TransformEvaluator<? extends PTransform<?, ?>> translate(
-        Class<? extends PTransform<?, ?>> clazz) {
+    public <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(Class<PT> clazz) {
       return getTransformEvaluator(clazz);
     }
   }

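The sidestep cast above, (JavaRDD<WindowedValue<Object>>) (JavaRDD<?>), replaces a raw (JavaRDD) cast: widening to the wildcard type first keeps the conversion inside checked generics, leaving a single unchecked narrowing and no rawtypes warning. A minimal sketch of the idiom with a plain List (illustrative only, no Spark types):

    import java.util.ArrayList;
    import java.util.List;

    public final class DoubleCastDemo {
      public static void main(String[] args) {
        List<String> strings = new ArrayList<>();
        strings.add("a");

        // (List) strings would compile, but mixes in a rawtypes warning.
        // Widening to List<?> first means only the final narrowing is unchecked.
        @SuppressWarnings("unchecked")
        List<Object> objects = (List<Object>) (List<?>) strings;

        System.out.println(objects.get(0));
      }
    }
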
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89a21ca9/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java
index 3290729..5ecd562 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingEvaluationContext.java
@@ -70,17 +70,17 @@ public class StreamingEvaluationContext extends EvaluationContext {
     private Coder<T> coder;
     private JavaDStream<WindowedValue<T>> dStream;
 
-    public DStreamHolder(Iterable<Iterable<T>> values, Coder<T> coder) {
+    DStreamHolder(Iterable<Iterable<T>> values, Coder<T> coder) {
       this.values = values;
       this.coder = coder;
     }
 
-    public DStreamHolder(JavaDStream<WindowedValue<T>> dStream) {
+    DStreamHolder(JavaDStream<WindowedValue<T>> dStream) {
       this.dStream = dStream;
     }
 
     @SuppressWarnings("unchecked")
-    public JavaDStream<WindowedValue<T>> getDStream() {
+    JavaDStream<WindowedValue<T>> getDStream() {
       if (dStream == null) {
         // create the DStream from values
         Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>();
@@ -96,16 +96,14 @@ public class StreamingEvaluationContext extends EvaluationContext {
     }
   }
 
-  public <T> void setDStreamFromQueue(PTransform<?, ?> transform, Iterable<Iterable<T>> values,
-      Coder<T> coder) {
+  <T> void setDStreamFromQueue(
+      PTransform<?, ?> transform, Iterable<Iterable<T>> values, Coder<T> coder) {
     pstreams.put((PValue) getOutput(transform), new DStreamHolder<>(values, coder));
   }
 
-  public <T, R extends JavaRDDLike<WindowedValue<T>, R>>
-      void setStream(PTransform<?, ?> transform, JavaDStreamLike<WindowedValue<T>, ?, R> dStream) {
+  <T> void setStream(PTransform<?, ?> transform, JavaDStream<WindowedValue<T>> dStream) {
     PValue pvalue = (PValue) getOutput(transform);
-    @SuppressWarnings("unchecked")
-    DStreamHolder<T> dStreamHolder = new DStreamHolder((JavaDStream) dStream);
+    DStreamHolder<T> dStreamHolder = new DStreamHolder<>(dStream);
     pstreams.put(pvalue, dStreamHolder);
     leafStreams.add(dStreamHolder);
   }
@@ -115,11 +113,11 @@ public class StreamingEvaluationContext extends EvaluationContext {
     return pstreams.containsKey(pvalue);
   }
 
-  public JavaDStreamLike<?, ?, ?> getStream(PTransform<?, ?> transform) {
+  JavaDStreamLike<?, ?, ?> getStream(PTransform<?, ?> transform) {
     return getStream((PValue) getInput(transform));
   }
 
-  public JavaDStreamLike<?, ?, ?> getStream(PValue pvalue) {
+  JavaDStreamLike<?, ?, ?> getStream(PValue pvalue) {
     DStreamHolder<?> dStreamHolder = pstreams.get(pvalue);
     JavaDStreamLike<?, ?, ?> dStream = dStreamHolder.getDStream();
     leafStreams.remove(dStreamHolder);
@@ -127,13 +125,13 @@ public class StreamingEvaluationContext extends EvaluationContext {
   }
 
   // used to set the RDD from the DStream in the RDDHolder for transformation
-  public <T> void setInputRDD(PTransform<? extends PInput, ?> transform,
-      JavaRDDLike<WindowedValue<T>, ?> rdd) {
+  <T> void setInputRDD(
+      PTransform<? extends PInput, ?> transform, JavaRDDLike<WindowedValue<T>, ?> rdd) {
     setRDD((PValue) getInput(transform), rdd);
   }
 
   // used to get the RDD transformation output and use it as the DStream transformation output
-  public JavaRDDLike<?, ?> getOutputRDD(PTransform<?, ?> transform) {
+  JavaRDDLike<?, ?> getOutputRDD(PTransform<?, ?> transform) {
     return getRDD((PValue) getOutput(transform));
   }
 
@@ -144,19 +142,21 @@ public class StreamingEvaluationContext extends EvaluationContext {
   @Override
   protected void computeOutputs() {
     for (DStreamHolder<?> streamHolder : leafStreams) {
-      @SuppressWarnings("unchecked")
-      JavaDStream<WindowedValue<?>> stream = (JavaDStream) streamHolder.getDStream();
-      stream.foreachRDD(new Function<JavaRDD<WindowedValue<?>>, Void>() {
-        @Override
-        public Void call(JavaRDD<WindowedValue<?>> rdd) throws Exception {
-          rdd.rdd().cache();
-          rdd.count();
-          return null;
-        }
-      }); // force a DStream action
+      computeOutput(streamHolder);
     }
   }
 
+  private static <T> void computeOutput(DStreamHolder<T> streamHolder) {
+    streamHolder.getDStream().foreachRDD(new Function<JavaRDD<WindowedValue<T>>, Void>() {
+      @Override
+      public Void call(JavaRDD<WindowedValue<T>> rdd) throws Exception {
+        rdd.rdd().cache();
+        rdd.count();
+        return null;
+      }
+    }); // force a DStream action
+  }
+
   @Override
   public void close() {
     if (timeout > 0) {
@@ -178,6 +178,7 @@ public class StreamingEvaluationContext extends EvaluationContext {
   }
 
   //---------------- override in order to expose in package
+  @Override
   protected <I extends PInput> I getInput(PTransform<I, ?> transform) {
     return super.getInput(transform);
   }

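In computeOutputs above, the element type of each DStreamHolder<?> has no name, so the old code cast through the raw JavaDStream. The rewrite delegates to a private generic computeOutput(DStreamHolder<T>), letting the compiler capture the wildcard as T. A minimal sketch of that wildcard-capture helper pattern, with a hypothetical Holder type:

    import java.util.Arrays;
    import java.util.List;

    public final class CaptureHelperDemo {

      static final class Holder<T> {
        private final List<T> values;
        Holder(List<T> values) { this.values = values; }
        List<T> get() { return values; }
      }

      static void processAll(List<Holder<?>> holders) {
        for (Holder<?> holder : holders) {
          process(holder); // the compiler captures the wildcard as a fresh T
        }
      }

      // Within this method T is fixed, so everything type-checks without
      // raw types or @SuppressWarnings.
      private static <T> void process(Holder<T> holder) {
        for (T value : holder.get()) {
          System.out.println(value);
        }
      }

      public static void main(String[] args) {
        List<Holder<?>> holders = Arrays.asList(
            new Holder<>(Arrays.asList(1, 2)), new Holder<>(Arrays.asList("a", "b")));
        processAll(holders);
      }
    }
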
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89a21ca9/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java
index 0153f38..d8ae5e8 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingTransformTranslator.java
@@ -81,7 +81,7 @@ public final class StreamingTransformTranslator {
   private static <T> TransformEvaluator<ConsoleIO.Write.Unbound<T>> print() {
     return new TransformEvaluator<ConsoleIO.Write.Unbound<T>>() {
       @Override
-      public void evaluate(ConsoleIO.Write.Unbound transform, EvaluationContext context) {
+      public void evaluate(ConsoleIO.Write.Unbound<T> transform, EvaluationContext context) {
         @SuppressWarnings("unchecked")
         JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream =
             (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>)
@@ -117,13 +117,11 @@ public final class StreamingTransformTranslator {
     };
   }
 
-  private static <T> TransformEvaluator<com.google.cloud.dataflow.sdk.transforms.Create.Values<T>>
-      create() {
-    return new TransformEvaluator<com.google.cloud.dataflow.sdk.transforms.Create.Values<T>>() {
+  private static <T> TransformEvaluator<Create.Values<T>> create() {
+    return new TransformEvaluator<Create.Values<T>>() {
       @SuppressWarnings("unchecked")
       @Override
-      public void evaluate(com.google.cloud.dataflow.sdk.transforms.Create.Values<T>
-                                   transform, EvaluationContext context) {
+      public void evaluate(Create.Values<T> transform, EvaluationContext context) {
         StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
         Iterable<T> elems = transform.getElements();
         Coder<T> coder = sec.getOutput(transform).getCoder();
@@ -179,10 +177,9 @@ public final class StreamingTransformTranslator {
     return new TransformEvaluator<PT>() {
       @SuppressWarnings("unchecked")
       @Override
-      public void evaluate(final PT transform,
-                           final EvaluationContext context) {
-        final TransformEvaluator rddEvaluator =
-            rddTranslator.translate((Class<? extends PTransform<?, ?>>) transform.getClass());
+      public void evaluate(PT transform, EvaluationContext context) {
+        TransformEvaluator<PT> rddEvaluator =
+            rddTranslator.translate((Class<PT>) transform.getClass());
 
         StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
         if (sec.hasStream(transform)) {
@@ -212,11 +209,11 @@ public final class StreamingTransformTranslator {
 
     private final StreamingEvaluationContext context;
     private final AppliedPTransform<?, ?, ?> appliedPTransform;
-    private final TransformEvaluator rddEvaluator;
+    private final TransformEvaluator<PT> rddEvaluator;
     private final PT transform;
 
 
-    private RDDTransform(StreamingEvaluationContext context, TransformEvaluator rddEvaluator,
+    private RDDTransform(StreamingEvaluationContext context, TransformEvaluator<PT> rddEvaluator,
         PT transform) {
       this.context = context;
       this.appliedPTransform = context.getCurrentTransform();
@@ -249,10 +246,9 @@ public final class StreamingTransformTranslator {
       final SparkPipelineTranslator rddTranslator) {
     return new TransformEvaluator<PT>() {
       @Override
-      public void evaluate(final PT transform,
-                           final EvaluationContext context) {
-        final TransformEvaluator rddEvaluator =
-            rddTranslator.translate((Class<? extends PTransform<?, ?>>) transform.getClass());
+      public void evaluate(PT transform, EvaluationContext context) {
+        TransformEvaluator<PT> rddEvaluator =
+            rddTranslator.translate((Class<PT>) transform.getClass());
 
         StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
         if (sec.hasStream(transform)) {
@@ -278,12 +274,12 @@ public final class StreamingTransformTranslator {
 
     private final StreamingEvaluationContext context;
     private final AppliedPTransform<?, ?, ?> appliedPTransform;
-    private final TransformEvaluator rddEvaluator;
+    private final TransformEvaluator<PT> rddEvaluator;
     private final PT transform;
 
 
-    private RDDOutputOperator(StreamingEvaluationContext context, TransformEvaluator rddEvaluator,
-        PT transform) {
+    private RDDOutputOperator(StreamingEvaluationContext context,
+        TransformEvaluator<PT> rddEvaluator, PT transform) {
       this.context = context;
       this.appliedPTransform = context.getCurrentTransform();
       this.rddEvaluator = rddEvaluator;
@@ -302,7 +298,7 @@ public final class StreamingTransformTranslator {
     }
   }
 
-  static final TransformTranslator.FieldGetter WINDOW_FG =
+  private static final TransformTranslator.FieldGetter WINDOW_FG =
       new TransformTranslator.FieldGetter(Window.Bound.class);
 
   private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() {
@@ -334,7 +330,6 @@ public final class StreamingTransformTranslator {
         JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream =
             (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>)
             sec.getStream(transform);
-        //noinspection unchecked
         sec.setStream(transform, dstream.mapPartitions(dofn));
       }
     };
@@ -352,21 +347,17 @@ public final class StreamingTransformTranslator {
     EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl());
   }
 
-  private static final Set<Class<? extends PTransform>> UNSUPPORTTED_EVALUATORS = Sets
+  private static final Set<Class<? extends PTransform>> UNSUPPORTED_EVALUATORS = Sets
       .newHashSet();
 
   static {
     //TODO - add support for the following
-    UNSUPPORTTED_EVALUATORS.add(TextIO.Read.Bound.class);
-    UNSUPPORTTED_EVALUATORS.add(TextIO.Write.Bound.class);
-    UNSUPPORTTED_EVALUATORS.add(AvroIO.Read.Bound.class);
-    UNSUPPORTTED_EVALUATORS.add(AvroIO.Write.Bound.class);
-    UNSUPPORTTED_EVALUATORS.add(HadoopIO.Read.Bound.class);
-    UNSUPPORTTED_EVALUATORS.add(HadoopIO.Write.Bound.class);
-  }
-
-  private static <PT extends PTransform<?, ?>> boolean hasTransformEvaluator(Class<PT> clazz) {
-    return EVALUATORS.containsKey(clazz);
+    UNSUPPORTED_EVALUATORS.add(TextIO.Read.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(TextIO.Write.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(AvroIO.Read.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(AvroIO.Write.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(HadoopIO.Read.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(HadoopIO.Write.Bound.class);
   }
 
   @SuppressWarnings("unchecked")
@@ -374,16 +365,16 @@ public final class StreamingTransformTranslator {
       getTransformEvaluator(Class<PT> clazz, SparkPipelineTranslator rddTranslator) {
     TransformEvaluator<PT> transform = (TransformEvaluator<PT>) EVALUATORS.get(clazz);
     if (transform == null) {
-      if (UNSUPPORTTED_EVALUATORS.contains(clazz)) {
+      if (UNSUPPORTED_EVALUATORS.contains(clazz)) {
         throw new UnsupportedOperationException("Dataflow transformation " + clazz
           .getCanonicalName()
           + " is currently unsupported by the Spark streaming pipeline");
       }
       // DStream transformations will transform an RDD into another RDD
       // Actions will create output
-      // In Dataflow it depends on the PTranform's Input and Output class
-      Class pTOutputClazz = getPTransformOutputClazz(clazz);
-      if (pTOutputClazz == PDone.class) {
+      // In Dataflow it depends on the PTransform's Input and Output class
+      Class<?> pTOutputClazz = getPTransformOutputClazz(clazz);
+      if (PDone.class.equals(pTOutputClazz)) {
         return foreachRDD(rddTranslator);
       } else {
         return rddTransform(rddTranslator);
@@ -392,8 +383,7 @@ public final class StreamingTransformTranslator {
     return transform;
   }
 
-  private static <PT extends PTransform<?, ?>> Class
-      getPTransformOutputClazz(Class<PT> clazz) {
+  private static <PT extends PTransform<?, ?>> Class<?> getPTransformOutputClazz(Class<PT> clazz) {
     Type[] types = ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments();
     return TypeToken.of(clazz).resolveType(types[1]).getRawType();
   }
@@ -413,12 +403,11 @@ public final class StreamingTransformTranslator {
     @Override
     public boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz) {
       // streaming includes rdd transformations as well
-      return hasTransformEvaluator(clazz) || rddTranslator.hasTranslation(clazz);
+      return EVALUATORS.containsKey(clazz) || rddTranslator.hasTranslation(clazz);
     }
 
     @Override
-    public TransformEvaluator<? extends PTransform<?, ?>> translate(
-        Class<? extends PTransform<?, ?>> clazz) {
+    public <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(Class<PT> clazz) {
       return getTransformEvaluator(clazz, rddTranslator);
     }
   }

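getPTransformOutputClazz above now returns Class<?> instead of a raw Class; it resolves the PTransform's output type parameter reflectively via Guava's TypeToken. A minimal sketch of the same resolution on a hypothetical Transform<I, O> hierarchy (assumes Guava on the classpath):

    import com.google.common.reflect.TypeToken;
    import java.lang.reflect.ParameterizedType;
    import java.lang.reflect.Type;

    public final class ResolveTypeDemo {

      abstract static class Transform<I, O> {}

      static final class ParseInt extends Transform<String, Integer> {}

      // Mirrors getPTransformOutputClazz: read the generic superclass's type
      // arguments, then let TypeToken resolve the output parameter to a raw class.
      static <T extends Transform<?, ?>> Class<?> outputClassOf(Class<T> clazz) {
        Type[] types = ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments();
        return TypeToken.of(clazz).resolveType(types[1]).getRawType();
      }

      public static void main(String[] args) {
        System.out.println(outputClassOf(ParseInt.class)); // class java.lang.Integer
      }
    }
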
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89a21ca9/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingWindowPipelineDetector.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingWindowPipelineDetector.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingWindowPipelineDetector.java
index f9b2d2b..406dfcc 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingWindowPipelineDetector.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/streaming/StreamingWindowPipelineDetector.java
@@ -47,10 +47,11 @@ public final class StreamingWindowPipelineDetector extends SparkPipelineRunner.E
     super(translator);
   }
 
-  static final TransformTranslator.FieldGetter WINDOW_FG =
+  private static final TransformTranslator.FieldGetter WINDOW_FG =
       new TransformTranslator.FieldGetter(Window.Bound.class);
 
   // Use the smallest window (fixed or sliding) as Spark streaming's batch duration
+  @Override
   protected <PT extends PTransform<? super PInput, POutput>> void
       doVisitTransform(TransformTreeNode node) {
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89a21ca9/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombineGloballyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombineGloballyTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombineGloballyTest.java
index 51ba905..667e949 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombineGloballyTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombineGloballyTest.java
@@ -21,7 +21,6 @@ import com.google.cloud.dataflow.sdk.transforms.Combine;
 import com.google.cloud.dataflow.sdk.transforms.Create;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.common.collect.Iterables;
-import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
 import org.junit.Test;
@@ -47,8 +46,7 @@ public class CombineGloballyTest {
     res.close();
   }
 
-  public static class WordMerger extends Combine.CombineFn<String, StringBuilder, String> implements
-      Serializable {
+  public static class WordMerger extends Combine.CombineFn<String, StringBuilder, String> {
 
     @Override
     public StringBuilder createAccumulator() {
@@ -78,7 +76,7 @@ public class CombineGloballyTest {
     }
 
     private static StringBuilder combine(StringBuilder accum, String datum) {
-      if (null == accum) {
+      if (accum == null) {
         return new StringBuilder(datum);
       } else {
         accum.append(",").append(datum);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89a21ca9/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java
index b351018..579ada5 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java
@@ -24,7 +24,6 @@ import java.io.File;
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.Reader;
@@ -78,35 +77,28 @@ public class HadoopFileFormatPipelineTest {
 
     IntWritable key = new IntWritable();
     Text value = new Text();
-    Reader reader = null;
-    try {
-      reader = new Reader(new Configuration(), Reader.file(new Path(outputFile.toURI())));
+    try (Reader reader = new Reader(new Configuration(), Reader.file(new Path(outputFile.toURI())))) {
       int i = 0;
-      while(reader.next(key, value)) {
+      while (reader.next(key, value)) {
         assertEquals(i, key.get());
         assertEquals("value-" + i, value.toString());
         i++;
       }
-    } finally {
-      IOUtils.closeStream(reader);
     }
   }
 
   private void populateFile() throws IOException {
     IntWritable key = new IntWritable();
     Text value = new Text();
-    Writer writer = null;
-    try {
-      writer = SequenceFile.createWriter(new Configuration(),
-          Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
-          Writer.file(new Path(this.inputFile.toURI())));
+    try (Writer writer = SequenceFile.createWriter(
+        new Configuration(),
+        Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
+        Writer.file(new Path(this.inputFile.toURI())))) {
       for (int i = 0; i < 5; i++) {
         key.set(i);
         value.set("value-" + i);
         writer.append(key, value);
       }
-    } finally {
-      IOUtils.closeStream(writer);
     }
   }
 

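The test above drops the try/finally plus IOUtils.closeStream boilerplate in favor of try-with-resources, which closes the SequenceFile Reader and Writer even when the body throws and attaches close-time failures as suppressed exceptions. A minimal sketch of the same shape using only java.io/java.nio (no Hadoop types):

    import java.io.BufferedReader;
    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public final class TryWithResourcesDemo {
      public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("twr-demo", ".txt");

        // The writer is closed automatically at the end of the block.
        try (BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8)) {
          for (int i = 0; i < 5; i++) {
            writer.write("value-" + i);
            writer.newLine();
          }
        }

        // Likewise for the reader; no finally / closeStream needed.
        try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
          String line;
          while ((line = reader.readLine()) != null) {
            System.out.println(line);
          }
        }
      }
    }
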
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89a21ca9/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java
index a8edb3a..21a839b 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java
@@ -38,6 +38,8 @@ import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
+import java.util.regex.Pattern;
+
 import org.junit.Test;
 
 public class SerializationTest {
@@ -126,13 +128,14 @@ public class SerializationTest {
    * A DoFn that tokenizes lines of text into individual words.
    */
   static class ExtractWordsFn extends DoFn<StringHolder, StringHolder> {
+    private static final Pattern WORD_BOUNDARY = Pattern.compile("[^a-zA-Z']+");
     private final Aggregator<Long, Long> emptyLines =
         createAggregator("emptyLines", new Sum.SumLongFn());
 
     @Override
     public void processElement(ProcessContext c) {
       // Split the line into words.
-      String[] words = c.element().toString().split("[^a-zA-Z']+");
+      String[] words = WORD_BOUNDARY.split(c.element().toString());
 
       // Keep track of the number of lines without any words encountered while tokenizing.
       // This aggregator is visible in the monitoring UI when run using DataflowPipelineRunner.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89a21ca9/runners/spark/src/test/java/com/cloudera/dataflow/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SimpleWordCountTest.java
index 3d85f46..1c2f7a9 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SimpleWordCountTest.java
@@ -31,6 +31,8 @@ import com.google.common.collect.ImmutableSet;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
+import java.util.regex.Pattern;
+
 import org.junit.Test;
 
 public class SimpleWordCountTest {
@@ -60,13 +62,14 @@ public class SimpleWordCountTest {
    * A DoFn that tokenizes lines of text into individual words.
    */
   static class ExtractWordsFn extends DoFn<String, String> {
+    private static final Pattern WORD_BOUNDARY = Pattern.compile("[^a-zA-Z']+");
     private final Aggregator<Long, Long> emptyLines =
         createAggregator("emptyLines", new Sum.SumLongFn());
 
     @Override
     public void processElement(ProcessContext c) {
       // Split the line into words.
-      String[] words = c.element().split("[^a-zA-Z']+");
+      String[] words = WORD_BOUNDARY.split(c.element());
 
       // Keep track of the number of lines without any words encountered while tokenizing.
       // This aggregator is visible in the monitoring UI when run using DataflowPipelineRunner.

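Both word-count tests (SerializationTest above and SimpleWordCountTest here) hoist the tokenizing regex into a static final Pattern. String.split recompiles a multi-character pattern on every call, and processElement runs once per element, so compiling once avoids that repeated work. A minimal sketch:

    import java.util.Arrays;
    import java.util.regex.Pattern;

    public final class PrecompiledPatternDemo {
      // Compiled once per class load instead of once per split() call.
      private static final Pattern WORD_BOUNDARY = Pattern.compile("[^a-zA-Z']+");

      public static void main(String[] args) {
        String line = "hi there, hi sue; bob's here";
        String[] words = WORD_BOUNDARY.split(line);
        System.out.println(Arrays.toString(words));
      }
    }
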
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89a21ca9/runners/spark/src/test/java/com/cloudera/dataflow/spark/TransformTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/TransformTranslatorTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/TransformTranslatorTest.java
index 540bdd9..73b3643 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/TransformTranslatorTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/TransformTranslatorTest.java
@@ -84,7 +84,7 @@ public class TransformTranslatorTest {
     Assert.assertArrayEquals(directOutput.toArray(), sparkOutput.toArray());
   }
 
-  private String runPipeline(String name, PipelineRunner runner) {
+  private String runPipeline(String name, PipelineRunner<?> runner) {
     Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
     String outFile = Joiner.on(File.separator).join(testDataDirName, "test_text_out_" + name);
     PCollection<String> lines =  p.apply(TextIO.Read.from("src/test/resources/test_text.txt"));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89a21ca9/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/FlattenStreamingTest.java
index d818e9a..0872834 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/FlattenStreamingTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/FlattenStreamingTest.java
@@ -27,8 +27,6 @@ import com.google.cloud.dataflow.sdk.values.PCollectionList;
 import com.cloudera.dataflow.io.CreateStream;
 import com.cloudera.dataflow.spark.EvaluationResult;
 import com.cloudera.dataflow.spark.SparkPipelineRunner;
-import com.cloudera.dataflow.spark.streaming.SparkStreamingPipelineOptions;
-import com.cloudera.dataflow.spark.streaming.SparkStreamingPipelineOptionsFactory;
 import com.cloudera.dataflow.spark.streaming.utils.DataflowAssertStreaming;
 
 import org.joda.time.Duration;
@@ -53,7 +51,7 @@ public class FlattenStreamingTest {
           Collections.<Iterable<String>>singletonList(Arrays.asList(WORDS_ARRAY_2));
   private static final String[] EXPECTED_UNION = {
           "one", "two", "three", "four", "five", "six", "seven", "eight"};
-  final static long TEST_TIMEOUT_MSEC = 1000L;
+  private static final long TEST_TIMEOUT_MSEC = 1000L;
 
   @Test
   public void testRun() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89a21ca9/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/KafkaStreamingTest.java
index 8778e00..f68aea8 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/KafkaStreamingTest.java
@@ -65,10 +65,10 @@ public class KafkaStreamingTest {
   private static final Set<String> EXPECTED = ImmutableSet.of(
           "k1,v1", "k2,v2", "k3,v3", "k4,v4"
   );
-  private final static long TEST_TIMEOUT_MSEC = 1000L;
+  private static final long TEST_TIMEOUT_MSEC = 1000L;
 
   @BeforeClass
-  public static void init() throws IOException, InterruptedException {
+  public static void init() throws IOException {
     EMBEDDED_ZOOKEEPER.startup();
     EMBEDDED_KAFKA_CLUSTER.startup();
 
@@ -78,12 +78,12 @@ public class KafkaStreamingTest {
     producerProps.put("request.required.acks", 1);
     producerProps.put("bootstrap.servers", EMBEDDED_KAFKA_CLUSTER.getBrokerList());
     Serializer<String> stringSerializer = new StringSerializer();
-    @SuppressWarnings("unchecked") KafkaProducer<String, String> kafkaProducer =
-            new KafkaProducer(producerProps, stringSerializer, stringSerializer);
-    for (Map.Entry<String, String> en : KAFKA_MESSAGES.entrySet()) {
-      kafkaProducer.send(new ProducerRecord<>(TOPIC, en.getKey(), en.getValue()));
+    try (@SuppressWarnings("unchecked") KafkaProducer<String, String> kafkaProducer =
+            new KafkaProducer(producerProps, stringSerializer, stringSerializer)) {
+      for (Map.Entry<String, String> en : KAFKA_MESSAGES.entrySet()) {
+        kafkaProducer.send(new ProducerRecord<>(TOPIC, en.getKey(), en.getValue()));
+      }
     }
-    kafkaProducer.close();
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89a21ca9/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/SimpleStreamingWordCountTest.java
index eb23b5a..e22e616 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/SimpleStreamingWordCountTest.java
@@ -45,7 +45,7 @@ public class SimpleStreamingWordCountTest {
       Collections.<Iterable<String>>singletonList(Arrays.asList(WORDS_ARRAY));
   private static final Set<String> EXPECTED_COUNT_SET =
       ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
-  final static long TEST_TIMEOUT_MSEC = 1000L;
+  private static final long TEST_TIMEOUT_MSEC = 1000L;
 
   @Test
   public void testRun() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89a21ca9/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/utils/EmbeddedKafkaCluster.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/utils/EmbeddedKafkaCluster.java
index 6daae54..e75d729 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/utils/EmbeddedKafkaCluster.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/streaming/utils/EmbeddedKafkaCluster.java
@@ -32,11 +32,16 @@ import java.util.Random;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import kafka.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * https://gist.github.com/fjavieralba/7930018
  */
 public class EmbeddedKafkaCluster {
+
+  private static final Logger LOG = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
+
   private final List<Integer> ports;
   private final String zkConnection;
   private final Properties baseProperties;
@@ -59,28 +64,28 @@ public class EmbeddedKafkaCluster {
     this.ports = resolvePorts(ports);
     this.baseProperties = baseProperties;
 
-    this.brokers = new ArrayList<KafkaServer>();
-    this.logDirs = new ArrayList<File>();
+    this.brokers = new ArrayList<>();
+    this.logDirs = new ArrayList<>();
 
     this.brokerList = constructBrokerList(this.ports);
   }
 
-  private List<Integer> resolvePorts(List<Integer> ports) {
-    List<Integer> resolvedPorts = new ArrayList<Integer>();
+  private static List<Integer> resolvePorts(List<Integer> ports) {
+    List<Integer> resolvedPorts = new ArrayList<>();
     for (Integer port : ports) {
       resolvedPorts.add(resolvePort(port));
     }
     return resolvedPorts;
   }
 
-  private int resolvePort(int port) {
+  private static int resolvePort(int port) {
     if (port == -1) {
       return TestUtils.getAvailablePort();
     }
     return port;
   }
 
-  private String constructBrokerList(List<Integer> ports) {
+  private static String constructBrokerList(List<Integer> ports) {
     StringBuilder sb = new StringBuilder();
     for (Integer port : ports) {
       if (sb.length() > 0) {
@@ -113,7 +118,7 @@ public class EmbeddedKafkaCluster {
   }
 
 
-  private KafkaServer startBroker(Properties props) {
+  private static KafkaServer startBroker(Properties props) {
     KafkaServer server = new KafkaServer(new KafkaConfig(props), new SystemTime());
     server.startup();
     return server;
@@ -144,24 +149,21 @@ public class EmbeddedKafkaCluster {
       try {
         broker.shutdown();
       } catch (Exception e) {
-        e.printStackTrace();
+        LOG.warn("{}", e.getMessage(), e);
       }
     }
     for (File logDir : logDirs) {
       try {
         TestUtils.deleteFile(logDir);
       } catch (FileNotFoundException e) {
-        e.printStackTrace();
+        LOG.warn("{}", e.getMessage(), e);
       }
     }
   }
 
   @Override
   public String toString() {
-    final StringBuilder sb = new StringBuilder("EmbeddedKafkaCluster{");
-    sb.append("brokerList='").append(brokerList).append('\'');
-    sb.append('}');
-    return sb.toString();
+    return "EmbeddedKafkaCluster{" + "brokerList='" + brokerList + "'}";
   }
 
   public static class EmbeddedZookeeper {
@@ -185,7 +187,7 @@ public class EmbeddedKafkaCluster {
       this.tickTime = tickTime;
     }
 
-    private int resolvePort(int port) {
+    private static int resolvePort(int port) {
       if (port == -1) {
         return TestUtils.getAvailablePort();
       }
@@ -198,8 +200,8 @@ public class EmbeddedKafkaCluster {
       }
       this.factory = NIOServerCnxnFactory.createFactory(new InetSocketAddress("localhost", port),
               1024);
-      this.snapshotDir = TestUtils.constructTempDir("embeeded-zk/snapshot");
-      this.logDir = TestUtils.constructTempDir("embeeded-zk/log");
+      this.snapshotDir = TestUtils.constructTempDir("embedded-zk/snapshot");
+      this.logDir = TestUtils.constructTempDir("embedded-zk/log");
 
       try {
         factory.startup(new ZooKeeperServer(snapshotDir, logDir, tickTime));
@@ -245,22 +247,22 @@ public class EmbeddedKafkaCluster {
 
     @Override
     public String toString() {
-      final StringBuilder sb = new StringBuilder("EmbeddedZookeeper{");
-      sb.append("connection=").append(getConnection());
-      sb.append('}');
-      return sb.toString();
+      return "EmbeddedZookeeper{" + "connection=" + getConnection() + "}";
     }
   }
 
   static class SystemTime implements Time {
+    @Override
     public long milliseconds() {
       return System.currentTimeMillis();
     }
 
+    @Override
     public long nanoseconds() {
       return System.nanoTime();
     }
 
+    @Override
     public void sleep(long ms) {
       try {
         Thread.sleep(ms);
@@ -270,13 +272,13 @@ public class EmbeddedKafkaCluster {
     }
   }
 
-  static class TestUtils {
+  static final class TestUtils {
     private static final Random RANDOM = new Random();
 
     private TestUtils() {
     }
 
-    public static File constructTempDir(String dirPrefix) {
+    static File constructTempDir(String dirPrefix) {
       File file = new File(System.getProperty("java.io.tmpdir"), dirPrefix + RANDOM.nextInt
               (10000000));
       if (!file.mkdirs()) {
@@ -286,20 +288,17 @@ public class EmbeddedKafkaCluster {
       return file;
     }
 
-    public static int getAvailablePort() {
+    static int getAvailablePort() {
       try {
-        ServerSocket socket = new ServerSocket(0);
-        try {
+        try (ServerSocket socket = new ServerSocket(0)) {
           return socket.getLocalPort();
-        } finally {
-          socket.close();
         }
       } catch (IOException e) {
         throw new IllegalStateException("Cannot find available port: " + e.getMessage(), e);
       }
     }
 
-    public static boolean deleteFile(File path) throws FileNotFoundException {
+    static boolean deleteFile(File path) throws FileNotFoundException {
       if (!path.exists()) {
         throw new FileNotFoundException(path.getAbsolutePath());
       }

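The shutdown path above swaps e.printStackTrace() for an SLF4J logger, so stack traces go through the configured logging backend instead of raw stderr. A minimal sketch of the pattern (assumes slf4j-api plus a binding on the classpath):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public final class LoggingDemo {
      private static final Logger LOG = LoggerFactory.getLogger(LoggingDemo.class);

      public static void main(String[] args) {
        try {
          throw new IllegalStateException("broker shutdown failed");
        } catch (Exception e) {
          // Passing the exception as the trailing argument logs its stack
          // trace; the {} placeholder avoids eager string concatenation.
          LOG.warn("{}", e.getMessage(), e);
        }
      }
    }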
