beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [36/50] [abbrv] incubator-beam git commit: [BEAM-242] Enable checkstyle and fix checkstyle errors in Flink runner
Date Tue, 13 Sep 2016 00:41:07 GMT
[BEAM-242] Enable checkstyle and fix checkstyle errors in Flink runner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5eb44aa0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5eb44aa0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5eb44aa0

Branch: refs/heads/gearpump-runner
Commit: 5eb44aa01157ca62f1a618d1738eb064ca3a10e4
Parents: 9ae5cc7
Author: Jean-Baptiste Onofré <jbonofre@apache.org>
Authored: Thu Aug 25 16:19:54 2016 +0200
Committer: Dan Halperin <dhalperi@google.com>
Committed: Mon Sep 12 17:40:12 2016 -0700

----------------------------------------------------------------------
 runners/flink/runner/pom.xml                    |   2 -
 .../FlinkPipelineExecutionEnvironment.java      |  17 +-
 .../runners/flink/FlinkPipelineOptions.java     |  27 ++--
 .../runners/flink/FlinkRunnerRegistrar.java     |   6 +
 .../beam/runners/flink/FlinkRunnerResult.java   |  17 +-
 .../beam/runners/flink/TestFlinkRunner.java     |   8 +-
 .../apache/beam/runners/flink/package-info.java |  22 +++
 .../FlinkBatchPipelineTranslator.java           |  15 +-
 .../FlinkBatchTranslationContext.java           |  10 +-
 .../translation/FlinkPipelineTranslator.java    |   2 +-
 .../FlinkStreamingTransformTranslators.java     |   5 +-
 .../flink/translation/TranslationMode.java      |   8 +-
 .../translation/functions/package-info.java     |  22 +++
 .../runners/flink/translation/package-info.java |  22 +++
 .../translation/types/CoderTypeSerializer.java  |   2 +-
 .../types/EncodedValueSerializer.java           | 162 ++++++++++---------
 .../flink/translation/types/package-info.java   |  22 +++
 .../utils/SerializedPipelineOptions.java        |   2 +-
 .../flink/translation/utils/package-info.java   |  22 +++
 .../wrappers/DataOutputViewWrapper.java         |   2 +-
 .../translation/wrappers/package-info.java      |  22 +++
 .../wrappers/streaming/DoFnOperator.java        |  12 +-
 .../streaming/SingletonKeyedWorkItem.java       |   5 +
 .../streaming/SingletonKeyedWorkItemCoder.java  |  14 +-
 .../wrappers/streaming/WindowDoFnOperator.java  |   2 +-
 .../wrappers/streaming/WorkItemKeySelector.java |   3 +-
 .../streaming/io/UnboundedFlinkSink.java        |  13 +-
 .../streaming/io/UnboundedFlinkSource.java      |  29 ++--
 .../streaming/io/UnboundedSocketSource.java     |  46 ++++--
 .../wrappers/streaming/io/package-info.java     |  22 +++
 .../wrappers/streaming/package-info.java        |  22 +++
 .../beam/runners/flink/PipelineOptionsTest.java |   3 +
 .../beam/runners/flink/WriteSinkITCase.java     |   3 +-
 .../apache/beam/runners/flink/package-info.java |  22 +++
 .../streaming/FlinkStateInternalsTest.java      |   3 +-
 .../flink/streaming/GroupByNullKeyTest.java     |   6 +
 .../streaming/TopWikipediaSessionsITCase.java   |   2 +-
 .../streaming/UnboundedSourceWrapperTest.java   |  33 ++--
 .../runners/flink/streaming/package-info.java   |  22 +++
 39 files changed, 490 insertions(+), 189 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 08adc60..7c32280 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -234,12 +234,10 @@
         </executions>
       </plugin>
 
-      <!-- Checkstyle errors for now
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-checkstyle-plugin</artifactId>
       </plugin>
-      -->
 
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index d1977a4..a5d33b4 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -37,14 +37,15 @@ import org.slf4j.LoggerFactory;
 /**
  * The class that instantiates and manages the execution of a given job.
  * Depending on if the job is a Streaming or Batch processing one, it creates
- * the adequate execution environment ({@link ExecutionEnvironment} or {@link StreamExecutionEnvironment}),
- * the necessary {@link FlinkPipelineTranslator} ({@link FlinkBatchPipelineTranslator} or
- * {@link FlinkStreamingPipelineTranslator}) to transform the Beam job into a Flink one, and
- * executes the (translated) job.
+ * the adequate execution environment ({@link ExecutionEnvironment}
+ * or {@link StreamExecutionEnvironment}), the necessary {@link FlinkPipelineTranslator}
+ * ({@link FlinkBatchPipelineTranslator} or {@link FlinkStreamingPipelineTranslator}) to
+ * transform the Beam job into a Flink one, and executes the (translated) job.
  */
 public class FlinkPipelineExecutionEnvironment {
 
-  private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class);
 
   private final FlinkPipelineOptions options;
 
@@ -79,8 +80,8 @@ public class FlinkPipelineExecutionEnvironment {
    * Depending on if the job is a Streaming or a Batch one, this method creates
    * the necessary execution environment and pipeline translator, and translates
    * the {@link org.apache.beam.sdk.values.PCollection} program into
-   * a {@link org.apache.flink.api.java.DataSet} or {@link org.apache.flink.streaming.api.datastream.DataStream}
-   * one.
+   * a {@link org.apache.flink.api.java.DataSet}
+   * or {@link org.apache.flink.streaming.api.datastream.DataStream} one.
    * */
   public void translate(Pipeline pipeline) {
     this.flinkBatchEnv = null;
@@ -213,7 +214,7 @@ public class FlinkPipelineExecutionEnvironment {
     // If the value is not -1, then the validity checks are applied.
     // By default, checkpointing is disabled.
     long checkpointInterval = options.getCheckpointingInterval();
-    if(checkpointInterval != -1) {
+    if (checkpointInterval != -1) {
       if (checkpointInterval < 1) {
         throw new IllegalArgumentException("The checkpoint interval must be positive");
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 6561fa5..1fb23ec 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -29,7 +29,8 @@ import org.apache.beam.sdk.options.StreamingOptions;
 /**
  * Options which can be used to configure a Flink PipelineRunner.
  */
-public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOptions, StreamingOptions {
+public interface FlinkPipelineOptions
+    extends PipelineOptions, ApplicationNameOptions, StreamingOptions {
 
   /**
    * List of local files to make available to workers.
@@ -38,8 +39,8 @@ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOp
    * <p>
    * The default value is the list of jars from the main program's classpath.
    */
-  @Description("Jar-Files to send to all workers and put on the classpath. " +
-      "The default value is all files from the classpath.")
+  @Description("Jar-Files to send to all workers and put on the classpath. "
+      + "The default value is all files from the classpath.")
   @JsonIgnore
   List<String> getFilesToStage();
   void setFilesToStage(List<String> value);
@@ -51,9 +52,9 @@ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOp
    * Cluster in the JVM, "[collection]" will execute the pipeline on Java Collections while
    * "[auto]" will let the system decide where to execute the pipeline based on the environment.
    */
-  @Description("Address of the Flink Master where the Pipeline should be executed. Can" +
-      " either be of the form \"host:port\" or one of the special values [local], " +
-      "[collection] or [auto].")
+  @Description("Address of the Flink Master where the Pipeline should be executed. Can"
+      + " either be of the form \"host:port\" or one of the special values [local], "
+      + "[collection] or [auto].")
   String getFlinkMaster();
   void setFlinkMaster(String value);
 
@@ -62,21 +63,23 @@ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOp
   Integer getParallelism();
   void setParallelism(Integer value);
 
-  @Description("The interval between consecutive checkpoints (i.e. snapshots of the current pipeline state used for " +
-      "fault tolerance).")
+  @Description("The interval between consecutive checkpoints (i.e. snapshots of the current "
+      + "pipeline state used for fault tolerance).")
   @Default.Long(-1L)
   Long getCheckpointingInterval();
   void setCheckpointingInterval(Long interval);
 
-  @Description("Sets the number of times that failed tasks are re-executed. " +
-      "A value of zero effectively disables fault tolerance. A value of -1 indicates " +
-      "that the system default value (as defined in the configuration) should be used.")
+  @Description("Sets the number of times that failed tasks are re-executed. "
+      + "A value of zero effectively disables fault tolerance. A value of -1 indicates "
+      + "that the system default value (as defined in the configuration) should be used.")
   @Default.Integer(-1)
   Integer getNumberOfExecutionRetries();
   void setNumberOfExecutionRetries(Integer retries);
 
-  @Description("Sets the delay between executions. A value of {@code -1} indicates that the default value should be used.")
+  @Description("Sets the delay between executions. A value of {@code -1} "
+      + "indicates that the default value should be used.")
   @Default.Long(-1L)
   Long getExecutionRetryDelay();
   void setExecutionRetryDelay(Long delay);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
index f328279..0e4b513 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
@@ -36,6 +36,9 @@ import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
 public class FlinkRunnerRegistrar {
   private FlinkRunnerRegistrar() { }
 
+  /**
+   * Pipeline runner registrar.
+   */
   @AutoService(PipelineRunnerRegistrar.class)
   public static class Runner implements PipelineRunnerRegistrar {
     @Override
@@ -46,6 +49,9 @@ public class FlinkRunnerRegistrar {
     }
   }
 
+  /**
+   * Pipeline options registrar.
+   */
   @AutoService(PipelineOptionsRegistrar.class)
   public static class Options implements PipelineOptionsRegistrar {
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
index dd0733a..90bb64d 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
@@ -35,9 +35,9 @@ public class FlinkRunnerResult implements PipelineResult {
   private final Map<String, Object> aggregators;
   private final long runtime;
   public FlinkRunnerResult(Map<String, Object> aggregators, long runtime) {
-    this.aggregators = (aggregators == null || aggregators.isEmpty()) ?
-        Collections.<String, Object>emptyMap() :
-        Collections.unmodifiableMap(aggregators);
+    this.aggregators = (aggregators == null || aggregators.isEmpty())
+        ? Collections.<String, Object>emptyMap()
+        : Collections.unmodifiableMap(aggregators);
     this.runtime = runtime;
   }
 
@@ -47,7 +47,8 @@ public class FlinkRunnerResult implements PipelineResult {
   }
 
   @Override
-  public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator) throws AggregatorRetrievalException {
+  public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator)
+      throws AggregatorRetrievalException {
     // TODO provide a list of all accumulator step values
     Object value = aggregators.get(aggregator.getName());
     if (value != null) {
@@ -65,10 +66,10 @@ public class FlinkRunnerResult implements PipelineResult {
 
   @Override
   public String toString() {
-    return "FlinkRunnerResult{" +
-        "aggregators=" + aggregators +
-        ", runtime=" + runtime +
-        '}';
+    return "FlinkRunnerResult{"
+        + "aggregators=" + aggregators
+        + ", runtime=" + runtime
+        + '}';
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
index dd231d6..67a7d38 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
@@ -26,6 +26,9 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 
+/**
+ * Test Flink runner.
+ */
 public class TestFlinkRunner extends PipelineRunner<FlinkRunnerResult> {
 
   private FlinkRunner delegate;
@@ -37,7 +40,8 @@ public class TestFlinkRunner extends PipelineRunner<FlinkRunnerResult> {
   }
 
   public static TestFlinkRunner fromOptions(PipelineOptions options) {
-    FlinkPipelineOptions flinkOptions = PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options);
+    FlinkPipelineOptions flinkOptions =
+        PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options);
     return new TestFlinkRunner(flinkOptions);
   }
 
@@ -50,7 +54,7 @@ public class TestFlinkRunner extends PipelineRunner<FlinkRunnerResult> {
 
   @Override
   public <OutputT extends POutput, InputT extends PInput>
-      OutputT apply(PTransform<InputT,OutputT> transform, InputT input) {
+      OutputT apply(PTransform<InputT, OutputT> transform, InputT input) {
     return delegate.apply(transform, input);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/package-info.java
new file mode 100644
index 0000000..57f1e59
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
index 66c48b0..aa38bfb 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
@@ -91,15 +91,20 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
     // get the transformation corresponding to the node we are
     // currently visiting and translate it into its Flink alternative.
     PTransform<?, ?> transform = node.getTransform();
-    BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
+    BatchTransformTranslator<?> translator =
+        FlinkBatchTransformTranslators.getTranslator(transform);
     if (translator == null) {
       LOG.info(node.getTransform().getClass().toString());
-      throw new UnsupportedOperationException("The transform " + transform + " is currently not supported.");
+      throw new UnsupportedOperationException("The transform " + transform
+          + " is currently not supported.");
     }
     applyBatchTransform(transform, node, translator);
   }
 
-  private <T extends PTransform<?, ?>> void applyBatchTransform(PTransform<?, ?> transform, TransformTreeNode node, BatchTransformTranslator<?> translator) {
+  private <T extends PTransform<?, ?>> void applyBatchTransform(
+      PTransform<?, ?> transform,
+      TransformTreeNode node,
+      BatchTransformTranslator<?> translator) {
 
     @SuppressWarnings("unchecked")
     T typedTransform = (T) transform;
@@ -116,8 +121,8 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
   /**
    * A translator of a {@link PTransform}.
    */
-  public interface BatchTransformTranslator<Type extends PTransform> {
-    void translateNode(Type transform, FlinkBatchTranslationContext context);
+  public interface BatchTransformTranslator<TransformT extends PTransform> {
+    void translateNode(TransformT transform, FlinkBatchTranslationContext context);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
index 835648e..611f5e6 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
@@ -40,7 +40,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
  * {@link FlinkBatchTransformTranslators}.
  */
 public class FlinkBatchTranslationContext {
-  
+
   private final Map<PValue, DataSet<?>> dataSets;
   private final Map<PCollectionView<?>, DataSet<?>> broadcastDataSets;
 
@@ -55,9 +55,9 @@ public class FlinkBatchTranslationContext {
   private final PipelineOptions options;
 
   private AppliedPTransform<?, ?, ?> currentTransform;
-  
+
   // ------------------------------------------------------------------------
-  
+
   public FlinkBatchTranslationContext(ExecutionEnvironment env, PipelineOptions options) {
     this.env = env;
     this.options = options;
@@ -66,7 +66,7 @@ public class FlinkBatchTranslationContext {
 
     this.danglingDataSets = new HashMap<>();
   }
-  
+
   // ------------------------------------------------------------------------
 
   public Map<PValue, DataSet<?>> getDanglingDataSets() {
@@ -80,7 +80,7 @@ public class FlinkBatchTranslationContext {
   public PipelineOptions getPipelineOptions() {
     return options;
   }
-  
+
   @SuppressWarnings("unchecked")
   public <T> DataSet<WindowedValue<T>> getInputDataSet(PValue value) {
     // assume that the DataSet is used as an input if retrieved here

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java
index 4db929b..cba28e4 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java
@@ -39,7 +39,7 @@ public abstract class FlinkPipelineTranslator extends Pipeline.PipelineVisitor.D
   }
 
   /**
-   * Utility formatting method
+   * Utility formatting method.
    * @param n number of spaces to generate
    * @return String with "|" followed by n spaces
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 3719fa8..4b819b7 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -18,9 +18,10 @@
 
 package org.apache.beam.runners.flink.translation;
 
-import com.google.api.client.util.Maps;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -29,6 +30,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.flink.FlinkRunner;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.types.FlinkCoder;
@@ -63,7 +65,6 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.AppliedCombineFn;
 import org.apache.beam.sdk.util.Reshuffle;
-import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/TranslationMode.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/TranslationMode.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/TranslationMode.java
index 71eb655..57b69aa 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/TranslationMode.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/TranslationMode.java
@@ -18,14 +18,14 @@
 package org.apache.beam.runners.flink.translation;
 
 /**
- * The translation mode of the Beam Pipeline
+ * The translation mode of the Beam Pipeline.
  */
 public enum TranslationMode {
 
-  /** Uses the batch mode of Flink */
+  /** Uses the batch mode of Flink. */
   BATCH,
 
-  /** Uses the streaming mode of Flink */
+  /** Uses the streaming mode of Flink. */
   STREAMING
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java
new file mode 100644
index 0000000..9f11212
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.translation.functions;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/package-info.java
new file mode 100644
index 0000000..af4b354
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.translation;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
index 4eda357..e210ed9 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
@@ -33,7 +33,7 @@ import org.apache.flink.core.memory.DataOutputView;
  * Dataflow {@link org.apache.beam.sdk.coders.Coder Coders}.
  */
 public class CoderTypeSerializer<T> extends TypeSerializer<T> {
-  
+
   private Coder<T> coder;
 
   public CoderTypeSerializer(Coder<T> coder) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java
index f3e667d..41db61e 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java
@@ -18,7 +18,9 @@
 package org.apache.beam.runners.flink.translation.types;
 
 import java.io.IOException;
+
 import org.apache.beam.sdk.coders.Coder;
+
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -28,84 +30,84 @@ import org.apache.flink.core.memory.DataOutputView;
  */
 public final class EncodedValueSerializer extends TypeSerializer<byte[]> {
 
-	private static final long serialVersionUID = 1L;
-
-	private static final byte[] EMPTY = new byte[0];
-
-	@Override
-	public boolean isImmutableType() {
-		return true;
-	}
-
-	@Override
-	public byte[] createInstance() {
-		return EMPTY;
-	}
-
-	@Override
-	public byte[] copy(byte[] from) {
-		return from;
-	}
-	
-	@Override
-	public byte[] copy(byte[] from, byte[] reuse) {
-		return copy(from);
-	}
-
-	@Override
-	public int getLength() {
-		return -1;
-	}
-
-
-	@Override
-	public void serialize(byte[] record, DataOutputView target) throws IOException {
-		if (record == null) {
-			throw new IllegalArgumentException("The record must not be null.");
-		}
-		
-		final int len = record.length;
-		target.writeInt(len);
-		target.write(record);
-	}
-
-	@Override
-	public byte[] deserialize(DataInputView source) throws IOException {
-		final int len = source.readInt();
-		byte[] result = new byte[len];
-		source.readFully(result);
-		return result;
-	}
-	
-	@Override
-	public byte[] deserialize(byte[] reuse, DataInputView source) throws IOException {
-		return deserialize(source);
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		final int len = source.readInt();
-		target.writeInt(len);
-		target.write(source, len);
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof EncodedValueSerializer;
-	}
-
-	@Override
-	public int hashCode() {
-		return this.getClass().hashCode();
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		return obj instanceof EncodedValueSerializer;
-	}
-
-	@Override
-	public TypeSerializer<byte[]> duplicate() {
-		return this;
-	}
+  private static final long serialVersionUID = 1L;
+
+  private static final byte[] EMPTY = new byte[0];
+
+  @Override
+  public boolean isImmutableType() {
+    return true;
+  }
+
+  @Override
+  public byte[] createInstance() {
+    return EMPTY;
+  }
+
+  @Override
+  public byte[] copy(byte[] from) {
+    return from;
+  }
+
+  @Override
+  public byte[] copy(byte[] from, byte[] reuse) {
+    return copy(from);
+  }
+
+  @Override
+  public int getLength() {
+    return -1;
+  }
+
+
+  @Override
+  public void serialize(byte[] record, DataOutputView target) throws IOException {
+    if (record == null) {
+      throw new IllegalArgumentException("The record must not be null.");
+    }
+
+    final int len = record.length;
+    target.writeInt(len);
+    target.write(record);
+  }
+
+  @Override
+  public byte[] deserialize(DataInputView source) throws IOException {
+    final int len = source.readInt();
+    byte[] result = new byte[len];
+    source.readFully(result);
+    return result;
+  }
+
+  @Override
+  public byte[] deserialize(byte[] reuse, DataInputView source) throws IOException {
+    return deserialize(source);
+  }
+
+  @Override
+  public void copy(DataInputView source, DataOutputView target) throws IOException {
+    final int len = source.readInt();
+    target.writeInt(len);
+    target.write(source, len);
+  }
+
+  @Override
+  public boolean canEqual(Object obj) {
+    return obj instanceof EncodedValueSerializer;
+  }
+
+  @Override
+  public int hashCode() {
+    return this.getClass().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return obj instanceof EncodedValueSerializer;
+  }
+
+  @Override
+  public TypeSerializer<byte[]> duplicate() {
+    return this;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java
new file mode 100644
index 0000000..6fb3182
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.translation.types;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
index 0c6cea8..fe2602b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
@@ -33,7 +33,7 @@ public class SerializedPipelineOptions implements Serializable {
 
   private final byte[] serializedOptions;
 
-  /** Lazily initialized copy of deserialized options */
+  /** Lazily initialized copy of deserialized options. */
   private transient PipelineOptions pipelineOptions;
 
   public SerializedPipelineOptions(PipelineOptions options) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java
new file mode 100644
index 0000000..5dedd53
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.translation.utils;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java
index 2cb9b18..f2d9db2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java
@@ -28,7 +28,7 @@ import org.apache.flink.core.memory.DataOutputView;
  * {@link java.io.OutputStream}.
  */
 public class DataOutputViewWrapper extends OutputStream {
-  
+
   private DataOutputView outputView;
 
   public DataOutputViewWrapper(DataOutputView outputView) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java
new file mode 100644
index 0000000..72f7deb
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.translation.wrappers;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 3b0fccc..3b917e2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -27,6 +27,9 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
@@ -40,11 +43,8 @@ import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.sdk.util.ExecutionContext;
 import org.apache.beam.sdk.util.NullSideInputReader;
-import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -79,8 +79,8 @@ import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
  *
  * @param <InputT> the input type of the {@link OldDoFn}
  * @param <FnOutputT> the output type of the {@link OldDoFn}
- * @param <OutputT> the output type of the operator, this can be different from the fn output type when we have
- *                 side outputs
+ * @param <OutputT> the output type of the operator, this can be different from the fn output
+ *                 type when we have side outputs
  */
 public class DoFnOperator<InputT, FnOutputT, OutputT>
     extends AbstractStreamOperator<OutputT>
@@ -166,7 +166,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     currentInputWatermark = Long.MIN_VALUE;
     currentOutputWatermark = currentInputWatermark;
 
-   	Aggregator.AggregatorFactory aggregatorFactory = new Aggregator.AggregatorFactory() {
+    Aggregator.AggregatorFactory aggregatorFactory = new Aggregator.AggregatorFactory() {
       @Override
       public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
           Class<?> fnClass,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
index 5751aac..6d2582b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
@@ -22,6 +22,11 @@ import org.apache.beam.sdk.util.KeyedWorkItem;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 
+/**
+ * Singleton keyed work item.
+ * @param <K>
+ * @param <ElemT>
+ */
 public class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> {
 
   final K key;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
index 5e583e9..37454a3 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
@@ -35,7 +35,13 @@ import org.apache.beam.sdk.util.KeyedWorkItemCoder;
 import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.WindowedValue;
 
-public class SingletonKeyedWorkItemCoder<K, ElemT> extends StandardCoder<SingletonKeyedWorkItem<K, ElemT>> {
+/**
+ * Singleton keyed work item coder.
+ * @param <K>
+ * @param <ElemT>
+ */
+public class SingletonKeyedWorkItemCoder<K, ElemT>
+    extends StandardCoder<SingletonKeyedWorkItem<K, ElemT>> {
   /**
    * Create a new {@link KeyedWorkItemCoder} with the provided key coder, element coder, and window
    * coder.
@@ -68,7 +74,7 @@ public class SingletonKeyedWorkItemCoder<K, ElemT> extends StandardCoder<Singlet
     this.keyCoder = keyCoder;
     this.elemCoder = elemCoder;
     this.windowCoder = windowCoder;
-    valueCoder= WindowedValue.FullWindowedValueCoder.of(elemCoder, windowCoder);
+    valueCoder = WindowedValue.FullWindowedValueCoder.of(elemCoder, windowCoder);
   }
 
   public Coder<K> getKeyCoder() {
@@ -80,7 +86,9 @@ public class SingletonKeyedWorkItemCoder<K, ElemT> extends StandardCoder<Singlet
   }
 
   @Override
-  public void encode(SingletonKeyedWorkItem<K, ElemT> value, OutputStream outStream, Context context)
+  public void encode(SingletonKeyedWorkItem<K, ElemT> value,
+                     OutputStream outStream,
+                     Context context)
       throws CoderException, IOException {
     Context nestedContext = context.nested();
     keyCoder.encode(value.key(), outStream, nestedContext);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index b893116..29ae6ae 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -34,6 +34,7 @@ import java.util.Queue;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
+import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -43,7 +44,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.ExecutionContext;
 import org.apache.beam.sdk.util.KeyedWorkItem;
 import org.apache.beam.sdk.util.KeyedWorkItems;
-import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java
index 51d9e0c..7829163 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java
@@ -33,7 +33,8 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
  * that all key comparisons/hashing happen on the encoded form.
  */
 public class WorkItemKeySelector<K, V>
-    implements KeySelector<WindowedValue<SingletonKeyedWorkItem<K, V>>, ByteBuffer>, ResultTypeQueryable<ByteBuffer> {
+    implements KeySelector<WindowedValue<SingletonKeyedWorkItem<K, V>>, ByteBuffer>,
+    ResultTypeQueryable<ByteBuffer> {
 
   private final Coder<K> keyCoder;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
index 2117e9d..5b01796 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
@@ -62,7 +62,8 @@ public class UnboundedFlinkSink<T> extends Sink<T> {
       }
 
       @Override
-      public void finalize(Iterable<Object> writerResults, PipelineOptions options) throws Exception {
+      public void finalize(Iterable<Object> writerResults, PipelineOptions options)
+          throws Exception {
 
       }
 
@@ -70,12 +71,14 @@ public class UnboundedFlinkSink<T> extends Sink<T> {
       public Coder<Object> getWriterResultCoder() {
         return new Coder<Object>() {
           @Override
-          public void encode(Object value, OutputStream outStream, Context context) throws CoderException, IOException {
+          public void encode(Object value, OutputStream outStream, Context context)
+              throws CoderException, IOException {
 
           }
 
           @Override
-          public Object decode(InputStream inStream, Context context) throws CoderException, IOException {
+          public Object decode(InputStream inStream, Context context)
+              throws CoderException, IOException {
             return null;
           }
 
@@ -110,7 +113,9 @@ public class UnboundedFlinkSink<T> extends Sink<T> {
           }
 
           @Override
-          public void registerByteSizeObserver(Object value, ElementByteSizeObserver observer, Context context) throws Exception {
+          public void registerByteSizeObserver(Object value,
+                                               ElementByteSizeObserver observer,
+                                               Context context) throws Exception {
 
           }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
index c6e0825..ac20c34 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
@@ -36,17 +36,19 @@ public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource.
 
   private final SourceFunction<T> flinkSource;
 
-  /** Coder set during translation */
+  /** Coder set during translation. */
   private Coder<T> coder;
 
-  /** Timestamp / watermark assigner for source; defaults to ingestion time */
-  private AssignerWithPeriodicWatermarks<T> flinkTimestampAssigner = new IngestionTimeExtractor<T>();
+  /** Timestamp / watermark assigner for source; defaults to ingestion time. */
+  private AssignerWithPeriodicWatermarks<T> flinkTimestampAssigner =
+      new IngestionTimeExtractor<T>();
 
   public UnboundedFlinkSource(SourceFunction<T> source) {
     flinkSource = checkNotNull(source);
   }
 
-  public UnboundedFlinkSource(SourceFunction<T> source, AssignerWithPeriodicWatermarks<T> timestampAssigner) {
+  public UnboundedFlinkSource(SourceFunction<T> source,
+                              AssignerWithPeriodicWatermarks<T> timestampAssigner) {
     flinkSource = checkNotNull(source);
     flinkTimestampAssigner = checkNotNull(timestampAssigner);
   }
@@ -60,19 +62,25 @@ public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource.
   }
 
   @Override
-  public List<? extends UnboundedSource<T, UnboundedSource.CheckpointMark>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
-    throw new RuntimeException("Flink Sources are supported only when running with the FlinkRunner.");
+  public List<? extends UnboundedSource<T, UnboundedSource.CheckpointMark>> generateInitialSplits(
+      int desiredNumSplits,
+      PipelineOptions options) throws Exception {
+    throw new RuntimeException("Flink Sources are supported only when "
+        + "running with the FlinkRunner.");
   }
 
   @Override
-  public UnboundedReader<T> createReader(PipelineOptions options, @Nullable CheckpointMark checkpointMark) {
-    throw new RuntimeException("Flink Sources are supported only when running with the FlinkRunner.");
+  public UnboundedReader<T> createReader(PipelineOptions options,
+                                         @Nullable CheckpointMark checkpointMark) {
+    throw new RuntimeException("Flink Sources are supported only when "
+        + "running with the FlinkRunner.");
   }
 
   @Nullable
   @Override
   public Coder<UnboundedSource.CheckpointMark> getCheckpointMarkCoder() {
-    throw new RuntimeException("Flink Sources are supported only when running with the FlinkRunner.");
+    throw new RuntimeException("Flink Sources are supported only when "
+        + "running with the FlinkRunner.");
   }
 
 
@@ -100,7 +108,8 @@ public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource.
    * @param <T> The type that the source function produces.
    * @return The wrapped source function.
    */
-  public static <T> UnboundedSource<T, UnboundedSource.CheckpointMark> of(SourceFunction<T> flinkSource) {
+  public static <T> UnboundedSource<T, UnboundedSource.CheckpointMark> of(
+      SourceFunction<T> flinkSource) {
     return new UnboundedFlinkSource<>(flinkSource);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
index 8d37fe7..96b5138 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
@@ -38,9 +38,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * An example unbounded Beam source that reads input from a socket. This is used mainly for testing and debugging.
+ * An example unbounded Beam source that reads input from a socket.
+ * This is used mainly for testing and debugging.
  * */
-public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> extends UnboundedSource<String, C> {
+public class UnboundedSocketSource<CheckpointMarkT extends UnboundedSource.CheckpointMark>
+    extends UnboundedSource<String, CheckpointMarkT> {
 
   private static final Coder<String> DEFAULT_SOCKET_CODER = StringUtf8Coder.of();
 
@@ -60,7 +62,11 @@ public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> ext
     this(hostname, port, delimiter, maxNumRetries, DEFAULT_CONNECTION_RETRY_SLEEP);
   }
 
-  public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries, long delayBetweenRetries) {
+  public UnboundedSocketSource(String hostname,
+                               int port,
+                               char delimiter,
+                               long maxNumRetries,
+                               long delayBetweenRetries) {
     this.hostname = hostname;
     this.port = port;
     this.delimiter = delimiter;
@@ -89,12 +95,15 @@ public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> ext
   }
 
   @Override
-  public List<? extends UnboundedSource<String, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
-    return Collections.<UnboundedSource<String, C>>singletonList(this);
+  public List<? extends UnboundedSource<String, CheckpointMarkT>> generateInitialSplits(
+      int desiredNumSplits,
+      PipelineOptions options) throws Exception {
+    return Collections.<UnboundedSource<String, CheckpointMarkT>>singletonList(this);
   }
 
   @Override
-  public UnboundedReader<String> createReader(PipelineOptions options, @Nullable C checkpointMark) {
+  public UnboundedReader<String> createReader(PipelineOptions options,
+                                              @Nullable CheckpointMarkT checkpointMark) {
     return new UnboundedSocketReader(this);
   }
 
@@ -109,7 +118,8 @@ public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> ext
   @Override
   public void validate() {
     checkArgument(port > 0 && port < 65536, "port is out of range");
-    checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");
+    checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), "
+        + "or -1 (infinite retries)");
     checkArgument(delayBetweenRetries >= 0, "delayBetweenRetries must be zero or positive");
   }
 
@@ -118,7 +128,11 @@ public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> ext
     return DEFAULT_SOCKET_CODER;
   }
 
-  public static class UnboundedSocketReader extends UnboundedSource.UnboundedReader<String> implements Serializable {
+  /**
+   * Unbounded socket reader.
+   */
+  public static class UnboundedSocketReader extends UnboundedSource.UnboundedReader<String>
+      implements Serializable {
 
     private static final long serialVersionUID = 7526472295622776147L;
     private static final Logger LOG = LoggerFactory.getLogger(UnboundedSocketReader.class);
@@ -138,7 +152,8 @@ public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> ext
 
     private void openConnection() throws IOException {
       this.socket = new Socket();
-      this.socket.connect(new InetSocketAddress(this.source.getHostname(), this.source.getPort()), CONNECTION_TIMEOUT_TIME);
+      this.socket.connect(new InetSocketAddress(this.source.getHostname(), this.source.getPort()),
+          CONNECTION_TIMEOUT_TIME);
       this.reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
       this.isRunning = true;
     }
@@ -149,11 +164,14 @@ public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> ext
       while (!isRunning) {
         try {
           openConnection();
-          LOG.info("Connected to server socket " + this.source.getHostname() + ':' + this.source.getPort());
+          LOG.info("Connected to server socket " + this.source.getHostname() + ':'
+              + this.source.getPort());
 
           return advance();
         } catch (IOException e) {
-          LOG.info("Lost connection to server socket " + this.source.getHostname() + ':' + this.source.getPort() + ". Retrying in " + this.source.getDelayBetweenRetries() + " msecs...");
+          LOG.info("Lost connection to server socket " + this.source.getHostname() + ':'
+              + this.source.getPort() + ". Retrying in "
+              + this.source.getDelayBetweenRetries() + " msecs...");
 
           if (this.source.getMaxNumRetries() == -1 || attempt++ < this.source.getMaxNumRetries()) {
             try {
@@ -167,7 +185,8 @@ public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> ext
           }
         }
       }
-      LOG.error("Unable to connect to host " + this.source.getHostname() + " : " + this.source.getPort());
+      LOG.error("Unable to connect to host " + this.source.getHostname()
+          + " : " + this.source.getPort());
       return false;
     }
 
@@ -211,7 +230,8 @@ public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> ext
       this.reader.close();
       this.socket.close();
       this.isRunning = false;
-      LOG.info("Closed connection to server socket at " + this.source.getHostname() + ":" + this.source.getPort() + ".");
+      LOG.info("Closed connection to server socket at " + this.source.getHostname() + ":"
+          + this.source.getPort() + ".");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/package-info.java
new file mode 100644
index 0000000..b431ce7
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/package-info.java
new file mode 100644
index 0000000..0674871
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index 32339dc..3c30fed 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -52,6 +52,9 @@ import org.junit.Test;
  */
 public class PipelineOptionsTest {
 
+  /**
+   * Pipeline options.
+   */
   public interface MyOptions extends FlinkPipelineOptions {
     @Description("Bla bla bla")
     @Default.String("Hello")

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
index 0988146..37eedb2 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
@@ -118,7 +118,8 @@ public class WriteSinkITCase extends JavaProgramTestBase {
       }
 
       @Override
-      public void finalize(Iterable<String> writerResults, PipelineOptions options) throws Exception {
+      public void finalize(Iterable<String> writerResults, PipelineOptions options)
+          throws Exception {
 
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java
new file mode 100644
index 0000000..57f1e59
--- /dev/null
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index 711ae00..628212a 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -56,7 +56,8 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 /**
- * Tests for {@link FlinkStateInternals}. This is based on the tests for {@code InMemoryStateInternals}.
+ * Tests for {@link FlinkStateInternals}. This is based on the tests for
+ * {@code InMemoryStateInternals}.
  */
 @RunWith(JUnit4.class)
 public class FlinkStateInternalsTest {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
index ab98c27..c6381ee 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
@@ -36,6 +36,9 @@ import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
+/**
+ * Test for GroupByNullKey.
+ */
 public class GroupByNullKeyTest extends StreamingProgramTestBase implements Serializable {
 
 
@@ -58,6 +61,9 @@ public class GroupByNullKeyTest extends StreamingProgramTestBase implements Seri
     compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
   }
 
+  /**
+   * DoFn extracting user and timestamp.
+   */
   public static class ExtractUserAndTimestamp extends OldDoFn<KV<Integer, String>, String> {
     private static final long serialVersionUID = 0;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
index 64f978f..9410481 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
@@ -38,7 +38,7 @@ import org.joda.time.Instant;
 
 
 /**
- * Session window test
+ * Session window test.
  */
 public class TopWikipediaSessionsITCase extends StreamingProgramTestBase implements Serializable {
   protected String resultPath;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index a70ad49..73124a9 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -56,14 +56,14 @@ public class UnboundedSourceWrapperTest {
    */
   @Test
   public void testWithOneReader() throws Exception {
-    final int NUM_ELEMENTS = 20;
+    final int numElements = 20;
     final Object checkpointLock = new Object();
     PipelineOptions options = PipelineOptionsFactory.create();
 
     // this source will emit exactly NUM_ELEMENTS across all parallel readers,
     // afterwards it will stall. We check whether we also receive NUM_ELEMENTS
     // elements later.
-    TestCountingSource source = new TestCountingSource(NUM_ELEMENTS);
+    TestCountingSource source = new TestCountingSource(numElements);
     UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
         new UnboundedSourceWrapper<>(options, source, 1);
 
@@ -92,7 +92,7 @@ public class UnboundedSourceWrapperTest {
                 StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
 
               count++;
-              if (count >= NUM_ELEMENTS) {
+              if (count >= numElements) {
                 throw new SuccessException();
               }
             }
@@ -116,14 +116,14 @@ public class UnboundedSourceWrapperTest {
    */
   @Test
   public void testWithMultipleReaders() throws Exception {
-    final int NUM_ELEMENTS = 20;
+    final int numElements = 20;
     final Object checkpointLock = new Object();
     PipelineOptions options = PipelineOptionsFactory.create();
 
     // this source will emit exactly NUM_ELEMENTS across all parallel readers,
     // afterwards it will stall. We check whether we also receive NUM_ELEMENTS
     // elements later.
-    TestCountingSource source = new TestCountingSource(NUM_ELEMENTS);
+    TestCountingSource source = new TestCountingSource(numElements);
     UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
         new UnboundedSourceWrapper<>(options, source, 4);
 
@@ -149,10 +149,10 @@ public class UnboundedSourceWrapperTest {
 
             @Override
             public void collect(
-                StreamRecord<WindowedValue<KV<Integer,Integer>>> windowedValueStreamRecord) {
+                StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
 
               count++;
-              if (count >= NUM_ELEMENTS) {
+              if (count >= numElements) {
                 throw new SuccessException();
               }
             }
@@ -177,14 +177,14 @@ public class UnboundedSourceWrapperTest {
    */
   @Test
   public void testRestore() throws Exception {
-    final int NUM_ELEMENTS = 20;
+    final int numElements = 20;
     final Object checkpointLock = new Object();
     PipelineOptions options = PipelineOptionsFactory.create();
 
     // this source will emit exactly NUM_ELEMENTS across all parallel readers,
     // afterwards it will stall. We check whether we also receive NUM_ELEMENTS
     // elements later.
-    TestCountingSource source = new TestCountingSource(NUM_ELEMENTS);
+    TestCountingSource source = new TestCountingSource(numElements);
     UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
         new UnboundedSourceWrapper<>(options, source, 1);
 
@@ -213,11 +213,11 @@ public class UnboundedSourceWrapperTest {
 
             @Override
             public void collect(
-                StreamRecord<WindowedValue<KV<Integer,Integer>>> windowedValueStreamRecord) {
+                StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
 
               emittedElements.add(windowedValueStreamRecord.getValue().getValue());
               count++;
-              if (count >= NUM_ELEMENTS / 2) {
+              if (count >= numElements / 2) {
                 throw new SuccessException();
               }
             }
@@ -238,7 +238,7 @@ public class UnboundedSourceWrapperTest {
     byte[] snapshot = flinkWrapper.snapshotState(0, 0);
 
     // create a completely new source but restore from the snapshot
-    TestCountingSource restoredSource = new TestCountingSource(NUM_ELEMENTS);
+    TestCountingSource restoredSource = new TestCountingSource(numElements);
     UnboundedSourceWrapper<
         KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper =
         new UnboundedSourceWrapper<>(options, restoredSource, 1);
@@ -271,10 +271,10 @@ public class UnboundedSourceWrapperTest {
 
             @Override
             public void collect(
-                StreamRecord<WindowedValue<KV<Integer,Integer>>> windowedValueStreamRecord) {
+                StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
               emittedElements.add(windowedValueStreamRecord.getValue().getValue());
               count++;
-              if (count >= NUM_ELEMENTS / 2) {
+              if (count >= numElements / 2) {
                 throw new SuccessException();
               }
             }
@@ -292,7 +292,7 @@ public class UnboundedSourceWrapperTest {
     assertTrue("Did not successfully read second batch of elements.", readSecondBatchOfElements);
 
     // verify that we saw all NUM_ELEMENTS elements
-    assertTrue(emittedElements.size() == NUM_ELEMENTS);
+    assertTrue(emittedElements.size() == numElements);
   }
 
   @SuppressWarnings("unchecked")
@@ -310,7 +310,8 @@ public class UnboundedSourceWrapperTest {
     when(mockTask.getConfiguration()).thenReturn(cfg);
     when(mockTask.getEnvironment()).thenReturn(env);
     when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
-    when(mockTask.getAccumulatorMap()).thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
+    when(mockTask.getAccumulatorMap())
+        .thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
 
     operator.setup(mockTask, cfg, (Output< StreamRecord<T>>) mock(Output.class));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/package-info.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/package-info.java
new file mode 100644
index 0000000..08a1e03
--- /dev/null
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.streaming;


Mime
View raw message