beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [beam] Diff for: [GitHub] reuvenlax merged pull request #7523: Apply spotless across Beam
Date Wed, 16 Jan 2019 02:09:46 GMT
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
index 4a516bb0547a..100869ae903c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
@@ -186,7 +186,8 @@ static void runWindowedWordCount(Options options) throws IOException {
         pipeline
             /* Read from the GCS file. */
             .apply(TextIO.read().from(options.getInputFile()))
-            // Concept #2: Add an element timestamp, using an artificial time just to show windowing.
+            // Concept #2: Add an element timestamp, using an artificial time just to show
+            // windowing.
             // See AddTimestampFn for more detail on this.
             .apply(ParDo.of(new AddTimestampFn(minTimestamp, maxTimestamp)));
 
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
index 6bcae8da28f3..14a11b71b5a8 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
@@ -224,7 +224,8 @@ private void tearDown() {
             Transport.getJsonFactory(),
             chainHttpRequestInitializer(
                 options.getGcpCredential(),
-                // Do not log 404. It clutters the output and is possibly even required by the caller.
+                // Do not log 404. It clutters the output and is possibly even required by the
+                // caller.
                 new RetryHttpRequestInitializer(ImmutableList.of(404))))
         .setApplicationName(options.getAppName())
         .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
@@ -237,7 +238,8 @@ private void tearDown() {
             Transport.getJsonFactory(),
             chainHttpRequestInitializer(
                 options.getGcpCredential(),
-                // Do not log 404. It clutters the output and is possibly even required by the caller.
+                // Do not log 404. It clutters the output and is possibly even required by the
+                // caller.
                 new RetryHttpRequestInitializer(ImmutableList.of(404))))
         .setRootUrl(options.getPubsubRootUrl())
         .setApplicationName(options.getAppName())
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
index 9a3bfc26d5b6..bc6541341b8b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
@@ -122,7 +122,7 @@
  * and then exits.
  */
 public class TriggerExample {
-  //Numeric value of fixed window duration, in minutes
+  // Numeric value of fixed window duration, in minutes
   public static final int WINDOW_DURATION = 30;
   // Constants used in triggers.
   // Speeding up ONE_MINUTE or FIVE_MINUTES helps you get an early approximation of results.
@@ -189,18 +189,22 @@
               .apply(
                   "Default",
                   Window
-                      // The default window duration values work well if you're running the default input
+                      // The default window duration values work well if you're running the default
+                      // input
                       // file. You may want to adjust the window duration otherwise.
                       .<KV<String, Integer>>into(
                           FixedWindows.of(Duration.standardMinutes(windowDuration)))
-                      // The default trigger first emits output when the system's watermark passes the end
+                      // The default trigger first emits output when the system's watermark passes
+                      // the end
                       // of the window.
                       .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                       // Late data is dropped
                       .withAllowedLateness(Duration.ZERO)
                       // Discard elements after emitting each pane.
-                      // With no allowed lateness and the specified trigger there will only be a single
-                      // pane, so this doesn't have a noticeable effect. See concept 2 for more details.
+                      // With no allowed lateness and the specified trigger there will only be a
+                      // single
+                      // pane, so this doesn't have a noticeable effect. See concept 2 for more
+                      // details.
                       .discardingFiredPanes())
               .apply(new TotalFlow("default"));
 
@@ -229,7 +233,8 @@
                           FixedWindows.of(Duration.standardMinutes(windowDuration)))
                       // Late data is emitted as it arrives
                       .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
-                      // Once the output is produced, the pane is dropped and we start preparing the next
+                      // Once the output is produced, the pane is dropped and we start preparing the
+                      // next
                       // pane for the window
                       .discardingFiredPanes()
                       // Late data is handled up to one day
@@ -264,8 +269,10 @@
                               AfterProcessingTime.pastFirstElementInPane()
                                   // Speculative every ONE_MINUTE
                                   .plusDelayOf(ONE_MINUTE)))
-                      // After emitting each pane, it will continue accumulating the elements so that each
-                      // approximation includes all of the previous data in addition to the newly arrived
+                      // After emitting each pane, it will continue accumulating the elements so
+                      // that each
+                      // approximation includes all of the previous data in addition to the newly
+                      // arrived
                       // data.
                       .accumulatingFiredPanes()
                       .withAllowedLateness(ONE_DAY))
@@ -414,7 +421,7 @@ public void processElement(ProcessContext c) throws Exception {
         return;
       }
       if (laneInfo.length < VALID_NUM_FIELDS) {
-        //Skip the invalid input.
+        // Skip the invalid input.
         return;
       }
       String freeway = laneInfo[2];
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
index 6dd112f5e2d7..962d5fae134e 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
@@ -129,14 +129,16 @@ public void testTeamScoresSpeculative() {
             .addElements(
                 event(TestUser.BLUE_ONE, 3, Duration.standardSeconds(3)),
                 event(TestUser.BLUE_ONE, 2, Duration.standardMinutes(1)))
-            // Some time passes within the runner, which causes a speculative pane containing the blue
+            // Some time passes within the runner, which causes a speculative pane containing the
+            // blue
             // team's score to be emitted
             .advanceProcessingTime(Duration.standardMinutes(10))
             .addElements(event(TestUser.RED_TWO, 5, Duration.standardMinutes(3)))
             // Some additional time passes and we get a speculative pane for the red team
             .advanceProcessingTime(Duration.standardMinutes(12))
             .addElements(event(TestUser.BLUE_TWO, 3, Duration.standardSeconds(22)))
-            // More time passes and a speculative pane containing a refined value for the blue pane is
+            // More time passes and a speculative pane containing a refined value for the blue pane
+            // is
             // emitted
             .advanceProcessingTime(Duration.standardMinutes(10))
             // Some more events occur
@@ -238,7 +240,8 @@ public void testTeamScoresObservablyLate() {
                 event(TestUser.RED_TWO, 2, Duration.ZERO),
                 event(TestUser.RED_TWO, 5, Duration.standardMinutes(1)),
                 event(TestUser.RED_TWO, 3, Duration.standardMinutes(3)))
-            // A late refinement is emitted due to the advance in processing time, but the window has
+            // A late refinement is emitted due to the advance in processing time, but the window
+            // has
             // not yet closed because the watermark has not advanced
             .advanceProcessingTime(Duration.standardMinutes(12))
             // These elements should appear in the final pane
@@ -303,7 +306,8 @@ public void testTeamScoresDroppablyLate() {
                     .plus(ALLOWED_LATENESS)
                     .plus(TEAM_WINDOW_DURATION)
                     .plus(Duration.standardMinutes(1)))
-            // These elements within the expired window are droppably late, and will not appear in the
+            // These elements within the expired window are droppably late, and will not appear in
+            // the
             // output
             .addElements(
                 event(
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
index bd10307f7e3d..c53f48f0ff93 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
@@ -33,7 +33,7 @@
 
   private TestApexRunner(ApexPipelineOptions options) {
     options.setEmbeddedExecution(true);
-    //options.setEmbeddedExecutionDebugMode(false);
+    // options.setEmbeddedExecutionDebugMode(false);
     this.delegate = ApexRunner.fromOptions(options);
   }
 
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
index ca1c7ffa24de..eb625e4acc15 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
@@ -77,9 +77,7 @@ public void translate(ParDo.MultiOutput<InputT, OutputT> transform, TranslationC
     List<PCollectionView<?>> sideInputs = transform.getSideInputs();
 
     Map<TupleTag<?>, Coder<?>> outputCoders =
-        outputs
-            .entrySet()
-            .stream()
+        outputs.entrySet().stream()
             .filter(e -> e.getValue() instanceof PCollection)
             .collect(
                 Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder()));
@@ -138,9 +136,7 @@ public void translate(
       List<PCollectionView<?>> sideInputs = transform.getSideInputs();
 
       Map<TupleTag<?>, Coder<?>> outputCoders =
-          outputs
-              .entrySet()
-              .stream()
+          outputs.entrySet().stream()
               .filter(e -> e.getValue() instanceof PCollection)
               .collect(
                   Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder()));
@@ -221,8 +217,8 @@ static void addSideInputs(
           .getWindowingStrategy()
           .equals(firstSideInput.getWindowingStrategy())) {
         // TODO: check how to handle this in stream codec
-        //String msg = "Multiple side inputs with different window strategies.";
-        //throw new UnsupportedOperationException(msg);
+        // String msg = "Multiple side inputs with different window strategies.";
+        // throw new UnsupportedOperationException(msg);
         LOG.warn(
             "Side inputs union with different windowing strategies {} {}",
             firstSideInput.getWindowingStrategy(),
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 7a5fbaa8ec86..618b7a79e2ee 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -382,7 +382,7 @@ private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) {
     checkState(
         minEventTimeTimer >= currentInputWatermark,
         "Event time timer processing generates new timer(s) behind watermark.");
-    //LOG.info("Processing time timer {} registered behind watermark {}", minProcessingTimeTimer,
+    // LOG.info("Processing time timer {} registered behind watermark {}", minProcessingTimeTimer,
     //    currentInputWatermark);
 
     // TODO: is this the right way to trigger processing time timers?
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java
index 5746dcab4e5d..7df1e2bcf483 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java
@@ -56,8 +56,8 @@ public void testGetYarnDeployDependencies() throws Exception {
     List<File> deps = ApexYarnLauncher.getYarnDeployDependencies();
     String depsToString = deps.toString();
     // the beam dependencies are not present as jar when running within the Maven build reactor
-    //assertThat(depsToString, containsString("beam-runners-core-"));
-    //assertThat(depsToString, containsString("beam-runners-apex-"));
+    // assertThat(depsToString, containsString("beam-runners-core-"));
+    // assertThat(depsToString, containsString("beam-runners-apex-"));
     assertThat(depsToString, containsString("apex-common-"));
     assertThat(depsToString, not(containsString("hadoop-")));
     assertThat(depsToString, not(containsString("zookeeper-")));
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
index 025c7d642cec..70c92762018b 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
@@ -92,7 +92,8 @@ public FunctionSpec translate(
   /** Produces a {@link RunnerApi.CombinePayload} from a {@link Combine}. */
   static <K, InputT, OutputT> CombinePayload payloadForCombine(
       final AppliedPTransform<
-              PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>,
+              PCollection<KV<K, InputT>>,
+              PCollection<KV<K, OutputT>>,
               Combine.PerKey<K, InputT, OutputT>>
           combine,
       final SdkComponents components)
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
index f812de1dbabc..daf9c48dff53 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
@@ -53,7 +53,8 @@
   @Deprecated
   public static <ElemT, ViewT> PCollectionView<ViewT> getView(
       AppliedPTransform<
-              PCollection<ElemT>, PCollection<ElemT>,
+              PCollection<ElemT>,
+              PCollection<ElemT>,
               PTransform<PCollection<ElemT>, PCollection<ElemT>>>
           application)
       throws IOException {
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java
index fb04428fd03e..bfa926d1405b 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java
@@ -112,10 +112,7 @@ public static String generateNameFromTransformNames(
       } else {
         // Enumerate the outer stages with their composite structure, if any.
         parts =
-            groupByOuter
-                .asMap()
-                .entrySet()
-                .stream()
+            groupByOuter.asMap().entrySet().stream()
                 .map(
                     outer ->
                         String.format(
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index fb4948b5f246..463e51dcbaea 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -301,10 +301,7 @@ public static TupleTagList getAdditionalOutputTags(AppliedPTransform<?, ?, ?> ap
   }
 
   public static Map<TupleTag<?>, Coder<?>> getOutputCoders(AppliedPTransform<?, ?, ?> application) {
-    return application
-        .getOutputs()
-        .entrySet()
-        .stream()
+    return application.getOutputs().entrySet().stream()
         .filter(e -> e.getValue() instanceof PCollection)
         .collect(Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder()));
   }
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java
index b4a917ca3dce..23a23d16b7dc 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java
@@ -100,7 +100,7 @@ public static PipelineOptions fromJson(String optionsJson) {
       Map<String, Object> probingOptionsMap =
           MAPPER.readValue(optionsJson, new TypeReference<Map<String, Object>>() {});
       if (probingOptionsMap.containsKey("options")) {
-        //Legacy options.
+        // Legacy options.
         return MAPPER.readValue(optionsJson, PipelineOptions.class);
       } else {
         // Fn Options with namespace and version.
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java
index 94f8c3ba1902..a7a914c54a17 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java
@@ -80,8 +80,7 @@
    */
   public static List<String> prepareFilesForStaging(
       List<String> resourcesToStage, String tmpJarLocation) {
-    return resourcesToStage
-        .stream()
+    return resourcesToStage.stream()
         .map(File::new)
         .filter(File::exists)
         .map(
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
index afd3d7012a38..8863c66cfe2e 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
@@ -152,9 +152,7 @@ public void visitPrimitiveTransform(Node node) {
               // throws UnsupportedOperationException.
               transformBuilder.clearSubtransforms();
               transformBuilder.addAllSubtransforms(
-                  transform
-                      .getSubtransformsList()
-                      .stream()
+                  transform.getSubtransformsList().stream()
                       .filter(id -> !viewTransforms.contains(id))
                       .collect(Collectors.toList()));
               newTransforms.put(transformId, transformBuilder.build());
@@ -168,9 +166,7 @@ public void visitPrimitiveTransform(Node node) {
     viewOutputsToInputs.keySet().forEach(newPipeline.getComponentsBuilder()::removePcollections);
     newPipeline.clearRootTransformIds();
     newPipeline.addAllRootTransformIds(
-        pipeline
-            .getRootTransformIdsList()
-            .stream()
+        pipeline.getRootTransformIdsList().stream()
             .filter(id -> !viewTransforms.contains(id))
             .collect(Collectors.toList()));
     return newPipeline.build();
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
index 3197c694b62d..67b788764521 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
@@ -60,14 +60,16 @@
   /** Overrides a {@link ProcessKeyedElements} into {@link SplittableProcessNaive}. */
   public static class OverrideFactory<InputT, OutputT, RestrictionT>
       implements PTransformOverrideFactory<
-          PCollection<KV<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple,
+          PCollection<KV<byte[], KV<InputT, RestrictionT>>>,
+          PCollectionTuple,
           ProcessKeyedElements<InputT, OutputT, RestrictionT>> {
     @Override
     public PTransformReplacement<
             PCollection<KV<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple>
         getReplacementTransform(
             AppliedPTransform<
-                    PCollection<KV<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple,
+                    PCollection<KV<byte[], KV<InputT, RestrictionT>>>,
+                    PCollectionTuple,
                     ProcessKeyedElements<InputT, OutputT, RestrictionT>>
                 transform) {
       checkArgument(
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index c40b4ce005bb..e42487ce692c 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -132,8 +132,7 @@ public void validate() {
           return ImmutableList.of(this);
         }
         List<? extends BoundedSource<T>> splits = boundedSource.split(desiredBundleSize, options);
-        return splits
-            .stream()
+        return splits.stream()
             .map(input -> new BoundedToUnboundedSourceAdapter<>(input))
             .collect(Collectors.toList());
       } catch (Exception e) {
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
index 4eb6bb0949e5..817f2d23c1bf 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
@@ -130,7 +130,8 @@ private static SdkFunctionSpec toProto(String urn, Serializable serializable) {
 
   public static <UserT, DestinationT, OutputT> FileBasedSink<UserT, DestinationT, OutputT> getSink(
       AppliedPTransform<
-              PCollection<UserT>, WriteFilesResult<DestinationT>,
+              PCollection<UserT>,
+              WriteFilesResult<DestinationT>,
               ? extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>>>
           transform)
       throws IOException {
@@ -140,7 +141,8 @@ private static SdkFunctionSpec toProto(String urn, Serializable serializable) {
 
   public static <UserT, DestinationT> List<PCollectionView<?>> getDynamicDestinationSideInputs(
       AppliedPTransform<
-              PCollection<UserT>, WriteFilesResult<DestinationT>,
+              PCollection<UserT>,
+              WriteFilesResult<DestinationT>,
               ? extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>>>
           transform)
       throws IOException {
@@ -167,7 +169,8 @@ private static SdkFunctionSpec toProto(String urn, Serializable serializable) {
 
   public static <T, DestinationT> boolean isWindowedWrites(
       AppliedPTransform<
-              PCollection<T>, WriteFilesResult<DestinationT>,
+              PCollection<T>,
+              WriteFilesResult<DestinationT>,
               ? extends PTransform<PCollection<T>, WriteFilesResult<DestinationT>>>
           transform)
       throws IOException {
@@ -176,7 +179,8 @@ private static SdkFunctionSpec toProto(String urn, Serializable serializable) {
 
   public static <T, DestinationT> boolean isRunnerDeterminedSharding(
       AppliedPTransform<
-              PCollection<T>, WriteFilesResult<DestinationT>,
+              PCollection<T>,
+              WriteFilesResult<DestinationT>,
               ? extends PTransform<PCollection<T>, WriteFilesResult<DestinationT>>>
           transform)
       throws IOException {
@@ -185,7 +189,8 @@ private static SdkFunctionSpec toProto(String urn, Serializable serializable) {
 
   private static <T, DestinationT> WriteFilesPayload getWriteFilesPayload(
       AppliedPTransform<
-              PCollection<T>, WriteFilesResult<DestinationT>,
+              PCollection<T>,
+              WriteFilesResult<DestinationT>,
               ? extends PTransform<PCollection<T>, WriteFilesResult<DestinationT>>>
           transform)
       throws IOException {
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
index db8c63463568..8bdd71890574 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
@@ -183,8 +183,7 @@ default PTransform toPTransform(String uniqueName) {
             .toBuilder()
             .clearTransforms()
             .putAllTransforms(
-                getTransforms()
-                    .stream()
+                getTransforms().stream()
                     .collect(
                         Collectors.toMap(PTransformNode::getId, PTransformNode::getTransform))));
 
@@ -214,33 +213,23 @@ static ExecutableStage fromPayload(ExecutableStagePayload payload) {
         PipelineNode.pCollection(
             payload.getInput(), components.getPcollectionsOrThrow(payload.getInput()));
     List<SideInputReference> sideInputs =
-        payload
-            .getSideInputsList()
-            .stream()
+        payload.getSideInputsList().stream()
             .map(sideInputId -> SideInputReference.fromSideInputId(sideInputId, components))
             .collect(Collectors.toList());
     List<UserStateReference> userStates =
-        payload
-            .getUserStatesList()
-            .stream()
+        payload.getUserStatesList().stream()
             .map(userStateId -> UserStateReference.fromUserStateId(userStateId, components))
             .collect(Collectors.toList());
     List<TimerReference> timers =
-        payload
-            .getTimersList()
-            .stream()
+        payload.getTimersList().stream()
             .map(timerId -> TimerReference.fromTimerId(timerId, components))
             .collect(Collectors.toList());
     List<PTransformNode> transforms =
-        payload
-            .getTransformsList()
-            .stream()
+        payload.getTransformsList().stream()
             .map(id -> PipelineNode.pTransform(id, components.getTransformsOrThrow(id)))
             .collect(Collectors.toList());
     List<PCollectionNode> outputs =
-        payload
-            .getOutputsList()
-            .stream()
+        payload.getOutputsList().stream()
             .map(id -> PipelineNode.pCollection(id, components.getPcollectionsOrThrow(id)))
             .collect(Collectors.toList());
     return ImmutableExecutableStage.of(
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
index facac67d57a7..e0d68289dd49 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
@@ -65,8 +65,7 @@ static FusedPipeline of(
     Set<String> executableTransformIds =
         Sets.union(
             executableStageTransforms.keySet(),
-            getRunnerExecutedTransforms()
-                .stream()
+            getRunnerExecutedTransforms().stream()
                 .map(PTransformNode::getId)
                 .collect(Collectors.toSet()));
 
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
index b52c83c5d8c8..e1c50916b705 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
@@ -214,9 +214,11 @@ private static boolean parDoCompatibility(
         // upstream of any of the side inputs.
         || (pipeline.getSideInputs(parDo).isEmpty()
             // We purposefully break fusion here to provide runners the opportunity to insert a
-            // grouping operation to simplify implementing support for ParDo's that contain user state.
+            // grouping operation to simplify implementing support for ParDo's that contain user
+            // state.
             // We would not need to do this if we had the ability to mark upstream transforms as
-            // key preserving or if runners could execute ParDos containing user state in a distributed
+            // key preserving or if runners could execute ParDos containing user state in a
+            // distributed
             // fashion for a single key.
             && pipeline.getUserStates(parDo).isEmpty()
             // We purposefully break fusion here to provide runners the opportunity to insert a
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
index 640d402214d2..34c57e33af01 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
@@ -165,15 +165,13 @@ private FusedPipeline fusePipeline(
     // as can compatible producers/consumers if a PCollection is only materialized once.
     return FusedPipeline.of(
         deduplicated.getDeduplicatedComponents(),
-        stages
-            .stream()
+        stages.stream()
             .map(stage -> deduplicated.getDeduplicatedStages().getOrDefault(stage, stage))
             .map(GreedyPipelineFuser::sanitizeDanglingPTransformInputs)
             .collect(Collectors.toSet()),
         Sets.union(
             deduplicated.getIntroducedTransforms(),
-            unfusedTransforms
-                .stream()
+            unfusedTransforms.stream()
                 .map(
                     transform ->
                         deduplicated
@@ -306,8 +304,7 @@ static DescendantConsumers of(
               pipeline.getEnvironment(newConsumer.consumingTransform()).get());
       boolean foundSiblings = false;
       for (Set<CollectionConsumer> existingConsumers : compatibleConsumers.get(key)) {
-        if (existingConsumers
-            .stream()
+        if (existingConsumers.stream()
             .allMatch(
                 // The two consume the same PCollection and can exist in the same stage.
                 collectionConsumer ->
@@ -340,8 +337,7 @@ private ExecutableStage fuseSiblings(Set<CollectionConsumer> mutuallyCompatible)
     return GreedyStageFuser.forGrpcPortRead(
         pipeline,
         rootCollection,
-        mutuallyCompatible
-            .stream()
+        mutuallyCompatible.stream()
             .map(CollectionConsumer::consumingTransform)
             .collect(Collectors.toSet()));
   }
@@ -359,27 +355,19 @@ private static ExecutableStage sanitizeDanglingPTransformInputs(ExecutableStage
     Set<String> possibleInputs = new HashSet<>();
     possibleInputs.add(stage.getInputPCollection().getId());
     possibleInputs.addAll(
-        stage
-            .getOutputPCollections()
-            .stream()
+        stage.getOutputPCollections().stream()
             .map(PCollectionNode::getId)
             .collect(Collectors.toSet()));
     possibleInputs.addAll(
-        stage
-            .getSideInputs()
-            .stream()
+        stage.getSideInputs().stream()
             .map(s -> s.collection().getId())
             .collect(Collectors.toSet()));
     possibleInputs.addAll(
-        stage
-            .getTransforms()
-            .stream()
+        stage.getTransforms().stream()
             .flatMap(t -> t.getTransform().getOutputsMap().values().stream())
             .collect(Collectors.toSet()));
     Set<String> danglingInputs =
-        stage
-            .getTransforms()
-            .stream()
+        stage.getTransforms().stream()
             .flatMap(t -> t.getTransform().getInputsMap().values().stream())
             .filter(in -> !possibleInputs.contains(in))
             .collect(Collectors.toSet());
@@ -388,10 +376,7 @@ private static ExecutableStage sanitizeDanglingPTransformInputs(ExecutableStage
     for (PTransformNode transformNode : stage.getTransforms()) {
       PTransform transform = transformNode.getTransform();
       Map<String, String> validInputs =
-          transform
-              .getInputsMap()
-              .entrySet()
-              .stream()
+          transform.getInputsMap().entrySet().stream()
               .filter(e -> !danglingInputs.contains(e.getValue()))
               .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
 
@@ -411,15 +396,10 @@ private static ExecutableStage sanitizeDanglingPTransformInputs(ExecutableStage
     componentBuilder
         .clearTransforms()
         .putAllTransforms(
-            pTransformNodes
-                .stream()
+            pTransformNodes.stream()
                 .collect(Collectors.toMap(PTransformNode::getId, PTransformNode::getTransform)));
     Map<String, PCollection> validPCollectionMap =
-        stage
-            .getComponents()
-            .getPcollectionsMap()
-            .entrySet()
-            .stream()
+        stage.getComponents().getPcollectionsMap().entrySet().stream()
             .filter(e -> !danglingInputs.contains(e.getKey()))
             .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
 
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStage.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStage.java
index 1f1c3832725c..9546d973c1ba 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStage.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStage.java
@@ -43,8 +43,7 @@ public static ImmutableExecutableStage ofFullComponents(
             .toBuilder()
             .clearTransforms()
             .putAllTransforms(
-                transforms
-                    .stream()
+                transforms.stream()
                     .collect(Collectors.toMap(PTransformNode::getId, PTransformNode::getTransform)))
             .build();
     return of(
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/Networks.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/Networks.java
index b2d277e4a462..c179372c05bb 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/Networks.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/Networks.java
@@ -208,11 +208,14 @@ public final NodeT apply(NodeT input) {
     //
     // The only edges that are ignored by the algorithm are back edges.
     // The algorithm (while there are still nodes in the graph):
-    //   1) Removes all sinks from the graph adding them to the beginning of "s2". Continue to do this till there
+    //   1) Removes all sinks from the graph adding them to the beginning of "s2". Continue to do
+    //      this till there
     //      are no more sinks.
-    //   2) Removes all source from the graph adding them to the end of "s1". Continue to do this till there
+    //   2) Removes all source from the graph adding them to the end of "s1". Continue to do this
+    //      till there
     //      are no more sources.
-    //   3) Remote a single node with the highest delta within the graph and add it to the end of "s1".
+    //   3) Remove a single node with the highest delta within the graph and add it to the end of
+    //      "s1".
     //
     // The topological order is then the s1 concatenated with s2.
 
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicator.java
index 5a481901c8f3..f64d94d6f5a0 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicator.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicator.java
@@ -293,13 +293,10 @@ private static ExecutableStage deduplicateStageOutput(
             .toBuilder()
             .clearTransforms()
             .putAllTransforms(
-                updatedTransforms
-                    .stream()
+                updatedTransforms.stream()
                     .collect(Collectors.toMap(PTransformNode::getId, PTransformNode::getTransform)))
             .putAllPcollections(
-                originalToPartial
-                    .values()
-                    .stream()
+                originalToPartial.values().stream()
                     .collect(
                         Collectors.toMap(PCollectionNode::getId, PCollectionNode::getPCollection)))
             .build();
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
index 710fbe2e4570..c4b12d75ed0a 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
@@ -132,11 +132,15 @@ private QueryablePipeline(Collection<String> transformIds, Components components
       PTransform transform = transformEntry.getValue();
       boolean isPrimitive = isPrimitiveTransform(transform);
       if (isPrimitive) {
-        // Sometimes "primitive" transforms have sub-transforms (and even deeper-nested descendents), due to runners
-        // either rewriting them in terms of runner-specific transforms, or SDKs constructing them in terms of other
+        // Sometimes "primitive" transforms have sub-transforms (and even deeper-nested
+        // descendents), due to runners
+        // either rewriting them in terms of runner-specific transforms, or SDKs constructing them
+        // in terms of other
         // underlying transforms (see https://issues.apache.org/jira/browse/BEAM-5441).
-        // We consider any "leaf" descendents of these "primitive" transforms to be the true "primitives" that we
-        // preserve here; in the common case, this is just the "primitive" itself, which has no descendents).
+        // We consider any "leaf" descendents of these "primitive" transforms to be the true
+        // "primitives" that we
+        // preserve here; in the common case, this is just the "primitive" itself, which has no
+        // descendents).
         Deque<String> transforms = new ArrayDeque<>();
         transforms.push(transformEntry.getKey());
         while (!transforms.isEmpty()) {
@@ -229,9 +233,7 @@ private static boolean isPrimitiveTransform(PTransform transform) {
   }
 
   public Collection<PTransformNode> getTransforms() {
-    return pipelineNetwork
-        .nodes()
-        .stream()
+    return pipelineNetwork.nodes().stream()
         .filter(PTransformNode.class::isInstance)
         .map(PTransformNode.class::cast)
         .collect(Collectors.toList());
@@ -252,9 +254,7 @@ private static boolean isPrimitiveTransform(PTransform transform) {
    * have no input {@link PCollection}.
    */
   public Set<PTransformNode> getRootTransforms() {
-    return pipelineNetwork
-        .nodes()
-        .stream()
+    return pipelineNetwork.nodes().stream()
         .filter(pipelineNode -> pipelineNetwork.inEdges(pipelineNode).isEmpty())
         .map(pipelineNode -> (PTransformNode) pipelineNode)
         .collect(Collectors.toSet());
@@ -276,14 +276,10 @@ public PTransformNode getProducer(PCollectionNode pcollection) {
    * does consume the {@link PCollectionNode} on a per-element basis.
    */
   public Set<PTransformNode> getPerElementConsumers(PCollectionNode pCollection) {
-    return pipelineNetwork
-        .successors(pCollection)
-        .stream()
+    return pipelineNetwork.successors(pCollection).stream()
         .filter(
             consumer ->
-                pipelineNetwork
-                    .edgesConnecting(pCollection, consumer)
-                    .stream()
+                pipelineNetwork.edgesConnecting(pCollection, consumer).stream()
                     .anyMatch(PipelineEdge::isPerElement))
         .map(pipelineNode -> (PTransformNode) pipelineNode)
         .collect(Collectors.toSet());
@@ -294,14 +290,10 @@ public PTransformNode getProducer(PCollectionNode pcollection) {
    * the collection as a singleton.
    */
   public Set<PTransformNode> getSingletonConsumers(PCollectionNode pCollection) {
-    return pipelineNetwork
-        .successors(pCollection)
-        .stream()
+    return pipelineNetwork.successors(pCollection).stream()
         .filter(
             consumer ->
-                pipelineNetwork
-                    .edgesConnecting(pCollection, consumer)
-                    .stream()
+                pipelineNetwork.edgesConnecting(pCollection, consumer).stream()
                     .anyMatch(edge -> !edge.isPerElement()))
         .map(pipelineNode -> (PTransformNode) pipelineNode)
         .collect(Collectors.toSet());
@@ -312,18 +304,14 @@ public PTransformNode getProducer(PCollectionNode pcollection) {
    * per-element basis.
    */
   public Set<PCollectionNode> getPerElementInputPCollections(PTransformNode ptransform) {
-    return pipelineNetwork
-        .inEdges(ptransform)
-        .stream()
+    return pipelineNetwork.inEdges(ptransform).stream()
         .filter(PipelineEdge::isPerElement)
         .map(edge -> (PCollectionNode) pipelineNetwork.incidentNodes(edge).source())
         .collect(Collectors.toSet());
   }
 
   public Set<PCollectionNode> getOutputPCollections(PTransformNode ptransform) {
-    return pipelineNetwork
-        .successors(ptransform)
-        .stream()
+    return pipelineNetwork.successors(ptransform).stream()
         .map(pipelineNode -> (PCollectionNode) pipelineNode)
         .collect(Collectors.toSet());
   }
@@ -337,8 +325,7 @@ public Components getComponents() {
    * as side inputs.
    */
   public Collection<SideInputReference> getSideInputs(PTransformNode transform) {
-    return getLocalSideInputNames(transform.getTransform())
-        .stream()
+    return getLocalSideInputNames(transform.getTransform()).stream()
         .map(
             localName -> {
               String transformId = transform.getId();
@@ -354,8 +341,7 @@ public Components getComponents() {
   }
 
   public Collection<UserStateReference> getUserStates(PTransformNode transform) {
-    return getLocalUserStateNames(transform.getTransform())
-        .stream()
+    return getLocalUserStateNames(transform.getTransform()).stream()
         .map(
             localName -> {
               String transformId = transform.getId();
@@ -382,8 +368,7 @@ public Components getComponents() {
   }
 
   public Collection<TimerReference> getTimers(PTransformNode transform) {
-    return getLocalTimerNames(transform.getTransform())
-        .stream()
+    return getLocalTimerNames(transform.getTransform()).stream()
         .map(
             localName -> {
               String transformId = transform.getId();
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExecutableStageTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExecutableStageTranslationTest.java
index 60525e991f87..ed1286ce6d35 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExecutableStageTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExecutableStageTranslationTest.java
@@ -69,9 +69,7 @@ public void processElement(
                 }));
 
     ExecutableStage firstEnvStage =
-        GreedyPipelineFuser.fuse(PipelineTranslation.toProto(p))
-            .getFusedStages()
-            .stream()
+        GreedyPipelineFuser.fuse(PipelineTranslation.toProto(p)).getFusedStages().stream()
             .findFirst()
             .get();
     RunnerApi.ExecutableStagePayload basePayload =
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java
index 25e7253a671d..13a954913a81 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java
@@ -51,13 +51,15 @@
           PCollection<? extends Integer>, PCollection<Integer>, MapElements<Integer, Integer>>
       factory =
           new SingleInputOutputOverrideFactory<
-              PCollection<? extends Integer>, PCollection<Integer>,
+              PCollection<? extends Integer>,
+              PCollection<Integer>,
               MapElements<Integer, Integer>>() {
             @Override
             public PTransformReplacement<PCollection<? extends Integer>, PCollection<Integer>>
                 getReplacementTransform(
                     AppliedPTransform<
-                            PCollection<? extends Integer>, PCollection<Integer>,
+                            PCollection<? extends Integer>,
+                            PCollection<Integer>,
                             MapElements<Integer, Integer>>
                         transform) {
               return PTransformReplacement.of(
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageMatcher.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageMatcher.java
index 1535d03e9e0f..3c477184717a 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageMatcher.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageMatcher.java
@@ -105,8 +105,7 @@ protected boolean matchesSafely(ExecutableStage item) {
     return item.getInputPCollection().getId().equals(inputPCollectionId)
         && containsInAnyOrder(sideInputIds.toArray())
             .matches(
-                item.getSideInputs()
-                    .stream()
+                item.getSideInputs().stream()
                     .map(
                         ref ->
                             SideInputId.newBuilder()
@@ -115,14 +114,12 @@ protected boolean matchesSafely(ExecutableStage item) {
                                 .build())
                     .collect(Collectors.toSet()))
         && materializedPCollection.matches(
-            item.getOutputPCollections()
-                .stream()
+            item.getOutputPCollections().stream()
                 .map(PCollectionNode::getId)
                 .collect(Collectors.toSet()))
         && containsInAnyOrder(fusedTransforms.toArray(new String[0]))
             .matches(
-                item.getTransforms()
-                    .stream()
+                item.getTransforms().stream()
                     .map(PTransformNode::getId)
                     .collect(Collectors.toSet()));
   }
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
index 5e681f2bb7cd..e329c00c2cf0 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
@@ -536,9 +536,7 @@ public void flattenWithHeterogenousInputsAndOutputsEntirelyMaterialized() {
                 .withNoOutputs()
                 .withTransforms("pyParDo")));
     Set<String> materializedStageOutputs =
-        fused
-            .getFusedStages()
-            .stream()
+        fused.getFusedStages().stream()
             .flatMap(executableStage -> executableStage.getOutputPCollections().stream())
             .map(PCollectionNode::getId)
             .collect(Collectors.toSet());
@@ -1316,16 +1314,11 @@ public void sanitizedTransforms() throws Exception {
             ExecutableStageMatcher.withInput(impulse2Output.getUniqueName())
                 .withTransforms(flattenTransform.getUniqueName(), read2Transform.getUniqueName())));
     assertThat(
-        fused
-            .getFusedStages()
-            .stream()
+        fused.getFusedStages().stream()
             .flatMap(
                 s ->
-                    s.getComponents()
-                        .getTransformsOrThrow(flattenTransform.getUniqueName())
-                        .getInputsMap()
-                        .values()
-                        .stream())
+                    s.getComponents().getTransformsOrThrow(flattenTransform.getUniqueName())
+                        .getInputsMap().values().stream())
             .collect(Collectors.toList()),
         containsInAnyOrder(read1Output.getUniqueName(), read2Output.getUniqueName()));
   }
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java
index 1e20c73feb37..94ad3e8abd68 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java
@@ -1165,9 +1165,7 @@ public void materializesWithGroupByKeyConsumer() {
       protected boolean matchesSafely(ExecutableStage executableStage) {
         // NOTE: Transform names must be unique, so it's fine to throw here if this does not hold.
         Set<String> stageTransforms =
-            executableStage
-                .getTransforms()
-                .stream()
+            executableStage.getTransforms().stream()
                 .map(PTransformNode::getId)
                 .collect(Collectors.toSet());
         return stageTransforms.containsAll(expectedTransforms)
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/NetworksTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/NetworksTest.java
index 065ad4e71361..a2a9991e2d47 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/NetworksTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/NetworksTest.java
@@ -224,18 +224,14 @@ public void testNodeReplacement() {
     MutableNetwork<String, String> originalNetwork = createNetwork();
     for (String node : originalNetwork.nodes()) {
       assertEquals(
-          originalNetwork
-              .successors(node)
-              .stream()
+          originalNetwork.successors(node).stream()
               .map(function)
               .collect(Collectors.toCollection(HashSet::new)),
           network.successors(function.apply(node)));
     }
     assertEquals(
         network.nodes(),
-        originalNetwork
-            .nodes()
-            .stream()
+        originalNetwork.nodes().stream()
             .map(function)
             .collect(Collectors.toCollection(HashSet::new)));
   }
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicatorTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicatorTest.java
index 624eff331402..023cc27cc7a6 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicatorTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicatorTest.java
@@ -271,10 +271,7 @@ public void duplicateOverStages() {
 
     assertThat(result.getDeduplicatedStages().keySet(), hasSize(2));
     List<String> stageOutputs =
-        result
-            .getDeduplicatedStages()
-            .values()
-            .stream()
+        result.getDeduplicatedStages().values().stream()
             .flatMap(stage -> stage.getOutputPCollections().stream().map(PCollectionNode::getId))
             .collect(Collectors.toList());
     assertThat(
@@ -398,11 +395,7 @@ public void duplicateOverStagesAndTransforms() {
     introducedOutputs.addAll(
         result.getDeduplicatedTransforms().get("shared").getTransform().getOutputsMap().values());
     introducedOutputs.addAll(
-        result
-            .getDeduplicatedStages()
-            .get(oneStage)
-            .getOutputPCollections()
-            .stream()
+        result.getDeduplicatedStages().get(oneStage).getOutputPCollections().stream()
             .map(PCollectionNode::getId)
             .collect(Collectors.toList()));
     assertThat(
@@ -588,16 +581,11 @@ public void multipleDuplicatesInStages() {
     assertThat(result.getDeduplicatedTransforms().keySet(), empty());
 
     Collection<String> introducedIds =
-        result
-            .getIntroducedTransforms()
-            .stream()
+        result.getIntroducedTransforms().stream()
             .flatMap(pt -> pt.getTransform().getInputsMap().values().stream())
             .collect(Collectors.toList());
     String[] stageOutputs =
-        result
-            .getDeduplicatedStages()
-            .values()
-            .stream()
+        result.getDeduplicatedStages().values().stream()
             .flatMap(s -> s.getOutputPCollections().stream().map(PCollectionNode::getId))
             .toArray(String[]::new);
     assertThat(introducedIds, containsInAnyOrder(stageOutputs));
@@ -608,9 +596,7 @@ public void multipleDuplicatesInStages() {
     assertThat(
         result.getDeduplicatedComponents().getTransformsMap().entrySet(),
         hasItems(
-            result
-                .getIntroducedTransforms()
-                .stream()
+            result.getIntroducedTransforms().stream()
                 .collect(Collectors.toMap(PTransformNode::getId, PTransformNode::getTransform))
                 .entrySet()
                 .toArray(new Map.Entry[0])));
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java
index 055b66594386..c2399545ce57 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java
@@ -210,10 +210,7 @@ public void transformWithSideAndMainInputs() {
     PTransform parDoTransform = components.getTransformsOrThrow("par_do");
     String sideInputLocalName =
         getOnlyElement(
-            parDoTransform
-                .getInputsMap()
-                .entrySet()
-                .stream()
+            parDoTransform.getInputsMap().entrySet().stream()
                 .filter(entry -> !entry.getValue().equals(mainInputName))
                 .map(Map.Entry::getKey)
                 .collect(Collectors.toSet()));
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index 790274833666..eafc9b82bfc0 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -116,9 +116,7 @@ public LateDataFilter(
           StreamSupport.stream(elements.spliterator(), false)
               .map(
                   input ->
-                      input
-                          .getWindows()
-                          .stream()
+                      input.getWindows().stream()
                           .map(
                               window ->
                                   WindowedValue.of(
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index 45c847edae57..93b9a7fddab3 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -104,14 +104,16 @@ public String getUrn() {
   /** Overrides a {@link ProcessKeyedElements} into {@link SplittableProcessViaKeyedWorkItems}. */
   public static class OverrideFactory<InputT, OutputT, RestrictionT>
       implements PTransformOverrideFactory<
-          PCollection<KV<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple,
+          PCollection<KV<byte[], KV<InputT, RestrictionT>>>,
+          PCollectionTuple,
           ProcessKeyedElements<InputT, OutputT, RestrictionT>> {
     @Override
     public PTransformReplacement<
             PCollection<KV<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple>
         getReplacementTransform(
             AppliedPTransform<
-                    PCollection<KV<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple,
+                    PCollection<KV<byte[], KV<InputT, RestrictionT>>>,
+                    PCollectionTuple,
                     ProcessKeyedElements<InputT, OutputT, RestrictionT>>
                 transform) {
       return PTransformReplacement.of(
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index d5d10de83707..f03f71f1c619 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -540,8 +540,7 @@ public final void injectElements(List<TimestampedValue<InputT>> values) throws E
     }
 
     Iterable<WindowedValue<InputT>> inputs =
-        values
-            .stream()
+        values.stream()
             .map(
                 input -> {
                   try {
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
index 0a9ec4935190..2bab79b1cb3d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
@@ -28,14 +28,16 @@
 /** Provides an implementation of {@link GBKIntoKeyedWorkItems} for the Direct Runner. */
 class DirectGBKIntoKeyedWorkItemsOverrideFactory<KeyT, InputT>
     extends SingleInputOutputOverrideFactory<
-        PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>,
+        PCollection<KV<KeyT, InputT>>,
+        PCollection<KeyedWorkItem<KeyT, InputT>>,
         GBKIntoKeyedWorkItems<KeyT, InputT>> {
   @Override
   public PTransformReplacement<
           PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>>
       getReplacementTransform(
           AppliedPTransform<
-                  PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>,
+                  PCollection<KV<KeyT, InputT>>,
+                  PCollection<KeyedWorkItem<KeyT, InputT>>,
                   GBKIntoKeyedWorkItems<KeyT, InputT>>
               transform) {
     return PTransformReplacement.of(
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
index 49528160fa81..873eb1f4c884 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
@@ -30,13 +30,15 @@
 /** A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms. */
 final class DirectGroupByKeyOverrideFactory<K, V>
     extends SingleInputOutputOverrideFactory<
-        PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>,
+        PCollection<KV<K, V>>,
+        PCollection<KV<K, Iterable<V>>>,
         PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>> {
   @Override
   public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>
       getReplacementTransform(
           AppliedPTransform<
-                  PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>,
+                  PCollection<KV<K, V>>,
+                  PCollection<KV<K, Iterable<V>>>,
                   PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>>
               transform) {
 
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index a5a1c7658e1e..5b026cba3943 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -324,8 +324,7 @@ private void shutdownIfNecessary(State newState) {
               "Error"
                   + (errors.size() == 1 ? "" : "s")
                   + " during executor shutdown:\n"
-                  + errors
-                      .stream()
+                  + errors.stream()
                       .map(Exception::getMessage)
                       .collect(Collectors.joining("\n- ", "- ", "")));
       visibleUpdates.failed(exception);
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 0a2d42f67c05..77aa9fc1826a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -81,7 +81,8 @@ public void cleanup() {}
 
   private <K, V> TransformEvaluator<KeyedWorkItem<K, V>> createEvaluator(
       AppliedPTransform<
-              PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>,
+              PCollection<KeyedWorkItem<K, V>>,
+              PCollection<KV<K, Iterable<V>>>,
               DirectGroupAlsoByWindow<K, V>>
           application,
       CommittedBundle<KeyedWorkItem<K, V>> inputBundle) {
@@ -100,7 +101,8 @@ public void cleanup() {}
     private final EvaluationContext evaluationContext;
     private final PipelineOptions options;
     private final AppliedPTransform<
-            PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>,
+            PCollection<KeyedWorkItem<K, V>>,
+            PCollection<KV<K, Iterable<V>>>,
             DirectGroupAlsoByWindow<K, V>>
         application;
 
@@ -120,7 +122,8 @@ public GroupAlsoByWindowEvaluator(
         PipelineOptions options,
         CommittedBundle<KeyedWorkItem<K, V>> inputBundle,
         final AppliedPTransform<
-                PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>,
+                PCollection<KeyedWorkItem<K, V>>,
+                PCollection<KV<K, Iterable<V>>>,
                 DirectGroupAlsoByWindow<K, V>>
             application) {
       this.evaluationContext = evaluationContext;
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
index d22ddb7fdebd..4f0594a4291a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
@@ -118,7 +118,8 @@ public boolean matches(AppliedPTransform<?, ?, ?> application) {
 
   static class Factory<K, InputT, AccumT, OutputT>
       extends SingleInputOutputOverrideFactory<
-          PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>,
+          PCollection<KV<K, InputT>>,
+          PCollection<KV<K, OutputT>>,
           PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>> {
     public static PTransformOverrideFactory create() {
       return new Factory<>();
@@ -130,7 +131,8 @@ private Factory() {}
     public PTransformReplacement<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>
         getReplacementTransform(
             AppliedPTransform<
-                    PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>,
+                    PCollection<KV<K, InputT>>,
+                    PCollection<KV<K, OutputT>>,
                     PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>>
                 transform) {
       GlobalCombineFn<?, ?, ?> globalFn = ((Combine.PerKey) transform.getTransform()).getFn();
@@ -366,7 +368,8 @@ public MergeAndExtractAccumulatorOutputEvaluatorFactory(EvaluationContext ctxt)
 
     private <K, AccumT, OutputT> TransformEvaluator<KV<K, Iterable<AccumT>>> createEvaluator(
         AppliedPTransform<
-                PCollection<KV<K, Iterable<AccumT>>>, PCollection<KV<K, OutputT>>,
+                PCollection<KV<K, Iterable<AccumT>>>,
+                PCollection<KV<K, OutputT>>,
                 MergeAndExtractAccumulatorOutput<K, AccumT, OutputT>>
             application,
         CommittedBundle<KV<K, Iterable<AccumT>>> inputBundle) {
@@ -380,7 +383,8 @@ public void cleanup() throws Exception {}
   private static class MergeAccumulatorsAndExtractOutputEvaluator<K, AccumT, OutputT>
       implements TransformEvaluator<KV<K, Iterable<AccumT>>> {
     private final AppliedPTransform<
-            PCollection<KV<K, Iterable<AccumT>>>, PCollection<KV<K, OutputT>>,
+            PCollection<KV<K, Iterable<AccumT>>>,
+            PCollection<KV<K, OutputT>>,
             MergeAndExtractAccumulatorOutput<K, AccumT, OutputT>>
         application;
     private final CombineFn<?, AccumT, OutputT> combineFn;
@@ -389,7 +393,8 @@ public void cleanup() throws Exception {}
     public MergeAccumulatorsAndExtractOutputEvaluator(
         EvaluationContext ctxt,
         AppliedPTransform<
-                PCollection<KV<K, Iterable<AccumT>>>, PCollection<KV<K, OutputT>>,
+                PCollection<KV<K, Iterable<AccumT>>>,
+                PCollection<KV<K, OutputT>>,
                 MergeAndExtractAccumulatorOutput<K, AccumT, OutputT>>
             application) {
       this.application = application;
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 01bb26dfaa4c..c8ad2383659e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -112,9 +112,7 @@
         evaluationContext.createSideInputReader(sideInputs);
 
     Map<TupleTag<?>, Coder<?>> outputCoders =
-        outputs
-            .entrySet()
-            .stream()
+        outputs.entrySet().stream()
             .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().getCoder()));
 
     PushbackSideInputDoFnRunner<InputT, OutputT> runner =
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 112487851e8b..a11b384d7215 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -65,13 +65,15 @@
 @VisibleForTesting
 public class ParDoMultiOverrideFactory<InputT, OutputT>
     implements PTransformOverrideFactory<
-        PCollection<? extends InputT>, PCollectionTuple,
+        PCollection<? extends InputT>,
+        PCollectionTuple,
         PTransform<PCollection<? extends InputT>, PCollectionTuple>> {
   @Override
   public PTransformReplacement<PCollection<? extends InputT>, PCollectionTuple>
       getReplacementTransform(
           AppliedPTransform<
-                  PCollection<? extends InputT>, PCollectionTuple,
+                  PCollection<? extends InputT>,
+                  PCollectionTuple,
                   PTransform<PCollection<? extends InputT>, PCollectionTuple>>
               application) {
 
@@ -87,7 +89,8 @@
   @SuppressWarnings("unchecked")
   private PTransform<PCollection<? extends InputT>, PCollectionTuple> getReplacementForApplication(
       AppliedPTransform<
-              PCollection<? extends InputT>, PCollectionTuple,
+              PCollection<? extends InputT>,
+              PCollectionTuple,
               PTransform<PCollection<? extends InputT>, PCollectionTuple>>
           application)
       throws IOException {
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 737098f44738..86820ef2457b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -103,7 +103,8 @@ public void cleanup() throws Exception {
   @SuppressWarnings({"unchecked", "rawtypes"})
   private TransformEvaluator<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> createEvaluator(
       AppliedPTransform<
-              PCollection<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple,
+              PCollection<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>,
+              PCollectionTuple,
               ProcessElements<InputT, OutputT, RestrictionT, PositionT>>
           application,
       CommittedBundle<InputT> inputBundle)
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index a12d71b865f6..bd05a5bcaca9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -106,7 +106,8 @@ public void cleanup() throws Exception {
   @SuppressWarnings({"unchecked", "rawtypes"})
   private TransformEvaluator<KeyedWorkItem<K, KV<K, InputT>>> createEvaluator(
       AppliedPTransform<
-              PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple,
+              PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>,
+              PCollectionTuple,
               StatefulParDo<K, InputT, OutputT>>
           application,
       CommittedBundle<KeyedWorkItem<K, KV<K, InputT>>> inputBundle)
@@ -203,7 +204,8 @@ public Runnable load(
   @AutoValue
   abstract static class AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT> {
     abstract AppliedPTransform<
-            PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple,
+            PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>,
+            PCollectionTuple,
             StatefulParDo<K, InputT, OutputT>>
         getTransform();
 
@@ -213,7 +215,8 @@ public Runnable load(
 
     static <K, InputT, OutputT> AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT> create(
         AppliedPTransform<
-                PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple,
+                PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>,
+                PCollectionTuple,
                 StatefulParDo<K, InputT, OutputT>>
             transform,
         StructuralKey<K> key,
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
index 6f5eff614a7d..397b0675f640 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
@@ -42,13 +42,15 @@
  */
 class ViewOverrideFactory<ElemT, ViewT>
     implements PTransformOverrideFactory<
-        PCollection<ElemT>, PCollection<ElemT>,
+        PCollection<ElemT>,
+        PCollection<ElemT>,
         PTransform<PCollection<ElemT>, PCollection<ElemT>>> {
 
   @Override
   public PTransformReplacement<PCollection<ElemT>, PCollection<ElemT>> getReplacementTransform(
       AppliedPTransform<
-              PCollection<ElemT>, PCollection<ElemT>,
+              PCollection<ElemT>,
+              PCollection<ElemT>,
               PTransform<PCollection<ElemT>, PCollection<ElemT>>>
           transform) {
 
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index 4d5a2d9f2123..62cc673f1953 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -51,7 +51,8 @@
  */
 class WriteWithShardingFactory<InputT, DestinationT>
     implements PTransformOverrideFactory<
-        PCollection<InputT>, WriteFilesResult<DestinationT>,
+        PCollection<InputT>,
+        WriteFilesResult<DestinationT>,
         PTransform<PCollection<InputT>, WriteFilesResult<DestinationT>>> {
   static final int MAX_RANDOM_EXTRA_SHARDS = 3;
   @VisibleForTesting static final int MIN_SHARDS_FOR_LOG = 3;
@@ -60,7 +61,8 @@
   public PTransformReplacement<PCollection<InputT>, WriteFilesResult<DestinationT>>
       getReplacementTransform(
           AppliedPTransform<
-                  PCollection<InputT>, WriteFilesResult<DestinationT>,
+                  PCollection<InputT>,
+                  WriteFilesResult<DestinationT>,
                   PTransform<PCollection<InputT>, WriteFilesResult<DestinationT>>>
               transform) {
     try {
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java
index 55282534e40c..e4442108f123 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java
@@ -317,8 +317,7 @@ private void shutdownIfNecessary(State newState) {
               "Error"
                   + (errors.size() == 1 ? "" : "s")
                   + " during executor shutdown:\n"
-                  + errors
-                      .stream()
+                  + errors.stream()
                       .map(Exception::getMessage)
                       .collect(Collectors.joining("\n- ", "- ", "")));
       visibleUpdates.failed(exception);
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
index 8c6873a422e7..2c2c9f07425c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
@@ -479,8 +479,7 @@ private static Pipeline foldFeedSDFIntoExecutableStage(Pipeline p) {
     QueryablePipeline q = QueryablePipeline.forPipeline(p);
     String feedSdfUrn = SplittableRemoteStageEvaluatorFactory.FEED_SDF_URN;
     List<PTransformNode> feedSDFNodes =
-        q.getTransforms()
-            .stream()
+        q.getTransforms().stream()
             .filter(node -> node.getTransform().getSpec().getUrn().equals(feedSdfUrn))
             .collect(Collectors.toList());
     Map<String, PTransformNode> stageToFeeder = Maps.newHashMap();
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServer.java
index 77b1028844c2..8e1514f2fd62 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServer.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServer.java
@@ -124,10 +124,9 @@ public void stop() {
 
   private static class ServerConfiguration {
     @Option(
-      name = "-p",
-      aliases = {"--port"},
-      usage = "The local port to expose the server on. 0 to use a dynamic port. (Default: 8099)"
-    )
+        name = "-p",
+        aliases = {"--port"},
+        usage = "The local port to expose the server on. 0 to use a dynamic port. (Default: 8099)")
     private int port = 8099;
   }
 }
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index eac8554804b7..d3ede31814e9 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -326,7 +326,8 @@ public void onElement(final ProcessContext ctx) {
 
                   @Teardown
                   public void teardown() {
-                    // just to not have a fast execution hiding an issue until we have a shutdown callback
+                    // just to not have a fast execution hiding an issue until we have a shutdown
+                    // callback
                     try {
                       Thread.sleep(1000);
                     } catch (final InterruptedException e) {
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
index a9673bf716ca..54018cfdbf46 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
@@ -148,7 +148,8 @@ public void process(ProcessContext c) {}
         new StatefulParDoEvaluatorFactory<>(mockEvaluationContext, options);
 
     AppliedPTransform<
-            PCollection<? extends KeyedWorkItem<String, KV<String, Integer>>>, PCollectionTuple,
+            PCollection<? extends KeyedWorkItem<String, KV<String, Integer>>>,
+            PCollectionTuple,
             StatefulParDo<String, Integer, Integer>>
         producingTransform = (AppliedPTransform) DirectGraphs.getProducer(produced);
 
@@ -256,7 +257,8 @@ public void process(ProcessContext c) {}
 
     // This will be the stateful ParDo from the expansion
     AppliedPTransform<
-            PCollection<KeyedWorkItem<String, KV<String, Integer>>>, PCollectionTuple,
+            PCollection<KeyedWorkItem<String, KV<String, Integer>>>,
+            PCollectionTuple,
             StatefulParDo<String, Integer, Integer>>
         producingTransform = (AppliedPTransform) DirectGraphs.getProducer(produced);
 
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 5ec2fd9c28f1..684ac0223ba9 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -151,7 +151,8 @@ public void withNoShardingSpecifiedReturnsNewTransform() {
     PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
 
     AppliedPTransform<
-            PCollection<Object>, WriteFilesResult<Void>,
+            PCollection<Object>,
+            WriteFilesResult<Void>,
             PTransform<PCollection<Object>, WriteFilesResult<Void>>>
         originalApplication =
             AppliedPTransform.of("write", objs.expand(), Collections.emptyMap(), original, p);
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
index f16aaea6f0ac..fd55e85d4c43 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
@@ -157,9 +157,7 @@ public void proc(ProcessContext ctxt) {
     PTransformNode impulseTransform = getOnlyElement(fusedQP.getRootTransforms());
     PCollectionNode impulseOutput = getOnlyElement(fusedQP.getOutputPCollections(impulseTransform));
     PTransformNode stage =
-        fusedPipeline
-            .getRootTransformIdsList()
-            .stream()
+        fusedPipeline.getRootTransformIdsList().stream()
             .map(
                 id ->
                     PipelineNode.pTransform(
@@ -212,9 +210,7 @@ public void process(ProcessContext ctxt) {
     checkState(leftRoot != null);
     checkState(rightRoot != null);
     PTransformNode stage =
-        fusedPipeline
-            .getRootTransformIdsList()
-            .stream()
+        fusedPipeline.getRootTransformIdsList().stream()
             .map(
                 id ->
                     PipelineNode.pTransform(
diff --git a/runners/extensions-java/metrics/src/main/java/org/apache/beam/runners/extensions/metrics/MetricsGraphiteSink.java b/runners/extensions-java/metrics/src/main/java/org/apache/beam/runners/extensions/metrics/MetricsGraphiteSink.java
index 2d1d94b16d76..c3af72c3ec80 100644
--- a/runners/extensions-java/metrics/src/main/java/org/apache/beam/runners/extensions/metrics/MetricsGraphiteSink.java
+++ b/runners/extensions-java/metrics/src/main/java/org/apache/beam/runners/extensions/metrics/MetricsGraphiteSink.java
@@ -128,9 +128,8 @@ private CounterMetricMessage(
     }
 
     @SuppressFBWarnings(
-      value = "VA_FORMAT_STRING_USES_NEWLINE",
-      justification = "\\n is part of graphite protocol"
-    )
+        value = "VA_FORMAT_STRING_USES_NEWLINE",
+        justification = "\\n is part of graphite protocol")
     @Override
     protected String createCommittedMessage() {
       String metricMessage =
@@ -145,9 +144,8 @@ protected String createCommittedMessage() {
     }
 
     @SuppressFBWarnings(
-      value = "VA_FORMAT_STRING_USES_NEWLINE",
-      justification = "\\n is part of graphite protocol"
-    )
+        value = "VA_FORMAT_STRING_USES_NEWLINE",
+        justification = "\\n is part of graphite protocol")
     @Override
     protected String createAttemptedMessage() {
       String metricMessage =
@@ -172,9 +170,8 @@ private GaugeMetricMessage(MetricResult<GaugeResult> gauge, String valueType) {
     }
 
     @SuppressFBWarnings(
-      value = "VA_FORMAT_STRING_USES_NEWLINE",
-      justification = "\\n is part of graphite protocol"
-    )
+        value = "VA_FORMAT_STRING_USES_NEWLINE",
+        justification = "\\n is part of graphite protocol")
     @Override
     protected String createCommittedMessage() {
       String metricMessage =
@@ -188,9 +185,8 @@ protected String createCommittedMessage() {
     }
 
     @SuppressFBWarnings(
-      value = "VA_FORMAT_STRING_USES_NEWLINE",
-      justification = "\\n is part of graphite protocol"
-    )
+        value = "VA_FORMAT_STRING_USES_NEWLINE",
+        justification = "\\n is part of graphite protocol")
     @Override
     protected String createAttemptedMessage() {
       String metricMessage =
@@ -218,9 +214,8 @@ public DistributionMetricMessage(
     }
 
     @SuppressFBWarnings(
-      value = "VA_FORMAT_STRING_USES_NEWLINE",
-      justification = "\\n is part of graphite protocol"
-    )
+        value = "VA_FORMAT_STRING_USES_NEWLINE",
+        justification = "\\n is part of graphite protocol")
     @Override
     protected String createCommittedMessage() {
       Number value = null;
@@ -255,9 +250,8 @@ protected String createCommittedMessage() {
     }
 
     @SuppressFBWarnings(
-      value = "VA_FORMAT_STRING_USES_NEWLINE",
-      justification = "\\n is part of graphite protocol"
-    )
+        value = "VA_FORMAT_STRING_USES_NEWLINE",
+        justification = "\\n is part of graphite protocol")
     @Override
     protected String createAttemptedMessage() {
       Number value = null;
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
index 96f4b5b15f94..df067077d8bd 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
@@ -135,14 +135,16 @@ private CreateFlinkPCollectionView(PCollectionView<ViewT> view) {
 
   public static class Factory<ElemT, ViewT>
       implements PTransformOverrideFactory<
-          PCollection<ElemT>, PCollection<ElemT>,
+          PCollection<ElemT>,
+          PCollection<ElemT>,
           PTransform<PCollection<ElemT>, PCollection<ElemT>>> {
     public Factory() {}
 
     @Override
     public PTransformReplacement<PCollection<ElemT>, PCollection<ElemT>> getReplacementTransform(
         AppliedPTransform<
-                PCollection<ElemT>, PCollection<ElemT>,
+                PCollection<ElemT>,
+                PCollection<ElemT>,
                 PTransform<PCollection<ElemT>, PCollection<ElemT>>>
             transform) {
       PCollection<ElemT> collection =
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
index d2557e8b239e..8f9fcb9ade4d 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
@@ -296,7 +296,7 @@ public boolean test(RunnerApi.PTransform pTransform) {
     } catch (InvalidProtocolBufferException e) {
       throw new IllegalArgumentException(e);
     }
-    //TODO: https://issues.apache.org/jira/browse/BEAM-4296
+    // TODO: https://issues.apache.org/jira/browse/BEAM-4296
     // This only works for well known window fns, we should defer this execution to the SDK
     // if the WindowFn can't be parsed or just defer it all the time.
     WindowFn<T, ? extends BoundedWindow> windowFn =
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index a688bee5f3cc..271ef0bcc4d3 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -707,11 +707,13 @@ public void translateNode(
 
       @SuppressWarnings("unchecked")
       AppliedPTransform<
-              PCollection<ElemT>, PCollection<ElemT>,
+              PCollection<ElemT>,
+              PCollection<ElemT>,
               PTransform<PCollection<ElemT>, PCollection<ElemT>>>
           application =
               (AppliedPTransform<
-                      PCollection<ElemT>, PCollection<ElemT>,
+                      PCollection<ElemT>,
+                      PCollection<ElemT>,
                       PTransform<PCollection<ElemT>, PCollection<ElemT>>>)
                   context.getCurrentTransform();
       PCollectionView<ViewT> input;
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
index 7222a3037a38..466069f8ea31 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
@@ -110,10 +110,7 @@ public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
   }
 
   public Map<TupleTag<?>, Coder<?>> getOutputCoders() {
-    return currentTransform
-        .getOutputs()
-        .entrySet()
-        .stream()
+    return currentTransform.getOutputs().entrySet().stream()
         .filter(e -> e.getValue() instanceof PCollection)
         .collect(Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder()));
   }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
index 37dfb0c0ecb0..3c69817fa5ee 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
@@ -99,7 +99,8 @@ static ExecutionEnvironment createBatchExecutionEnvironment(
     if (options.getParallelism() != -1 && !(flinkBatchEnv instanceof CollectionEnvironment)) {
       flinkBatchEnv.setParallelism(options.getParallelism());
     }
-    // Set the correct parallelism, required by UnboundedSourceWrapper to generate consistent splits.
+    // Set the correct parallelism, required by UnboundedSourceWrapper to generate consistent
+    // splits.
     final int parallelism;
     if (flinkBatchEnv instanceof CollectionEnvironment) {
       parallelism = 1;
@@ -267,7 +268,8 @@ private static int determineParallelism(
       return pipelineOptionsParallelism;
     }
     if (envParallelism > 0) {
-      // If the user supplies a parallelism on the command-line, this is set on the execution environment during creation
+      // If the user supplies a parallelism on the command-line, this is set on the execution
+      // environment during creation
       return envParallelism;
     }
 
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
index c00f14c8f234..efaa40aa4ea2 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
@@ -282,8 +282,7 @@ private static boolean hasUnboundedPCollections(RunnerApi.Pipeline pipeline) {
     Collection<RunnerApi.PCollection> pCollecctions =
         pipeline.getComponents().getPcollectionsMap().values();
     // Assume that all PCollections are consumed at some point in the pipeline.
-    return pCollecctions
-        .stream()
+    return pCollecctions.stream()
         .anyMatch(pc -> pc.getIsBounded() == RunnerApi.IsBounded.Enum.UNBOUNDED);
   }
 }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
index 40e815640581..e40b91f80539 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -58,15 +58,13 @@
     String host = "localhost";
 
     @Option(
-      name = "--job-port",
-      usage = "The job service port. 0 to use a dynamic port. (Default: 8099)"
-    )
+        name = "--job-port",
+        usage = "The job service port. 0 to use a dynamic port. (Default: 8099)")
     int port = 8099;
 
     @Option(
-      name = "--artifact-port",
-      usage = "The artifact service port. 0 to use a dynamic port. (Default: 8098)"
-    )
+        name = "--artifact-port",
+        usage = "The artifact service port. 0 to use a dynamic port. (Default: 8098)")
     int artifactPort = 8098;
 
     @Option(name = "--artifacts-dir", usage = "The location to store staged artifact files")
@@ -74,9 +72,8 @@
         Paths.get(System.getProperty("java.io.tmpdir"), "beam-artifact-staging").toString();
 
     @Option(
-      name = "--clean-artifacts-per-job",
-      usage = "When true, remove each job's staged artifacts when it completes"
-    )
+        name = "--clean-artifacts-per-job",
+        usage = "When true, remove each job's staged artifacts when it completes")
     boolean cleanArtifactsPerJob = false;
 
     @Option(name = "--flink-master-url", usage = "Flink master url to submit job.")
@@ -87,9 +84,8 @@ String getFlinkMasterUrl() {
     }
 
     @Option(
-      name = "--sdk-worker-parallelism",
-      usage = "Default parallelism for SDK worker processes (see portable pipeline options)"
-    )
+        name = "--sdk-worker-parallelism",
+        usage = "Default parallelism for SDK worker processes (see portable pipeline options)")
     Long sdkWorkerParallelism = 1L;
 
     Long getSdkWorkerParallelism() {
@@ -97,12 +93,11 @@ Long getSdkWorkerParallelism() {
     }
 
     @Option(
-      name = "--flink-conf-dir",
-      usage =
-          "Directory containing Flink YAML configuration files. "
-              + "These properties will be set to all jobs submitted to Flink and take precedence "
-              + "over configurations in FLINK_CONF_DIR."
-    )
+        name = "--flink-conf-dir",
+        usage =
+            "Directory containing Flink YAML configuration files. "
+                + "These properties will be set to all jobs submitted to Flink and take precedence "
+                + "over configurations in FLINK_CONF_DIR.")
     String flinkConfDir = null;
 
     @Nullable
@@ -112,7 +107,7 @@ String getFlinkConfDir() {
   }
 
   public static void main(String[] args) throws Exception {
-    //TODO: Expose the fileSystem related options.
+    // TODO: Expose the fileSystem related options.
     // Register standard file systems.
     FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
     fromParams(args).run();
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index 6126af5303f5..83776e170864 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -172,7 +172,8 @@ boolean canTranslate(T transform, FlinkStreamingTranslationContext context) {
   @VisibleForTesting
   static class StreamingShardedWriteFactory<UserT, DestinationT, OutputT>
       implements PTransformOverrideFactory<
-          PCollection<UserT>, WriteFilesResult<DestinationT>,
+          PCollection<UserT>,
+          WriteFilesResult<DestinationT>,
           WriteFiles<UserT, DestinationT, OutputT>> {
     FlinkPipelineOptions options;
 
@@ -184,7 +185,8 @@ boolean canTranslate(T transform, FlinkStreamingTranslationContext context) {
     public PTransformReplacement<PCollection<UserT>, WriteFilesResult<DestinationT>>
         getReplacementTransform(
             AppliedPTransform<
-                    PCollection<UserT>, WriteFilesResult<DestinationT>,
+                    PCollection<UserT>,
+                    WriteFilesResult<DestinationT>,
                     WriteFiles<UserT, DestinationT, OutputT>>
                 transform) {
       // By default, if numShards is not set WriteFiles will produce one file per bundle. In
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index 80ed3601fe26..8cd195b53103 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -489,7 +489,7 @@ private void translateStreamingImpulse(
     } catch (InvalidProtocolBufferException e) {
       throw new IllegalArgumentException(e);
     }
-    //TODO: https://issues.apache.org/jira/browse/BEAM-4296
+    // TODO: https://issues.apache.org/jira/browse/BEAM-4296
     // This only works for well known window fns, we should defer this execution to the SDK
     // if the WindowFn can't be parsed or just defer it all the time.
     WindowFn<T, ? extends BoundedWindow> windowFn =
@@ -695,7 +695,8 @@ private void translateStreamingImpulse(
     for (RunnerApi.ExecutableStagePayload.SideInputId sideInputId :
         stagePayload.getSideInputsList()) {
 
-      // TODO: local name is unique as long as only one transform with side input can be within a stage
+      // TODO: local name is unique as long as only one transform with side input can be within a
+      // stage
       String sideInputTag = sideInputId.getLocalName();
       String collectionId =
           components
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index c770cdf790eb..e23e67fa03ac 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -992,7 +992,8 @@ public void translateNode(
         // allowed to have only one input keyed, normally.
 
         TwoInputTransformation<
-                WindowedValue<SingletonKeyedWorkItem<K, InputT>>, RawUnionValue,
+                WindowedValue<SingletonKeyedWorkItem<K, InputT>>,
+                RawUnionValue,
                 WindowedValue<KV<K, OutputT>>>
             rawFlinkTransform =
                 new TwoInputTransformation<>(
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
index 00d1ea867feb..027b07c36156 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
@@ -103,10 +103,7 @@ public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
   }
 
   public Map<TupleTag<?>, Coder<?>> getOutputCoders() {
-    return currentTransform
-        .getOutputs()
-        .entrySet()
-        .stream()
+    return currentTransform.getOutputs().entrySet().stream()
         .filter(e -> e.getValue() instanceof PCollection)
         .collect(Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder()));
   }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationModeOptimizer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationModeOptimizer.java
index 32ec4ca6229b..7eb8b23ef139 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationModeOptimizer.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationModeOptimizer.java
@@ -59,10 +59,7 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) {
   }
 
   private boolean hasUnboundedOutput(AppliedPTransform<?, ?, ?> transform) {
-    return transform
-        .getOutputs()
-        .values()
-        .stream()
+    return transform.getOutputs().values().stream()
         .filter(value -> value instanceof PCollection)
         .map(value -> (PCollection<?>) value)
         .anyMatch(collection -> collection.isBounded() == IsBounded.UNBOUNDED);
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
index 995936ecb191..ccad6742e7bb 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
@@ -85,7 +85,8 @@ private JobFactoryState(int maxFactories) {
       Preconditions.checkArgument(maxFactories >= 0, "sdk_worker_parallelism must be >= 0");
 
       if (maxFactories == 0) {
-        // if this is 0, use the auto behavior of num_cores - 1 so that we leave some resources available for the java process
+        // if this is 0, use the auto behavior of num_cores - 1 so that we leave some resources
+        // available for the java process
         this.maxFactories = Math.max(Runtime.getRuntime().availableProcessors() - 1, 1);
       } else {
         this.maxFactories = maxFactories;
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index bc51a106d7bd..e7dafa8968f8 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -332,7 +332,7 @@ public void close() throws Exception {
     // close may be called multiple times when an exception is thrown
     if (stageContext != null) {
       try (AutoCloseable bundleFactoryCloser = stageBundleFactory;
-           AutoCloseable closable = stageContext) {
+          AutoCloseable closable = stageContext) {
       } catch (Exception e) {
         LOG.error("Error in close: ", e);
         throw e;
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
index c211403c2d4e..e404298d9ae9 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
@@ -86,9 +86,7 @@ public SideInputInitializer(PCollectionView<ViewT> view) {
                   InMemoryMultimapSideInputView.fromIterable(
                       keyCoder,
                       (Iterable)
-                          elements
-                              .getValue()
-                              .stream()
+                          elements.getValue().stream()
                               .map(WindowedValue::getValue)
                               .collect(Collectors.toList()))));
     }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index d1ef494fc2e1..eef86d7728e8 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -354,7 +354,8 @@ private void setTimer(WindowedValue<InputT> timerElement, TimerInternals.TimerDa
     try {
       Object key = keySelector.getKey(timerElement);
       sdkHarnessRunner.setCurrentTimerKey(key);
-      // We have to synchronize to ensure the state backend is not concurrently accessed by the state requests
+      // We have to synchronize to ensure the state backend is not concurrently accessed by the
+      // state requests
       try {
         stateBackendLock.lock();
         getKeyedStateBackend().setCurrentKey(key);
@@ -385,7 +386,8 @@ public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) {
     }
     // Prepare the SdkHarnessRunner with the key for the timer
     sdkHarnessRunner.setCurrentTimerKey(decodedKey);
-    // We have to synchronize to ensure the state backend is not concurrently accessed by the state requests
+    // We have to synchronize to ensure the state backend is not concurrently accessed by the state
+    // requests
     try {
       stateBackendLock.lock();
       getKeyedStateBackend().setCurrentKey(encodedKey);
@@ -399,9 +401,10 @@ public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) {
   public void dispose() throws Exception {
     // may be called multiple times when an exception is thrown
     if (stageContext != null) {
-      // Remove the reference to stageContext and make stageContext available for garbage collection.
+      // Remove the reference to stageContext and make stageContext available for garbage
+      // collection.
       try (AutoCloseable bundleFactoryCloser = stageBundleFactory;
-           AutoCloseable closable = stageContext) {
+          AutoCloseable closable = stageContext) {
         // DoFnOperator generates another "bundle" for the final watermark
         // https://issues.apache.org/jira/browse/BEAM-5816
         super.dispose();
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java
index a1ccca2fd2e0..ede1b5cb2b57 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java
@@ -303,9 +303,7 @@ public void close() {}
 
       sourceThread.start();
 
-      while (flinkWrapper
-          .getLocalReaders()
-          .stream()
+      while (flinkWrapper.getLocalReaders().stream()
           .anyMatch(reader -> reader.getWatermark().getMillis() == 0)) {
         // readers haven't been initialized
         Thread.sleep(50);
@@ -631,7 +629,8 @@ private static void testSourceDoesNotShutdown(boolean shouldHaveReaders) throws
       SourceFunction.SourceContext sourceContext = Mockito.mock(SourceFunction.SourceContext.class);
       Object checkpointLock = new Object();
       Mockito.when(sourceContext.getCheckpointLock()).thenReturn(checkpointLock);
-      // Initialize source context early to avoid concurrency issues with its initialization in the run
+      // Initialize source context early to avoid concurrency issues with its initialization in the
+      // run
       // method and the onProcessingTime call on the wrapper.
       sourceWrapper.setSourceContext(sourceContext);
 
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
index ffe92fa5d05f..ff0590b112a4 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
@@ -69,7 +69,8 @@
    */
   public static <K, InputT, OutputT>
       PTransformOverrideFactory<
-              PCollection<KV<K, InputT>>, PCollection<OutputT>,
+              PCollection<KV<K, InputT>>,
+              PCollection<OutputT>,
               ParDo.SingleOutput<KV<K, InputT>, OutputT>>
           singleOutputOverrideFactory(DataflowPipelineOptions options) {
     return new SingleOutputOverrideFactory<>(isFnApi(options));
@@ -81,7 +82,8 @@
    */
   public static <K, InputT, OutputT>
       PTransformOverrideFactory<
-              PCollection<KV<K, InputT>>, PCollectionTuple,
+              PCollection<KV<K, InputT>>,
+              PCollectionTuple,
               ParDo.MultiOutput<KV<K, InputT>, OutputT>>
           multiOutputOverrideFactory(DataflowPipelineOptions options) {
     return new MultiOutputOverrideFactory<>(isFnApi(options));
@@ -94,7 +96,8 @@ private static boolean isFnApi(DataflowPipelineOptions options) {
 
   private static class SingleOutputOverrideFactory<K, InputT, OutputT>
       implements PTransformOverrideFactory<
-          PCollection<KV<K, InputT>>, PCollection<OutputT>,
+          PCollection<KV<K, InputT>>,
+          PCollection<OutputT>,
           ParDo.SingleOutput<KV<K, InputT>, OutputT>> {
 
     private final boolean isFnApi;
@@ -107,7 +110,8 @@ private SingleOutputOverrideFactory(boolean isFnApi) {
     public PTransformReplacement<PCollection<KV<K, InputT>>, PCollection<OutputT>>
         getReplacementTransform(
             AppliedPTransform<
-                    PCollection<KV<K, InputT>>, PCollection<OutputT>,
+                    PCollection<KV<K, InputT>>,
+                    PCollection<OutputT>,
                     SingleOutput<KV<K, InputT>, OutputT>>
                 transform) {
       return PTransformReplacement.of(
@@ -136,7 +140,8 @@ private MultiOutputOverrideFactory(boolean isFnApi) {
     public PTransformReplacement<PCollection<KV<K, InputT>>, PCollectionTuple>
         getReplacementTransform(
             AppliedPTransform<
-                    PCollection<KV<K, InputT>>, PCollectionTuple,
+                    PCollection<KV<K, InputT>>,
+                    PCollectionTuple,
                     MultiOutput<KV<K, InputT>, OutputT>>
                 transform) {
       return PTransformReplacement.of(
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 0db5892c656d..e93a15c893e0 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -882,10 +882,7 @@ public void translate(ParDo.MultiOutput transform, TranslationContext context) {
               ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) {
             StepTranslationContext stepContext = context.addStep(transform, "ParallelDo");
             Map<TupleTag<?>, Coder<?>> outputCoders =
-                context
-                    .getOutputs(transform)
-                    .entrySet()
-                    .stream()
+                context.getOutputs(transform).entrySet().stream()
                     .collect(
                         Collectors.toMap(
                             Map.Entry::getKey, e -> ((PCollection) e.getValue()).getCoder()));
@@ -920,10 +917,7 @@ public void translate(ParDoSingle transform, TranslationContext context) {
 
             StepTranslationContext stepContext = context.addStep(transform, "ParallelDo");
             Map<TupleTag<?>, Coder<?>> outputCoders =
-                context
-                    .getOutputs(transform)
-                    .entrySet()
-                    .stream()
+                context.getOutputs(transform).entrySet().stream()
                     .collect(
                         Collectors.toMap(
                             Map.Entry::getKey, e -> ((PCollection) e.getValue()).getCoder()));
@@ -991,10 +985,7 @@ public void translate(
             StepTranslationContext stepContext =
                 context.addStep(transform, "SplittableProcessKeyed");
             Map<TupleTag<?>, Coder<?>> outputCoders =
-                context
-                    .getOutputs(transform)
-                    .entrySet()
-                    .stream()
+                context.getOutputs(transform).entrySet().stream()
                     .collect(
                         Collectors.toMap(
                             Map.Entry::getKey, e -> ((PCollection) e.getValue()).getCoder()));
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 774614180783..1a7ae3c28fcb 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -788,8 +788,7 @@ public DataflowPipelineJob run(Pipeline pipeline) {
     if (!isNullOrEmpty(dataflowOptions.getMinCpuPlatform())) {
 
       List<String> minCpuFlags =
-          experiments
-              .stream()
+          experiments.stream()
               .filter(p -> p.startsWith("min_cpu_platform"))
               .collect(Collectors.toList());
 
@@ -1677,13 +1676,15 @@ private String getJobIdFromName(String jobName) {
 
   private static class PrimitiveCombineGroupedValuesOverrideFactory<K, InputT, OutputT>
       implements PTransformOverrideFactory<
-          PCollection<KV<K, Iterable<InputT>>>, PCollection<KV<K, OutputT>>,
+          PCollection<KV<K, Iterable<InputT>>>,
+          PCollection<KV<K, OutputT>>,
           Combine.GroupedValues<K, InputT, OutputT>> {
     @Override
     public PTransformReplacement<PCollection<KV<K, Iterable<InputT>>>, PCollection<KV<K, OutputT>>>
         getReplacementTransform(
             AppliedPTransform<
-                    PCollection<KV<K, Iterable<InputT>>>, PCollection<KV<K, OutputT>>,
+                    PCollection<KV<K, Iterable<InputT>>>,
+                    PCollection<KV<K, OutputT>>,
                     GroupedValues<K, InputT, OutputT>>
                 transform) {
       return PTransformReplacement.of(
@@ -1726,7 +1727,8 @@ private StreamingPubsubIOWriteOverrideFactory(DataflowRunner runner) {
   @VisibleForTesting
   static class StreamingShardedWriteFactory<UserT, DestinationT, OutputT>
       implements PTransformOverrideFactory<
-          PCollection<UserT>, WriteFilesResult<DestinationT>,
+          PCollection<UserT>,
+          WriteFilesResult<DestinationT>,
           WriteFiles<UserT, DestinationT, OutputT>> {
     // We pick 10 as a a default, as it works well with the default number of workers started
     // by Dataflow.
@@ -1741,7 +1743,8 @@ private StreamingPubsubIOWriteOverrideFactory(DataflowRunner runner) {
     public PTransformReplacement<PCollection<UserT>, WriteFilesResult<DestinationT>>
         getReplacementTransform(
             AppliedPTransform<
-                    PCollection<UserT>, WriteFilesResult<DestinationT>,
+                    PCollection<UserT>,
+                    WriteFilesResult<DestinationT>,
                     WriteFiles<UserT, DestinationT, OutputT>>
                 transform) {
       // By default, if numShards is not set WriteFiles will produce one file per bundle. In
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
index 29c1699c75ed..d9758c625aee 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
@@ -66,7 +66,8 @@
   public PTransformReplacement<PCollection<? extends InputT>, PCollection<OutputT>>
       getReplacementTransform(
           AppliedPTransform<
-                  PCollection<? extends InputT>, PCollection<OutputT>,
+                  PCollection<? extends InputT>,
+                  PCollection<OutputT>,
                   SingleOutput<InputT, OutputT>>
               transform) {
     return PTransformReplacement.of(
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
index ef238b26cb2f..40a05f3f9330 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
@@ -49,7 +49,7 @@
   private static final Logger LOG = LoggerFactory.getLogger(CustomSources.class);
 
   private static int getDesiredNumUnboundedSourceSplits(DataflowPipelineOptions options) {
-    int cores = 4; //TODO: decide at runtime?
+    int cores = 4; // TODO: decide at runtime?
     if (options.getMaxNumWorkers() > 0) {
       return options.getMaxNumWorkers() * cores;
     } else if (options.getNumWorkers() > 0) {
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java
index 4d17b6f74d60..6b62fac6094c 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java
@@ -75,7 +75,8 @@ private static ApiComponents apiComponentsFromUrl(String urlString) {
             getJsonFactory(),
             chainHttpRequestInitializer(
                 options.getGcpCredential(),
-                // Do not log 404. It clutters the output and is possibly even required by the caller.
+                // Do not log 404. It clutters the output and is possibly even required by the
+                // caller.
                 new RetryHttpRequestInitializer(ImmutableList.of(404))))
         .setApplicationName(options.getAppName())
         .setRootUrl(components.rootUrl)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
index 7e9a1d2f32e8..e52c021884f9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
@@ -339,8 +339,7 @@ public Node typedApply(ExecutableStageNode input) {
             Iterables.filter(network.successors(input), OutputReceiverNode.class);
 
         Map<String, OutputReceiver> outputReceiverMap = new HashMap<>();
-        Lists.newArrayList(outputReceiverNodes)
-            .stream()
+        Lists.newArrayList(outputReceiverNodes).stream()
             .forEach(
                 outputReceiverNode ->
                     outputReceiverMap.put(
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
index 392941f8bf05..aefffee3f301 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
@@ -74,6 +74,7 @@
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 /**
  * A {@link WorkExecutor} that processes a list of {@link Operation}s.
  *
@@ -489,8 +490,7 @@ private void updateMetrics(List<MonitoringInfo> monitoringInfos) {
               bundleProcessOperation.getPtransformIdToUserStepContext());
 
       counterUpdates =
-          monitoringInfos
-              .stream()
+          monitoringInfos.stream()
               .map(monitoringInfoToCounterUpdateTransformer::monitoringInfoToCounterUpdate)
               .filter(Objects::nonNull)
               .collect(Collectors.toList());
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
index dfb2047a5764..73b995bf754f 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
@@ -236,7 +236,8 @@ public Node apply(MutableNetwork<Node, Edge> input) {
       nodesToPCollections.put(node, pcollectionId);
       componentsBuilder.putPcollections(pcollectionId, pCollection);
 
-      // Check whether this output collection has consumers from worker side when "use_executable_stage_bundle_execution"
+      // Check whether this output collection has consumers from worker side when
+      // "use_executable_stage_bundle_execution"
       // is set
       if (input.successors(node).stream().anyMatch(RemoteGrpcPortNode.class::isInstance)) {
         executableStageOutputs.add(PipelineNode.pCollection(pcollectionId, pCollection));
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/ThreadzServlet.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/ThreadzServlet.java
index b2913c92962e..9ad594ec65bf 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/ThreadzServlet.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/ThreadzServlet.java
@@ -109,9 +109,7 @@ public void captureData(PrintWriter writer) {
 
     // Then, print out each stack along with the threads that share it. Stacks with more threads
     // are printed first.
-    stacks
-        .entrySet()
-        .stream()
+    stacks.entrySet().stream()
         .sorted(Comparator.comparingInt(e -> -e.getValue().size()))
         .forEachOrdered(
             entry -> {
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NetworksTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NetworksTest.java
index cf82391b5317..79d92ba8378f 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NetworksTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NetworksTest.java
@@ -150,9 +150,7 @@ public String apply(String input) {
     MutableNetwork<String, String> originalNetwork = createNetwork();
     for (String node : originalNetwork.nodes()) {
       assertEquals(
-          originalNetwork
-              .successors(node)
-              .stream()
+          originalNetwork.successors(node).stream()
               .map(function)
               .collect(Collectors.toCollection(HashSet::new)),
           network.successors(function.apply(node)));
@@ -161,9 +159,7 @@ public String apply(String input) {
     }
     assertEquals(
         network.nodes(),
-        originalNetwork
-            .nodes()
-            .stream()
+        originalNetwork.nodes().stream()
             .map(function)
             .collect(Collectors.toCollection(HashSet::new)));
   }
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
index 1b2d16187307..967f8fc0f808 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
@@ -45,8 +45,7 @@ public Server allocateAddressAndCreate(
     String name = String.format("InProcessServer_%s", serviceNameUniqifier.getAndIncrement());
     builder.setUrl(name);
     InProcessServerBuilder serverBuilder = InProcessServerBuilder.forName(name);
-    services
-        .stream()
+    services.stream()
         .forEach(
             service ->
                 serverBuilder.addService(
@@ -59,8 +58,7 @@ public Server allocateAddressAndCreate(
   public Server create(List<BindableService> services, ApiServiceDescriptor serviceDescriptor)
       throws IOException {
     InProcessServerBuilder builder = InProcessServerBuilder.forName(serviceDescriptor.getUrl());
-    services
-        .stream()
+    services.stream()
         .forEach(
             service ->
                 builder.addService(
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
index 38cae631a975..298f054921e4 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
@@ -150,8 +150,7 @@ private static Server createServer(List<BindableService> services, InetSocketAdd
               // buffer size in the layers above.
               .maxMessageSize(Integer.MAX_VALUE)
               .permitKeepAliveTime(KEEP_ALIVE_TIME_SEC, TimeUnit.SECONDS);
-      services
-          .stream()
+      services.stream()
           .forEach(
               service ->
                   builder.addService(
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java
index 1ffe4390e74c..0d8372725928 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java
@@ -104,9 +104,7 @@ public void getArtifact(
       ArtifactApi.ProxyManifest proxyManifest = MANIFEST_CACHE.get(request.getRetrievalToken());
       // look for file at URI specified by proxy manifest location
       ArtifactApi.ProxyManifest.Location location =
-          proxyManifest
-              .getLocationList()
-              .stream()
+          proxyManifest.getLocationList().stream()
               .filter(loc -> loc.getName().equals(name))
               .findFirst()
               .orElseThrow(
@@ -117,8 +115,7 @@ public void getArtifact(
 
       List<ArtifactMetadata> existingArtifacts = proxyManifest.getManifest().getArtifactList();
       ArtifactMetadata metadata =
-          existingArtifacts
-              .stream()
+          existingArtifacts.stream()
               .filter(meta -> meta.getName().equals(name))
               .findFirst()
               .orElseThrow(
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
index e477cd9f7f54..04a883dc4603 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
@@ -110,9 +110,7 @@ private static ExecutableProcessBundleDescriptor fromExecutableStageInternal(
     // Create with all of the processing transforms, and all of the components.
     // TODO: Remove the unreachable subcomponents if the size of the descriptor matters.
     Map<String, PTransform> stageTransforms =
-        stage
-            .getTransforms()
-            .stream()
+        stage.getTransforms().stream()
             .collect(Collectors.toMap(PTransformNode::getId, PTransformNode::getTransform));
 
     Components.Builder components =
@@ -388,7 +386,8 @@ private static TargetEncoding addStageOutput(
       outputTargetCodersBuilder.put(targetEncoding.getTarget(), targetEncoding.getCoder());
       components.putTransforms(
           timerReference.transform().getId(),
-          // Since a transform can have more then one timer, update the transform inside components and not the original
+          // Since a transform can have more then one timer, update the transform inside components
+          // and not the original
           components
               .getTransformsOrThrow(timerReference.transform().getId())
               .toBuilder()
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
index fb858dc784c6..94834fcbb471 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
@@ -128,7 +128,8 @@ public RemoteEnvironment createEnvironment(Environment environment) throws Excep
             .addAll(gcsCredentialArgs())
             // NOTE: Host networking does not work on Mac, but the command line flag is accepted.
             .add("--network=host")
-            // We need to pass on the information about Docker-on-Mac environment (due to missing host networking on Mac)
+            // We need to pass on the information about Docker-on-Mac environment (due to missing
+            // host networking on Mac)
             .add("--env=DOCKER_MAC_CONTAINER=" + System.getenv("DOCKER_MAC_CONTAINER"));
 
     if (!retainDockerContainer) {
@@ -206,7 +207,8 @@ public RemoteEnvironment createEnvironment(Environment environment) throws Excep
    * likely only support the latest version at any time.
    */
   private static class DockerOnMac {
-    // TODO: This host name seems to change with every other Docker release. Do we attempt to keep up
+    // TODO: This host name seems to change with every other Docker release. Do we attempt to keep
+    // up
     // or attempt to document the supported Docker version(s)?
     private static final String DOCKER_FOR_MAC_HOST = "host.docker.internal";
 
@@ -280,7 +282,8 @@ private static Platform getPlatform() {
       String osName = System.getProperty("os.name").toLowerCase();
       // TODO: Make this more robust?
       // The DOCKER_MAC_CONTAINER environment variable is necessary to detect whether we run on
-      // a container on MacOs. MacOs internally uses a Linux VM which makes it indistinguishable from Linux.
+      // a container on MacOs. MacOs internally uses a Linux VM which makes it indistinguishable
+      // from Linux.
       // We still need to apply port mapping due to missing host networking.
       if (osName.startsWith("mac") || DockerOnMac.RUNNING_INSIDE_DOCKER_ON_MAC) {
         return Platform.MAC;
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java
index 259a69394d8f..05dd45b1f451 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java
@@ -445,9 +445,7 @@ private void assertFiles(Set<String> files, String retrievalToken) throws Except
     Assert.assertEquals(
         "Files in locations does not match actual file list.",
         files,
-        proxyManifest
-            .getLocationList()
-            .stream()
+        proxyManifest.getLocationList().stream()
             .map(Location::getName)
             .collect(Collectors.toSet()));
     Assert.assertEquals(
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java
index 958d242a7567..75d2bb675c9f 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java
@@ -103,9 +103,7 @@ public void closeShutsDownEnvironments() throws Exception {
     p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride()));
 
     ExecutableStage stage =
-        GreedyPipelineFuser.fuse(PipelineTranslation.toProto(p))
-            .getFusedStages()
-            .stream()
+        GreedyPipelineFuser.fuse(PipelineTranslation.toProto(p)).getFusedStages().stream()
             .findFirst()
             .get();
     RemoteEnvironment remoteEnv = mock(RemoteEnvironment.class);
@@ -124,9 +122,7 @@ public void closeShutsDownEnvironmentsWhenSomeFail() throws Exception {
     p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride()));
 
     ExecutableStage firstEnvStage =
-        GreedyPipelineFuser.fuse(PipelineTranslation.toProto(p))
-            .getFusedStages()
-            .stream()
+        GreedyPipelineFuser.fuse(PipelineTranslation.toProto(p)).getFusedStages().stream()
             .findFirst()
             .get();
     ExecutableStagePayload basePayload =
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java
index 7c5e9f9e7a0b..78a88fbe01b0 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java
@@ -105,8 +105,7 @@ public Admin(BoundedSource<T> source, SamzaPipelineOptions pipelineOptions) {
 
     @Override
     public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
-      return streamNames
-          .stream()
+      return streamNames.stream()
           .collect(
               Collectors.toMap(
                   Function.<String>identity(),
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
index 8defed92a7b1..88affe4c0d4d 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
@@ -118,8 +118,7 @@ public Admin(UnboundedSource<T, CheckpointMarkT> source, SamzaPipelineOptions pi
 
     @Override
     public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
-      return streamNames
-          .stream()
+      return streamNames.stream()
           .collect(
               Collectors.toMap(
                   Function.<String>identity(),
@@ -313,7 +312,7 @@ public void run() {
             updateWatermark();
 
             if (!elementAvailable) {
-              //TODO: make poll interval configurable
+              // TODO: make poll interval configurable
               Thread.sleep(50);
             }
           }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricsContainer.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricsContainer.java
index 25ebe609743f..0b00e89dac91 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricsContainer.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricsContainer.java
@@ -69,7 +69,7 @@ public void updateMetrics() {
     final GaugeUpdater updateGauge = new GaugeUpdater();
     results.getGauges().forEach(updateGauge);
 
-    //TODO: add distribution metrics to Samza
+    // TODO: add distribution metrics to Samza
   }
 
   private class CounterUpdater implements Consumer<MetricResult<Long>> {
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
index 1d2907447d8a..401cd117b949 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
@@ -82,9 +82,8 @@
   // This is derivable from pushbackValues which is persisted to a store.
   // TODO: eagerly initialize the hold in init
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
-    justification = "No bug",
-    value = "SE_TRANSIENT_FIELD_NOT_RESTORED"
-  )
+      justification = "No bug",
+      value = "SE_TRANSIENT_FIELD_NOT_RESTORED")
   private transient Instant pushbackWatermarkHold;
 
   // TODO: add this to checkpointable state
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
index 67c343cbb78f..94e319cd916a 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
@@ -536,7 +536,7 @@ public void closeIterators() {
   private class SamzaMapStateImpl<KeyT, ValueT> extends AbstractSamzaState<ValueT>
       implements SamzaMapState<KeyT, ValueT>, KeyValueIteratorState {
 
-    private static final int MAX_KEY_SIZE = 100000; //100K bytes
+    private static final int MAX_KEY_SIZE = 100000; // 100K bytes
     private final Coder<KeyT> keyCoder;
     private final byte[] maxKey;
     private final int storeKeySize;
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/WindowAssignOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/WindowAssignOp.java
index b9692a221a49..48fb96917cb3 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/WindowAssignOp.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/WindowAssignOp.java
@@ -39,8 +39,7 @@ public void processElement(WindowedValue<T> inputElement, OpEmitter<T> emitter)
       throw new RuntimeException(e);
     }
 
-    windows
-        .stream()
+    windows.stream()
         .map(
             window ->
                 WindowedValue.of(
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
index c144428a13ef..7e7457dfc604 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
@@ -71,10 +71,7 @@ public void translate(
       TranslationContext ctx) {
     final PCollection<? extends InT> input = ctx.getInput(transform);
     final Map<TupleTag<?>, Coder<?>> outputCoders =
-        ctx.getCurrentTransform()
-            .getOutputs()
-            .entrySet()
-            .stream()
+        ctx.getCurrentTransform().getOutputs().entrySet().stream()
             .filter(e -> e.getValue() instanceof PCollection)
             .collect(
                 Collectors.toMap(e -> e.getKey(), e -> ((PCollection<?>) e.getValue()).getCoder()));
@@ -93,9 +90,7 @@ public void translate(
 
     final MessageStream<OpMessage<InT>> inputStream = ctx.getMessageStream(input);
     final List<MessageStream<OpMessage<InT>>> sideInputStreams =
-        transform
-            .getSideInputs()
-            .stream()
+        transform.getSideInputs().stream()
             .map(ctx::<InT>getViewStream)
             .collect(Collectors.toList());
     final Map<TupleTag<?>, Integer> tagToIdMap = new HashMap<>();
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestBoundedSource.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestBoundedSource.java
index 82e772ab5441..928d2887eec2 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestBoundedSource.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestBoundedSource.java
@@ -52,8 +52,7 @@ private TestBoundedSource(List<List<Event<T>>> events) {
   @Override
   public List<? extends BoundedSource<T>> split(
       long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-    return events
-        .stream()
+    return events.stream()
         .map(ev -> new TestBoundedSource<>(Collections.singletonList(ev)))
         .collect(Collectors.toList());
   }
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestCheckpointMark.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestCheckpointMark.java
index 0d0710e044d3..d526d859dea4 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestCheckpointMark.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestCheckpointMark.java
@@ -31,7 +31,7 @@ private TestCheckpointMark(int checkpoint) {
 
   @Override
   public void finalizeCheckpoint() throws IOException {
-    //DO NOTHING
+    // DO NOTHING
   }
 
   static TestCheckpointMark of(int checkpoint) {
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestUnboundedSource.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestUnboundedSource.java
index a41adbc4e891..59845e0ffacb 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestUnboundedSource.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestUnboundedSource.java
@@ -62,8 +62,7 @@ private TestUnboundedSource(List<List<Event<T>>> events) {
   @Override
   public List<? extends UnboundedSource<T, TestCheckpointMark>> split(
       int desiredNumSplits, PipelineOptions options) throws Exception {
-    return events
-        .stream()
+    return events.stream()
         .map(ev -> new TestUnboundedSource<>(Collections.singletonList(ev)))
         .collect(Collectors.toList());
   }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
index 49a4c1aa6fe2..3ed6480a8eef 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
@@ -77,8 +77,7 @@ private boolean knownComposite(Class<PTransform<?, ?>> transform) {
 
   private boolean shouldDebug(final TransformHierarchy.Node node) {
     return node == null
-        || (!transforms
-                .stream()
+        || (!transforms.stream()
                 .anyMatch(
                     debugTransform ->
                         debugTransform.getNode().equals(node) && debugTransform.isComposite())
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java
index 5e73f52a692d..8acf897884e2 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java
@@ -98,8 +98,7 @@ private CoderHelpers() {}
    */
   public static <T> Iterable<T> fromByteArrays(
       Collection<byte[]> serialized, final Coder<T> coder) {
-    return serialized
-        .stream()
+    return serialized.stream()
         .map(bytes -> fromByteArray(checkNotNull(bytes, "Cannot decode null values."), coder))
         .collect(Collectors.toList());
   }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
index 9d207550684f..ce460a3d848a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
@@ -85,7 +85,7 @@
  *
  * @param <T> The type of the element in this stream.
  */
-//TODO: write a proper Builder enforcing all those rules mentioned.
+// TODO: write a proper Builder enforcing all those rules mentioned.
 public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> {
   public static final String TRANSFORM_URN = "beam:transform:spark:createstream:v1";
 
@@ -96,7 +96,7 @@
   private Instant initialSystemTime;
   private final boolean forceWatermarkSync;
 
-  private Instant lowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; //for test purposes.
+  private Instant lowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; // for test purposes.
 
   private CreateStream(
       Duration batchDuration,
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
index b327a9b25fc3..abc8244e10bf 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
@@ -167,7 +167,7 @@ int getNumPartitions() {
     return numPartitions;
   }
 
-  //---- Bound by time.
+  // ---- Bound by time.
 
   // return the largest between the proportional read time (%batchDuration dedicated for read)
   // and the min. read time set.
@@ -184,7 +184,7 @@ private Duration boundReadDuration(double readTimePercentage, long minReadTimeMi
     return readDuration;
   }
 
-  //---- Bound by records.
+  // ---- Bound by records.
 
   private scala.Option<Long> rateControlledMaxRecords() {
     final scala.Option<RateController> rateControllerOption = rateController();
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index d88e2b8f044e..e61a088a59c3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -99,7 +99,7 @@
 
   /** State and Timers wrapper. */
   public static class StateAndTimers implements Serializable {
-    //Serializable state for internals (namespace to state tag to coded value).
+    // Serializable state for internals (namespace to state tag to coded value).
     private final Table<String, String, byte[]> state;
     private final Collection<byte[]> serTimers;
 
@@ -151,7 +151,8 @@ public void outputWindowedValue(
       extends AbstractFunction1<
           Iterator<
               Tuple3<
-                  /*K*/ ByteArray, Seq</*Itr<WV<I>>*/ byte[]>,
+                  /*K*/ ByteArray,
+                  Seq</*Itr<WV<I>>*/ byte[]>,
                   Option<Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>>>,
           Iterator<
               Tuple2</*K*/ ByteArray, Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>>>
@@ -412,10 +413,11 @@ public void outputWindowedValue(
         apply(
             final Iterator<
                     Tuple3<
-                        /*K*/ ByteArray, Seq</*Itr<WV<I>>*/ byte[]>,
+                        /*K*/ ByteArray,
+                        Seq</*Itr<WV<I>>*/ byte[]>,
                         Option<Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>>>
                 input) {
-      //--- ACTUAL STATEFUL OPERATION:
+      // --- ACTUAL STATEFUL OPERATION:
       //
       // Input Iterator: the partition (~bundle) of a co-grouping of the input
       // and the previous state (if exists).
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
index 8cb0c0af0a7b..41dbbfa3ce86 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
@@ -49,7 +49,7 @@
 class SparkStateInternals<K> implements StateInternals {
 
   private final K key;
-  //Serializable state for internals (namespace to state tag to coded value).
+  // Serializable state for internals (namespace to state tag to coded value).
   private final Table<String, String, byte[]> stateTable;
 
   private SparkStateInternals(K key) {
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
index 2d21ba566a28..bafe6bf69765 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
@@ -93,12 +93,16 @@
    */
   public static <T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
       scala.Function3<
-              Source<T>, Option<CheckpointMarkT>, State<Tuple2<byte[], Instant>>,
+              Source<T>,
+              Option<CheckpointMarkT>,
+              State<Tuple2<byte[], Instant>>,
               Tuple2<Iterable<byte[]>, Metadata>>
           mapSourceFunction(final SerializablePipelineOptions options, final String stepName) {
 
     return new SerializableFunction3<
-        Source<T>, Option<CheckpointMarkT>, State<Tuple2<byte[], Instant>>,
+        Source<T>,
+        Option<CheckpointMarkT>,
+        State<Tuple2<byte[], Instant>>,
         Tuple2<Iterable<byte[]>, Metadata>>() {
 
       @Override
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
index 33e12e1da37e..c860906e2a8e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
@@ -83,8 +83,7 @@
       JavaRDDLike<byte[], ?> bytesRDD = rdd.map(CoderHelpers.toByteFunction(windowedValueCoder));
       List<byte[]> clientBytes = bytesRDD.collect();
       windowedValues =
-          clientBytes
-              .stream()
+          clientBytes.stream()
               .map(bytes -> CoderHelpers.fromByteArray(bytes, windowedValueCoder))
               .collect(Collectors.toList());
     }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index a1151c662d5b..88f706f256c2 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -131,10 +131,7 @@ public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
   }
 
   public Map<TupleTag<?>, Coder<?>> getOutputCoders() {
-    return currentTransform
-        .getOutputs()
-        .entrySet()
-        .stream()
+    return currentTransform.getOutputs().entrySet().stream()
         .filter(e -> e.getValue() instanceof PCollection)
         .collect(Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder()));
   }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
index 67672a2ed95e..8336350cb43f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
@@ -113,7 +113,7 @@ public PipelineOptions getPipelineOptions() {
     @Override
     public <T> T sideInput(PCollectionView<T> view) {
       checkNotNull(input, "Input in SparkCombineContext must not be null!");
-      //validate element window.
+      // validate element window.
       final Collection<? extends BoundedWindow> elementWindows = input.getWindows();
       checkState(
           elementWindows.size() == 1,
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
index b9227f40206c..d5326d076021 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
@@ -71,7 +71,7 @@ public SparkGlobalCombineFn(
     TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
     WindowFn<?, BoundedWindow> windowFn = windowingStrategy.getWindowFn();
 
-    //--- inputs iterator, by window order.
+    // --- inputs iterator, by window order.
     final Iterator<WindowedValue<InputT>> iterator = sortedInputs.iterator();
     WindowedValue<InputT> currentInput = iterator.next();
     BoundedWindow currentWindow = Iterables.getFirst(currentInput.getWindows(), null);
@@ -170,7 +170,7 @@ public SparkGlobalCombineFn(
     @SuppressWarnings("unchecked")
     TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
 
-    //--- accumulators iterator, by window order.
+    // --- accumulators iterator, by window order.
     final Iterator<WindowedValue<AccumT>> iterator = sortedAccumulators.iterator();
 
     // get the first accumulator and assign it to the current window's accumulators.
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
index 83189a750532..d66467091652 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
@@ -74,7 +74,7 @@ public SparkGroupAlsoByWindowViaOutputBufferFn(
     K key = windowedValue.getValue().getKey();
     Iterable<WindowedValue<InputT>> values = windowedValue.getValue().getValue();
 
-    //------ based on GroupAlsoByWindowsViaOutputBufferDoFn ------//
+    // ------ based on GroupAlsoByWindowsViaOutputBufferDoFn ------//
 
     // Used with Batch, we know that all the data is available for this key. We can't use the
     // timer manager from the context because it doesn't exist. So we create one and emulate the
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
index 81541276512b..879786ddb9e3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
@@ -72,7 +72,7 @@ public OutputT apply(WindowedValue<KV<K, Iterable<InputT>>> windowedKv) {
     TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
     WindowFn<?, BoundedWindow> windowFn = windowingStrategy.getWindowFn();
 
-    //--- inputs iterator, by window order.
+    // --- inputs iterator, by window order.
     final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInputs.iterator();
     WindowedValue<KV<K, InputT>> currentInput = iterator.next();
     BoundedWindow currentWindow = Iterables.getFirst(currentInput.getWindows(), null);
@@ -181,7 +181,7 @@ public OutputT apply(WindowedValue<KV<K, Iterable<InputT>>> windowedKv) {
     @SuppressWarnings("unchecked")
     TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
 
-    //--- accumulators iterator, by window order.
+    // --- accumulators iterator, by window order.
     final Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedAccumulators.iterator();
 
     // get the first accumulator and assign it to the current window's accumulators.
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
index b64587ded403..163ba55e6d1b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
@@ -68,7 +68,7 @@ SideInputBroadcast getPCollectionView(PCollectionView<?> view, JavaSparkContext
       }
     }
 
-    //lazily broadcast views
+    // lazily broadcast views
     SideInputBroadcast helper = broadcastHelperMap.get(view);
     if (helper == null) {
       synchronized (SparkPCollectionView.class) {
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index fc5c5a6baf3f..e654aebf390b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -80,7 +80,7 @@ public JavaStreamingContext call() throws Exception {
 
     // We must first init accumulators since translators expect them to be instantiated.
     SparkRunner.initAccumulators(options, jsc);
-    //do not need to create a MetricsPusher instance here because if is called in SparkRunner.run()
+    // do not need to create a MetricsPusher instance here because it is called in SparkRunner.run()
 
     EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, options, jssc);
     // update cache candidates
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
index debab3427daf..f9253dcfbed7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
@@ -49,16 +49,16 @@ public SparkSideInputReader(
   @Nullable
   @Override
   public <T> T get(PCollectionView<T> view, BoundedWindow window) {
-    //--- validate sideInput.
+    // --- validate sideInput.
     checkNotNull(view, "The PCollectionView passed to sideInput cannot be null ");
     KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>> windowedBroadcastHelper =
         sideInputs.get(view.getTagInternal());
     checkNotNull(windowedBroadcastHelper, "SideInput for view " + view + " is not available.");
 
-    //--- sideInput window
+    // --- sideInput window
     final BoundedWindow sideInputWindow = view.getWindowMappingFn().getSideInputWindow(window);
 
-    //--- match the appropriate sideInput window.
+    // --- match the appropriate sideInput window.
     // a tag will point to all matching sideInputs, that is all windows.
     // now that we've obtained the appropriate sideInputWindow, all that's left is to filter by it.
     Iterable<WindowedValue<KV<?, ?>>> availableSideInputs =
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
index d166d8dabbba..7b72b546e2ef 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
@@ -46,16 +46,10 @@ public InMemoryMetrics(
     // this might fail in case we have multiple aggregators with the same suffix after
     // the last dot, but it should be good enough for tests.
     if (extendedMetricsRegistry != null
-        && extendedMetricsRegistry
-            .getGauges()
-            .keySet()
-            .stream()
+        && extendedMetricsRegistry.getGauges().keySet().stream()
             .anyMatch(Predicates.containsPattern(name + "$")::apply)) {
       String key =
-          extendedMetricsRegistry
-              .getGauges()
-              .keySet()
-              .stream()
+          extendedMetricsRegistry.getGauges().keySet().stream()
               .filter(Predicates.containsPattern(name + "$")::apply)
               .findFirst()
               .get();
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index ba204a97f0d0..ceb87a17b30b 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -133,9 +133,8 @@ private static void produce(Map<String, Instant> messages) {
     Serializer<String> stringSerializer = new StringSerializer();
     Serializer<Instant> instantSerializer = new InstantSerializer();
 
-    try (
-        KafkaProducer<String, Instant> kafkaProducer =
-            new KafkaProducer(producerProps, stringSerializer, instantSerializer)) {
+    try (KafkaProducer<String, Instant> kafkaProducer =
+        new KafkaProducer(producerProps, stringSerializer, instantSerializer)) {
       for (Map.Entry<String, Instant> en : messages.entrySet()) {
         kafkaProducer.send(new ProducerRecord<>(TOPIC, en.getKey(), en.getValue()));
       }
@@ -179,12 +178,12 @@ public void testWithResume() throws Exception {
                 "EOFShallNotPassFn",
                 4L)));
 
-    //--- between executions:
+    // --- between executions:
 
-    //- clear state.
+    // - clear state.
     clean();
 
-    //- write a bit more.
+    // - write a bit more.
     produce(
         ImmutableMap.of(
             "k5", new Instant(499),
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
index 05bdee080ea1..77dedb33511d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
@@ -40,7 +40,8 @@
 })
 
 /**
- * You can indicate a category for the experimental feature. This is unused and serves only as a hint to the reader.
+ * You can indicate a category for the experimental feature. This is unused and serves only as a
+ * hint to the reader.
  */
 @Documented
 public @interface Experimental {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
index ad20d63c86ac..bdb971b06143 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
@@ -126,9 +126,7 @@ private void verifyDeterministic(Schema schema)
       throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
 
     List<Coder<?>> coders =
-        schema
-            .getFields()
-            .stream()
+        schema.getFields().stream()
             .map(Field::getType)
             .map(RowCoder::coderForFieldType)
             .collect(Collectors.toList());
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index fb786f5caa55..1a5459229d79 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -944,8 +944,7 @@ private CreateTextSourceFn(byte[] delimiter) {
       checkArgument(
           1
               == Iterables.size(
-                  allToArgs
-                      .stream()
+                  allToArgs.stream()
                       .filter(Predicates.notNull()::apply)
                       .collect(Collectors.toList())),
           "Exactly one of filename policy, dynamic destinations, filename prefix, or destination "
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
index b712ab46f917..1629b7a3283a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
@@ -354,8 +354,7 @@ static boolean printHelpUsageAndExitIfNeeded(
       } catch (ClassNotFoundException e) {
         // If we didn't find an exact match, look for any that match the class name.
         Iterable<Class<? extends PipelineOptions>> matches =
-            getRegisteredOptions()
-                .stream()
+            getRegisteredOptions().stream()
                 .filter(
                     input -> {
                       if (helpOption.contains(".")) {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
index 9b76a4cf4d8d..df2df863b24d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
@@ -746,8 +746,7 @@ public PipelineOptions deserialize(JsonParser jp, DeserializationContext ctxt)
       if (rawOptionsNode != null && !rawOptionsNode.isNull()) {
         ObjectNode optionsNode = (ObjectNode) rawOptionsNode;
         for (Iterator<Map.Entry<String, JsonNode>> iterator = optionsNode.fields();
-            iterator != null && iterator.hasNext();
-            ) {
+            iterator != null && iterator.hasNext(); ) {
           Map.Entry<String, JsonNode> field = iterator.next();
           fields.put(field.getKey(), field.getValue());
         }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java
index 4df0a9313ca2..91659c814f4f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java
@@ -42,8 +42,7 @@
     public List<FieldValueTypeInformation> get(Class<?> clazz) {
       // If the generated class is passed in, we want to look at the base class to find the getters.
       Class<?> targetClass = AutoValueUtils.getBaseAutoValueClass(clazz);
-      return ReflectUtils.getMethods(targetClass)
-          .stream()
+      return ReflectUtils.getMethods(targetClass).stream()
           .filter(ReflectUtils::isGetter)
           // All AutoValue getters are marked abstract.
           .filter(m -> Modifier.isAbstract(m.getModifiers()))
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java
index d2ee4fdec11d..d54aaab25680 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java
@@ -263,17 +263,13 @@ private FieldAccessDescriptor resolvedNestedFieldsHelper(
     }
 
     nestedFields.putAll(
-        getNestedFieldsAccessedByName()
-            .entrySet()
-            .stream()
+        getNestedFieldsAccessedByName().entrySet().stream()
             .collect(
                 Collectors.toMap(
                     e -> schema.indexOf(e.getKey()),
                     e -> resolvedNestedFieldsHelper(schema.getField(e.getKey()), e.getValue()))));
     nestedFields.putAll(
-        getNestedFieldsAccessedById()
-            .entrySet()
-            .stream()
+        getNestedFieldsAccessedById().entrySet().stream()
             .collect(
                 Collectors.toMap(
                     e -> validateFieldId(schema, e.getKey()),
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldTypeDescriptors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldTypeDescriptors.java
index 5ff50460b800..881b1cec75f1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldTypeDescriptors.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldTypeDescriptors.java
@@ -30,6 +30,7 @@
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.BiMap;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableBiMap;
 import org.joda.time.Instant;
+
 /**
  * Utilities for converting between {@link Schema} field types and {@link TypeDescriptor}s that
  * define Java objects which can represent these field types.
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
index 61fca2ecb262..176f97dc1f14 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
@@ -53,8 +53,7 @@
 
     @Override
     public List<FieldValueTypeInformation> get(Class<?> clazz) {
-      return ReflectUtils.getMethods(clazz)
-          .stream()
+      return ReflectUtils.getMethods(clazz).stream()
           .filter(ReflectUtils::isGetter)
           .filter(m -> !m.isAnnotationPresent(SchemaIgnore.class))
           .map(FieldValueTypeInformation::forGetter)
@@ -74,8 +73,7 @@
 
     @Override
     public List<FieldValueTypeInformation> get(Class<?> clazz) {
-      return ReflectUtils.getMethods(clazz)
-          .stream()
+      return ReflectUtils.getMethods(clazz).stream()
           .filter(ReflectUtils::isSetter)
           .filter(m -> !m.isAnnotationPresent(SchemaIgnore.class))
           .map(FieldValueTypeInformation::forSetter)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
index 88986d4875c4..cba5448ca22b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
@@ -57,8 +57,7 @@
     @Override
     public List<FieldValueTypeInformation> get(Class<?> clazz) {
       List<FieldValueTypeInformation> types =
-          ReflectUtils.getFields(clazz)
-              .stream()
+          ReflectUtils.getFields(clazz).stream()
               .filter(f -> !f.isAnnotationPresent(SchemaIgnore.class))
               .map(FieldValueTypeInformation::forField)
               .map(
@@ -73,8 +72,7 @@
       if (ReflectUtils.getAnnotatedCreateMethod(clazz) == null
           && ReflectUtils.getAnnotatedConstructor(clazz) == null) {
         Optional<Field> finalField =
-            types
-                .stream()
+            types.stream()
                 .map(FieldValueTypeInformation::getField)
                 .filter(f -> Modifier.isFinal(f.getModifiers()))
                 .findAny();
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
index 5592d1385beb..84e89d6be55a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
@@ -256,14 +256,11 @@ private boolean equivalent(Schema other, EquivalenceNullablePolicy nullablePolic
     }
 
     List<Field> otherFields =
-        other
-            .getFields()
-            .stream()
+        other.getFields().stream()
             .sorted(Comparator.comparing(Field::getName))
             .collect(Collectors.toList());
     List<Field> actualFields =
-        getFields()
-            .stream()
+        getFields().stream()
             .sorted(Comparator.comparing(Field::getName))
             .collect(Collectors.toList());
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Cast.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Cast.java
index 1059aa16775e..024131efe44c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Cast.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Cast.java
@@ -272,8 +272,7 @@ public void verifyCompatibility(Schema inputSchema) {
 
     if (!errors.isEmpty()) {
       String reason =
-          errors
-              .stream()
+          errors.stream()
               .map(x -> Joiner.on('.').join(x.path()) + ": " + x.message())
               .collect(Collectors.joining("\n\t"));
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java
index 5804d4116248..1ee6fadf756c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java
@@ -242,10 +242,7 @@ private FieldAccessDescriptor getFieldAccessDescriptor(TupleTag<?> tag) {
       KeyedPCollectionTuple<Row> keyedPCollectionTuple =
           KeyedPCollectionTuple.empty(input.getPipeline());
       List<TupleTag<Row>> sortedTags =
-          input
-              .getAll()
-              .keySet()
-              .stream()
+          input.getAll().keySet().stream()
               .sorted(Comparator.comparing(TupleTag::getId))
               .map(t -> new TupleTag<Row>(t.getId() + "_ROW"))
               .collect(Collectors.toList());
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java
index c7716855efd9..5f5626110226 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java
@@ -125,9 +125,7 @@
       for (String fieldName :
           Sets.union(
               fieldNameFilters.keySet(),
-              fieldNamesFilters
-                  .keySet()
-                  .stream()
+              fieldNamesFilters.keySet().stream()
                   .flatMap(List::stream)
                   .collect(Collectors.toSet()))) {
         schema.getField(fieldName);
@@ -135,9 +133,7 @@
       for (int fieldIndex :
           Sets.union(
               fieldIdFilters.keySet(),
-              fieldIdsFilters
-                  .keySet()
-                  .stream()
+              fieldIdsFilters.keySet().stream()
                   .flatMap(List::stream)
                   .collect(Collectors.toSet()))) {
         if (fieldIndex >= schema.getFieldCount() || fieldIndex < 0) {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java
index 2fe8698740fb..6462e980026e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java
@@ -145,8 +145,7 @@
     /** Once the schema is known, this function is called by the {@link Group} transform. */
     Inner<T> withSchema(Schema inputSchema, SerializableFunction<T, Row> toRowFunction) {
       List<FieldAggregation> fieldAggregations =
-          getFieldAggregations()
-              .stream()
+          getFieldAggregations().stream()
               .map(f -> f.resolve(inputSchema))
               .collect(Collectors.toList());
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java
index 65f398b37ba2..a94e77e3ef35 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java
@@ -30,6 +30,7 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
+
 /**
  * A {@link PTransform} to unnest nested rows.
  *
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java
index c5c29418ba53..0dec26095343 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java
@@ -125,8 +125,7 @@ private static boolean matchConstructor(
     }
 
     Map<String, FieldValueTypeInformation> typeMap =
-        getterTypes
-            .stream()
+        getterTypes.stream()
             .collect(
                 Collectors.toMap(
                     f -> ReflectUtils.stripGetterPrefix(f.getMethod().getName()),
@@ -151,8 +150,7 @@ public static SchemaUserTypeCreator getBuilderCreator(
     }
 
     Map<String, FieldValueTypeInformation> setterTypes =
-        ReflectUtils.getMethods(builderClass)
-            .stream()
+        ReflectUtils.getMethods(builderClass).stream()
             .filter(ReflectUtils::isSetter)
             .map(FieldValueTypeInformation::forSetter)
             .collect(Collectors.toMap(FieldValueTypeInformation::getName, Function.identity()));
@@ -176,8 +174,7 @@ public static SchemaUserTypeCreator getBuilderCreator(
     }
 
     Method buildMethod =
-        ReflectUtils.getMethods(builderClass)
-            .stream()
+        ReflectUtils.getMethods(builderClass).stream()
             .filter(m -> m.getName().equals("build"))
             .findAny()
             .orElseThrow(() -> new RuntimeException("No build method in builder"));
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
index 84278bef0789..39d6323209f2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
@@ -88,8 +88,7 @@
 
         // don't need recursion because nested unions aren't supported in AVRO
         List<org.apache.avro.Schema> nonNullTypes =
-            types
-                .stream()
+            types.stream()
                 .filter(x -> x.getType() != org.apache.avro.Schema.Type.NULL)
                 .collect(Collectors.toList());
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
index a435d45e4fb9..8aaa200ba4e4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
@@ -68,8 +68,7 @@ public static Schema schemaFromJavaBeanClass(
   public static void validateJavaBean(
       List<FieldValueTypeInformation> getters, List<FieldValueTypeInformation> setters) {
     Map<String, FieldValueTypeInformation> setterMap =
-        setters
-            .stream()
+        setters.stream()
             .collect(Collectors.toMap(FieldValueTypeInformation::getName, Function.identity()));
 
     for (FieldValueTypeInformation type : getters) {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
index 5d62e82dec27..073ead1e0bd1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
@@ -39,12 +39,9 @@
   public static List<FieldValueTypeInformation> sortBySchema(
       List<FieldValueTypeInformation> types, Schema schema) {
     Map<String, FieldValueTypeInformation> typeMap =
-        types
-            .stream()
+        types.stream()
             .collect(Collectors.toMap(FieldValueTypeInformation::getName, Function.identity()));
-    return schema
-        .getFields()
-        .stream()
+    return schema.getFields().stream()
         .map(f -> typeMap.get(f.getName()))
         .collect(Collectors.toList());
   }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 2cd71f5907d8..3f8b22be7358 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -332,8 +332,7 @@ public void finishBundle() throws Exception {
   /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
   @Deprecated
   public List<OutputT> peekOutputElements() {
-    return peekOutputElementsWithTimestamp()
-        .stream()
+    return peekOutputElementsWithTimestamp().stream()
         .map(TimestampedValue::getValue)
         .collect(Collectors.toList());
   }
@@ -342,8 +341,7 @@ public void finishBundle() throws Exception {
   @Deprecated
   public List<TimestampedValue<OutputT>> peekOutputElementsWithTimestamp() {
     // TODO: Should we return an unmodifiable list?
-    return getImmutableOutput(mainOutputTag)
-        .stream()
+    return getImmutableOutput(mainOutputTag).stream()
         .map(input -> TimestampedValue.of(input.getValue(), input.getTimestamp()))
         .collect(Collectors.toList());
   }
@@ -394,8 +392,7 @@ public void clearOutputElements() {
   @Deprecated
   public <T> List<T> peekOutputElements(TupleTag<T> tag) {
     // TODO: Should we return an unmodifiable list?
-    return getImmutableOutput(tag)
-        .stream()
+    return getImmutableOutput(tag).stream()
         .map(ValueInSingleWindow::getValue)
         .collect(Collectors.toList());
   }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
index 19ff713ef0e8..8c2a11bdaa9e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
@@ -173,7 +173,7 @@ public void processElement(
       numElementsInBatch.add(1L);
       Long num = numElementsInBatch.read();
       if (num % prefetchFrequency == 0) {
-        //prefetch data and modify batch state (readLater() modifies this)
+        // prefetch data and modify batch state (readLater() modifies this)
         batch.readLater();
       }
       if (num >= batchSize) {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 4c47ba98e848..878c970ae841 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -679,8 +679,7 @@ static ProcessElementMethod create(
      * each scoped to a single window.
      */
     public boolean observesWindow() {
-      return extraParameters()
-          .stream()
+      return extraParameters().stream()
           .anyMatch(
               Predicates.or(
                       Predicates.instanceOf(WindowParameter.class),
@@ -696,8 +695,7 @@ public boolean observesWindow() {
     @Nullable
     public RowParameter getRowParameter() {
       Optional<Parameter> parameter =
-          extraParameters()
-              .stream()
+          extraParameters().stream()
               .filter(Predicates.instanceOf(RowParameter.class)::apply)
               .findFirst();
       return parameter.isPresent() ? ((RowParameter) parameter.get()) : null;
@@ -707,8 +705,7 @@ public RowParameter getRowParameter() {
     @Nullable
     public OutputReceiverParameter getMainOutputReceiver() {
       Optional<Parameter> parameter =
-          extraParameters()
-              .stream()
+          extraParameters().stream()
               .filter(Predicates.instanceOf(OutputReceiverParameter.class)::apply)
               .findFirst();
       return parameter.isPresent() ? ((OutputReceiverParameter) parameter.get()) : null;
@@ -718,8 +715,7 @@ public OutputReceiverParameter getMainOutputReceiver() {
      * Whether this {@link DoFn} is <a href="https://s.apache.org/splittable-do-fn">splittable</a>.
      */
     public boolean isSplittable() {
-      return extraParameters()
-          .stream()
+      return extraParameters().stream()
           .anyMatch(Predicates.instanceOf(RestrictionTrackerParameter.class)::apply);
     }
   }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 61089df347b9..33e6a7646c9f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -226,8 +226,7 @@ private MethodAnalysisContext() {}
 
     /** Indicates whether a {@link RestrictionTrackerParameter} is known in this context. */
     public boolean hasRestrictionTrackerParameter() {
-      return extraParameters
-          .stream()
+      return extraParameters.stream()
           .anyMatch(Predicates.instanceOf(RestrictionTrackerParameter.class)::apply);
     }
 
@@ -238,8 +237,7 @@ public boolean hasWindowParameter() {
 
     /** Indicates whether a {@link Parameter.PipelineOptionsParameter} is known in this context. */
     public boolean hasPipelineOptionsParamter() {
-      return extraParameters
-          .stream()
+      return extraParameters.stream()
           .anyMatch(Predicates.instanceOf(Parameter.PipelineOptionsParameter.class)::apply);
     }
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFile.java
index 3101732bba84..ea231c2c0a54 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFile.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFile.java
@@ -83,8 +83,7 @@ public FilePatternMatchingShardedFile(String filePattern) {
         Collection<Metadata> files = FileSystems.match(filePattern).metadata();
         LOG.debug(
             "Found file(s) {} by matching the path: {}",
-            files
-                .stream()
+            files.stream()
                 .map(Metadata::resourceId)
                 .map(ResourceId::getFilename)
                 .collect(Collectors.joining(",")),
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
index b5e724d5af3f..b359bb532e21 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
@@ -166,8 +166,7 @@ public static boolean isCancelled(CompletionStage<?> future) {
 
     return blockAndDiscard.thenApply(
         nothing ->
-            futures
-                .stream()
+            futures.stream()
                 .map(future -> future.toCompletableFuture().join())
                 .collect(Collectors.toList()));
   }
@@ -179,9 +178,8 @@ public static boolean isCancelled(CompletionStage<?> future) {
    * #allAsList(Collection)}.
    */
   @SuppressWarnings(
-    value = "NM_CLASS_NOT_EXCEPTION",
-    justification = "The class does hold an exception; its name is accurate."
-  )
+      value = "NM_CLASS_NOT_EXCEPTION",
+      justification = "The class does hold an exception; its name is accurate.")
   @AutoValue
   public abstract static class ExceptionOrResult<T> {
 
@@ -218,8 +216,7 @@ public static boolean isCancelled(CompletionStage<?> future) {
 
     return blockAndDiscard.thenApply(
         nothing ->
-            futures
-                .stream()
+            futures.stream()
                 .map(
                     future -> {
                       // The limited scope of the exceptions wrapped allows CancellationException
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonDeserializer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonDeserializer.java
index a6f20a0b3b54..3b1805f1d206 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonDeserializer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonDeserializer.java
@@ -145,10 +145,7 @@ private static Row jsonObjectToRow(FieldValue rowFieldValue) {
               + "can be parsed to Beam Rows");
     }
 
-    return rowFieldValue
-        .rowSchema()
-        .getFields()
-        .stream()
+    return rowFieldValue.rowSchema().getFields().stream()
         .map(
             schemaField ->
                 extractJsonNodeValue(
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
index 65d37c82e54c..4bc8048135cc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
@@ -209,12 +209,12 @@ public void testTransientFieldInitialization() throws Exception {
     Pojo value = new Pojo("Hello", 42);
     AvroCoder<Pojo> coder = AvroCoder.of(Pojo.class);
 
-    //Serialization of object
+    // Serialization of object
     ByteArrayOutputStream bos = new ByteArrayOutputStream();
     ObjectOutputStream out = new ObjectOutputStream(bos);
     out.writeObject(coder);
 
-    //De-serialization of object
+    // De-serialization of object
     ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
     ObjectInputStream in = new ObjectInputStream(bis);
     AvroCoder<Pojo> copied = (AvroCoder<Pojo>) in.readObject();
@@ -232,11 +232,11 @@ public void testKryoSerialization() throws Exception {
     Pojo value = new Pojo("Hello", 42);
     AvroCoder<Pojo> coder = AvroCoder.of(Pojo.class);
 
-    //Kryo instantiation
+    // Kryo instantiation
     Kryo kryo = new Kryo();
     kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
 
-    //Serialization of object without any memoization
+    // Serialization of object without any memoization
     ByteArrayOutputStream coderWithoutMemoizationBos = new ByteArrayOutputStream();
     try (Output output = new Output(coderWithoutMemoizationBos)) {
       kryo.writeObject(output, coder);
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
index 4e0621a6374e..1163b6e89bce 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
@@ -293,8 +293,7 @@ public void testDynamicDefaultFilenamePolicy() throws Exception {
     String[] aElements =
         Iterables.toArray(
             StreamSupport.stream(
-                    elements
-                        .stream()
+                    elements.stream()
                         .filter(
                             Predicates.compose(new StartsWith("a"), new ExtractWriteDestination())
                                 ::apply)
@@ -307,8 +306,7 @@ public void testDynamicDefaultFilenamePolicy() throws Exception {
     String[] bElements =
         Iterables.toArray(
             StreamSupport.stream(
-                    elements
-                        .stream()
+                    elements.stream()
                         .filter(
                             Predicates.compose(new StartsWith("b"), new ExtractWriteDestination())
                                 ::apply)
@@ -321,8 +319,7 @@ public void testDynamicDefaultFilenamePolicy() throws Exception {
     String[] cElements =
         Iterables.toArray(
             StreamSupport.stream(
-                    elements
-                        .stream()
+                    elements.stream()
                         .filter(
                             Predicates.compose(new StartsWith("c"), new ExtractWriteDestination())
                                 ::apply)
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/FieldTypeDescriptorsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/FieldTypeDescriptorsTest.java
index 131c1779109a..6d26ef570e54 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/FieldTypeDescriptorsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/FieldTypeDescriptorsTest.java
@@ -27,6 +27,7 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+
 /** test for {@link FieldTypeDescriptors}. */
 public class FieldTypeDescriptorsTest {
   @Test
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/UnnestTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/UnnestTest.java
index 5cdf56ddfafe..462fb05d9b19 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/UnnestTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/UnnestTest.java
@@ -34,6 +34,7 @@
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
+
 /** Tests for {@link org.apache.beam.sdk.schemas.transforms.Unnest}. */
 public class UnnestTest implements Serializable {
   @Rule public final transient TestPipeline pipeline = TestPipeline.create();
@@ -78,16 +79,14 @@ public void testSimpleUnnesting() {
             .mapToObj(i -> Row.withSchema(SIMPLE_SCHEMA).addValues(i, Integer.toString(i)).build())
             .collect(Collectors.toList());
     List<Row> rows =
-        bottomRow
-            .stream()
+        bottomRow.stream()
             .map(r -> Row.withSchema(NESTED_SCHEMA).addValues(r, r).build())
             .collect(Collectors.toList());
     PCollection<Row> unnested =
         pipeline.apply(Create.of(rows).withRowSchema(NESTED_SCHEMA)).apply(Unnest.create());
     assertEquals(UNNESTED_SCHEMA, unnested.getSchema());
     List<Row> expected =
-        bottomRow
-            .stream()
+        bottomRow.stream()
             .map(
                 r ->
                     Row.withSchema(UNNESTED_SCHEMA)
@@ -116,8 +115,7 @@ public void testAlternateNamePolicy() {
             .mapToObj(i -> Row.withSchema(SIMPLE_SCHEMA).addValues(i, Integer.toString(i)).build())
             .collect(Collectors.toList());
     List<Row> rows =
-        bottomRow
-            .stream()
+        bottomRow.stream()
             .map(r -> Row.withSchema(NESTED_SCHEMA2).addValues(r).build())
             .collect(Collectors.toList());
     PCollection<Row> unnested =
@@ -126,8 +124,7 @@ public void testAlternateNamePolicy() {
             .apply(Unnest.<Row>create().withFieldNameFunction(Unnest.KEEP_NESTED_NAME));
     assertEquals(UNNESTED2_SCHEMA_ALTERNATE, unnested.getSchema());
     List<Row> expected =
-        bottomRow
-            .stream()
+        bottomRow.stream()
             .map(
                 r ->
                     Row.withSchema(UNNESTED2_SCHEMA_ALTERNATE)
@@ -148,8 +145,7 @@ public void testClashingNamePolicy() {
             .collect(Collectors.toList());
     thrown.expect(IllegalArgumentException.class);
     List<Row> rows =
-        bottomRow
-            .stream()
+        bottomRow.stream()
             .map(r -> Row.withSchema(NESTED_SCHEMA).addValues(r, r).build())
             .collect(Collectors.toList());
     PCollection<Row> unnested =
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
index e753ba388840..8fd8f859f62e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
@@ -63,8 +63,10 @@ public void testFlatMapSimpleFunction() throws Exception {
         pipeline
             .apply(Create.of(1, 2, 3))
 
-            // Note that FlatMapElements takes an InferableFunction<InputT, ? extends Iterable<OutputT>>
-            // so the use of List<Integer> here (as opposed to Iterable<Integer>) deliberately exercises
+            // Note that FlatMapElements takes an InferableFunction<InputT, ? extends
+            // Iterable<OutputT>>
+            // so the use of List<Integer> here (as opposed to Iterable<Integer>) deliberately
+            // exercises
             // the use of an upper bound.
             .apply(
                 FlatMapElements.via(
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
index b1d00e6c5f98..a36010dbcdf2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
@@ -85,9 +85,7 @@ private void method(
 
     assertTrue(signature.isSplittable());
     assertTrue(
-        signature
-            .extraParameters()
-            .stream()
+        signature.extraParameters().stream()
             .anyMatch(
                 Predicates.instanceOf(DoFnSignature.Parameter.RestrictionTrackerParameter.class)
                     ::apply));
@@ -310,7 +308,8 @@ public CoderT getRestrictionCoder() {
     DoFnSignature signature =
         DoFnSignatures.getSignature(
             new GoodGenericSplittableDoFn<
-                SomeRestriction, RestrictionTracker<SomeRestriction, ?>,
+                SomeRestriction,
+                RestrictionTracker<SomeRestriction, ?>,
                 SomeRestrictionCoder>() {}.getClass());
     assertEquals(RestrictionTracker.class, signature.processElement().trackerT().getRawType());
     assertTrue(signature.processElement().isSplittable());
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
index 9de0408efeae..a516b5987865 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
@@ -372,7 +372,8 @@ static String getRegionFromZone(String zone) {
               Transport.getJsonFactory(),
               chainHttpRequestInitializer(
                   credentials,
-                  // Do not log 404. It clutters the output and is possibly even required by the caller.
+                  // Do not log 404. It clutters the output and is possibly even required by the
+                  // caller.
                   new RetryHttpRequestInitializer(ImmutableList.of(404))))
           .setApplicationName(options.getAppName())
           .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/Transport.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/Transport.java
index 4e0066067dca..0e1b6bde641f 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/Transport.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/Transport.java
@@ -96,7 +96,8 @@ private static ApiComponents apiComponentsFromUrl(String urlString) {
                 getJsonFactory(),
                 chainHttpRequestInitializer(
                     options.getGcpCredential(),
-                    // Do not log the code 404. Code up the stack will deal with 404's if needed, and
+                    // Do not log the code 404. Code up the stack will deal with 404's if needed,
+                    // and
                     // logging it by default clutters the output during file staging.
                     new RetryHttpRequestInitializer(
                         ImmutableList.of(404), new UploadIdResponseInterceptor())))
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
index 51937ad95442..70867508e929 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
@@ -51,7 +51,7 @@ public static void main(String[] args) {
     PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
     Pipeline p = Pipeline.create(options);
 
-    //define the input row format
+    // define the input row format
     Schema type =
         Schema.builder().addInt32Field("c1").addStringField("c2").addDoubleField("c3").build();
 
@@ -59,7 +59,7 @@ public static void main(String[] args) {
     Row row2 = Row.withSchema(type).addValues(2, "row", 2.0).build();
     Row row3 = Row.withSchema(type).addValues(3, "row", 3.0).build();
 
-    //create a source PCollection with Create.of();
+    // create a source PCollection with Create.of();
     PCollection<Row> inputTable =
         PBegin.in(p)
             .apply(
@@ -67,7 +67,7 @@ public static void main(String[] args) {
                     .withSchema(
                         type, SerializableFunctions.identity(), SerializableFunctions.identity()));
 
-    //Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery;
+    // Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery;
     PCollection<Row> outputStream =
         inputTable.apply(SqlTransform.query("select c1, c2, c3 from PCOLLECTION where c1 > 1"));
 
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java
index f3327d857347..318d90df15e7 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java
@@ -75,10 +75,7 @@ private JdbcConnection(CalciteConnection connection) throws SQLException {
    * to a map of pipeline options.
    */
   private static Map<String, String> extractPipelineOptions(CalciteConnection calciteConnection) {
-    return calciteConnection
-        .getProperties()
-        .entrySet()
-        .stream()
+    return calciteConnection.getProperties().entrySet().stream()
         .map(entry -> KV.of(entry.getKey().toString(), entry.getValue().toString()))
         .filter(kv -> kv.getKey().startsWith(PIPELINE_OPTION_PREFIX))
         .map(kv -> KV.of(kv.getKey().substring(PIPELINE_OPTION_PREFIX.length()), kv.getValue()))
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java
index cbdbbb6aa9c9..70aa89d3d6ee 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java
@@ -54,7 +54,7 @@
         new FunctionParameter() {
           @Override
           public int getOrdinal() {
-            return 0; //up to one parameter is supported in UDAF.
+            return 0; // up to one parameter is supported in UDAF.
           }
 
           @Override
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
index 050aeec189d5..419cef17881e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
@@ -116,8 +116,7 @@ public RelWriter explainTerms(RelWriter pw) {
   public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
     Schema outputSchema = CalciteUtils.toSchema(getRowType());
     List<FieldAggregation> aggregationAdapters =
-        getNamedAggCalls()
-            .stream()
+        getNamedAggCalls().stream()
             .map(aggCall -> new FieldAggregation(aggCall.getKey(), aggCall.getValue()))
             .collect(toList());
 
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
index 3a76a615f461..408922be836f 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
@@ -284,16 +284,16 @@ private static Object fieldToAvatica(Schema.FieldType type, Object beamValue) {
       case ARRAY:
         return ((List<?>) beamValue)
             .stream()
-            .map(elem -> fieldToAvatica(type.getCollectionElementType(), elem))
-            .collect(Collectors.toList());
+                .map(elem -> fieldToAvatica(type.getCollectionElementType(), elem))
+                .collect(Collectors.toList());
       case MAP:
         return ((Map<?, ?>) beamValue)
-            .entrySet()
-            .stream()
-            .collect(
-                Collectors.toMap(
-                    entry -> entry.getKey(),
-                    entry -> fieldToAvatica(type.getCollectionElementType(), entry.getValue())));
+            .entrySet().stream()
+                .collect(
+                    Collectors.toMap(
+                        entry -> entry.getKey(),
+                        entry ->
+                            fieldToAvatica(type.getCollectionElementType(), entry.getValue())));
       case ROW:
         // TODO: needs to be a Struct
         return beamValue;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
index 5f060ec5567a..29e53d1a2e6c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
@@ -35,8 +35,7 @@
    * @return bounded if and only if all PCollection inputs are bounded
    */
   default PCollection.IsBounded isBounded() {
-    return getPCollectionInputs()
-            .stream()
+    return getPCollectionInputs().stream()
             .allMatch(
                 rel ->
                     BeamSqlRelUtils.getBeamRelInput(rel).isBounded()
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
index 760e37bf1ed2..bdc95724240d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
@@ -157,7 +157,8 @@ public int getCount() {
       PCollection<Row> upstream = pinput.get(0);
 
       // There is a need to separate ORDER BY LIMIT and LIMIT:
-      //  - GroupByKey (used in Top) is not allowed on unbounded data in global window so ORDER BY ... LIMIT
+      //  - GroupByKey (used in Top) is not allowed on unbounded data in global window so ORDER BY
+      // ... LIMIT
       //    works only on bounded data.
       //  - Just LIMIT operates on unbounded data, but across windows.
       if (fieldIndices.isEmpty()) {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
index d48a6979e92a..9f0fd5547b32 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
@@ -43,8 +43,7 @@
       return PCollectionList.empty(pipeline);
     } else {
       return PCollectionList.of(
-          inputRels
-              .stream()
+          inputRels.stream()
               .map(input -> BeamSqlRelUtils.toPCollection(pipeline, (BeamRelNode) input, cache))
               .collect(Collectors.toList()));
     }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java
index 798f0e6e651c..beb27fc68a1a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java
@@ -92,8 +92,8 @@ public static String beamRow2CsvLine(Row row, CSVFormat csvFormat) {
 
   /**
    * Attempt to cast an object to a specified Schema.Field.Type.
-   * @throws IllegalArgumentException if the value cannot be cast to that type.
    *
+   * @throws IllegalArgumentException if the value cannot be cast to that type.
    * @return The casted object in Schema.Field.Type.
    */
   public static Object autoCastField(Schema.Field field, Object rawObj) {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java
index 735315224616..1b0532c179a5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java
@@ -86,7 +86,8 @@ public void processElement(ProcessContext ctx) {
 
               // Say for Row R, there are m instances on left and n instances on right,
               // INTERSECT ALL outputs MIN(m, n) instances of R.
-              Iterator<Row> iter = (leftCount <= rightCount) ? leftRows.iterator() : rightRows.iterator();
+              Iterator<Row> iter =
+                  (leftCount <= rightCount) ? leftRows.iterator() : rightRows.iterator();
               while (iter.hasNext()) {
                 ctx.output(iter.next());
               }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamBuiltinFunctionProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamBuiltinFunctionProvider.java
index e338ea9cd9d5..195ac9604a48 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamBuiltinFunctionProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamBuiltinFunctionProvider.java
@@ -27,8 +27,7 @@
 public abstract class BeamBuiltinFunctionProvider {
   public Map<String, List<Method>> getBuiltinMethods() {
     List<Method> methods = Arrays.asList(getClass().getMethods());
-    return methods
-        .stream()
+    return methods.stream()
         .filter(BeamBuiltinFunctionProvider::isUDF)
         .collect(
             Collectors.groupingBy(method -> method.getDeclaredAnnotation(UDF.class).funcName()));
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinStringFunctions.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinStringFunctions.java
index 0919ea72a549..97d3ffc76d29 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinStringFunctions.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinStringFunctions.java
@@ -35,10 +35,9 @@
   // return null for boolean is not allowed.
   // TODO: handle null input.
   @UDF(
-    funcName = "ENDS_WITH",
-    parameterArray = {TypeName.STRING},
-    returnType = TypeName.STRING
-  )
+      funcName = "ENDS_WITH",
+      parameterArray = {TypeName.STRING},
+      returnType = TypeName.STRING)
   public Boolean endsWith(String str1, String str2) {
     return str1.endsWith(str2);
   }
@@ -47,19 +46,17 @@ public Boolean endsWith(String str1, String str2) {
   // return null for boolean is not allowed.
   // TODO: handle null input.
   @UDF(
-    funcName = "STARTS_WITH",
-    parameterArray = {TypeName.STRING},
-    returnType = TypeName.STRING
-  )
+      funcName = "STARTS_WITH",
+      parameterArray = {TypeName.STRING},
+      returnType = TypeName.STRING)
   public Boolean startsWith(String str1, String str2) {
     return str1.startsWith(str2);
   }
 
   @UDF(
-    funcName = "LENGTH",
-    parameterArray = {TypeName.STRING},
-    returnType = TypeName.INT64
-  )
+      funcName = "LENGTH",
+      parameterArray = {TypeName.STRING},
+      returnType = TypeName.INT64)
   public Long lengthString(String str) {
     if (str == null) {
       return null;
@@ -68,10 +65,9 @@ public Long lengthString(String str) {
   }
 
   @UDF(
-    funcName = "LENGTH",
-    parameterArray = {TypeName.BYTES},
-    returnType = TypeName.INT64
-  )
+      funcName = "LENGTH",
+      parameterArray = {TypeName.BYTES},
+      returnType = TypeName.INT64)
   public Long lengthBytes(byte[] bytes) {
     if (bytes == null) {
       return null;
@@ -80,10 +76,9 @@ public Long lengthBytes(byte[] bytes) {
   }
 
   @UDF(
-    funcName = "REVERSE",
-    parameterArray = {TypeName.STRING},
-    returnType = TypeName.STRING
-  )
+      funcName = "REVERSE",
+      parameterArray = {TypeName.STRING},
+      returnType = TypeName.STRING)
   public String reverseString(String str) {
     if (str == null) {
       return null;
@@ -92,10 +87,9 @@ public String reverseString(String str) {
   }
 
   @UDF(
-    funcName = "REVERSE",
-    parameterArray = {TypeName.BYTES},
-    returnType = TypeName.BYTES
-  )
+      funcName = "REVERSE",
+      parameterArray = {TypeName.BYTES},
+      returnType = TypeName.BYTES)
   public byte[] reverseBytes(byte[] bytes) {
     if (bytes == null) {
       return null;
@@ -106,10 +100,9 @@ public String reverseString(String str) {
   }
 
   @UDF(
-    funcName = "FROM_HEX",
-    parameterArray = {TypeName.STRING},
-    returnType = TypeName.BYTES
-  )
+      funcName = "FROM_HEX",
+      parameterArray = {TypeName.STRING},
+      returnType = TypeName.BYTES)
   public byte[] fromHex(String str) {
     if (str == null) {
       return null;
@@ -123,10 +116,9 @@ public String reverseString(String str) {
   }
 
   @UDF(
-    funcName = "TO_HEX",
-    parameterArray = {TypeName.BYTES},
-    returnType = TypeName.STRING
-  )
+      funcName = "TO_HEX",
+      parameterArray = {TypeName.BYTES},
+      returnType = TypeName.STRING)
   public String toHex(byte[] bytes) {
     if (bytes == null) {
       return null;
@@ -136,19 +128,17 @@ public String toHex(byte[] bytes) {
   }
 
   @UDF(
-    funcName = "LPAD",
-    parameterArray = {TypeName.STRING, TypeName.INT64},
-    returnType = TypeName.STRING
-  )
+      funcName = "LPAD",
+      parameterArray = {TypeName.STRING, TypeName.INT64},
+      returnType = TypeName.STRING)
   public String lpad(String originalValue, Long returnLength) {
     return lpad(originalValue, returnLength, " ");
   }
 
   @UDF(
-    funcName = "LPAD",
-    parameterArray = {TypeName.STRING, TypeName.INT64, TypeName.STRING},
-    returnType = TypeName.STRING
-  )
+      funcName = "LPAD",
+      parameterArray = {TypeName.STRING, TypeName.INT64, TypeName.STRING},
+      returnType = TypeName.STRING)
   public String lpad(String originalValue, Long returnLength, String pattern) {
     if (originalValue == null || returnLength == null || pattern == null) {
       return null;
@@ -169,19 +159,17 @@ public String lpad(String originalValue, Long returnLength, String pattern) {
   }
 
   @UDF(
-    funcName = "LPAD",
-    parameterArray = {TypeName.BYTES, TypeName.INT64},
-    returnType = TypeName.BYTES
-  )
+      funcName = "LPAD",
+      parameterArray = {TypeName.BYTES, TypeName.INT64},
+      returnType = TypeName.BYTES)
   public byte[] lpad(byte[] originalValue, Long returnLength) {
     return lpad(originalValue, returnLength, " ".getBytes(UTF_8));
   }
 
   @UDF(
-    funcName = "LPAD",
-    parameterArray = {TypeName.BYTES, TypeName.INT64, TypeName.BYTES},
-    returnType = TypeName.BYTES
-  )
+      funcName = "LPAD",
+      parameterArray = {TypeName.BYTES, TypeName.INT64, TypeName.BYTES},
+      returnType = TypeName.BYTES)
   public byte[] lpad(byte[] originalValue, Long returnLength, byte[] pattern) {
     if (originalValue == null || returnLength == null || pattern == null) {
       return null;
@@ -214,19 +202,17 @@ public String lpad(String originalValue, Long returnLength, String pattern) {
   }
 
   @UDF(
-    funcName = "RPAD",
-    parameterArray = {TypeName.STRING, TypeName.INT64},
-    returnType = TypeName.STRING
-  )
+      funcName = "RPAD",
+      parameterArray = {TypeName.STRING, TypeName.INT64},
+      returnType = TypeName.STRING)
   public String rpad(String originalValue, Long returnLength) {
     return lpad(originalValue, returnLength, " ");
   }
 
   @UDF(
-    funcName = "RPAD",
-    parameterArray = {TypeName.STRING, TypeName.INT64, TypeName.STRING},
-    returnType = TypeName.STRING
-  )
+      funcName = "RPAD",
+      parameterArray = {TypeName.STRING, TypeName.INT64, TypeName.STRING},
+      returnType = TypeName.STRING)
   public String rpad(String originalValue, Long returnLength, String pattern) {
     if (originalValue == null || returnLength == null || pattern == null) {
       return null;
@@ -247,19 +233,17 @@ public String rpad(String originalValue, Long returnLength, String pattern) {
   }
 
   @UDF(
-    funcName = "RPAD",
-    parameterArray = {TypeName.BYTES, TypeName.INT64},
-    returnType = TypeName.BYTES
-  )
+      funcName = "RPAD",
+      parameterArray = {TypeName.BYTES, TypeName.INT64},
+      returnType = TypeName.BYTES)
   public byte[] rpad(byte[] originalValue, Long returnLength) {
     return lpad(originalValue, returnLength, " ".getBytes(UTF_8));
   }
 
   @UDF(
-    funcName = "RPAD",
-    parameterArray = {TypeName.BYTES, TypeName.INT64, TypeName.BYTES},
-    returnType = TypeName.BYTES
-  )
+      funcName = "RPAD",
+      parameterArray = {TypeName.BYTES, TypeName.INT64, TypeName.BYTES},
+      returnType = TypeName.BYTES)
   public byte[] rpad(byte[] originalValue, Long returnLength, byte[] pattern) {
     if (originalValue == null || returnLength == null || pattern == null) {
       return null;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinTrigonometricFunctions.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinTrigonometricFunctions.java
index 9049284d4db7..9a5f8ab1a550 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinTrigonometricFunctions.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinTrigonometricFunctions.java
@@ -31,10 +31,9 @@
    */
   // TODO: handle overflow
   @UDF(
-    funcName = "COSH",
-    parameterArray = {Schema.TypeName.DOUBLE},
-    returnType = Schema.TypeName.DOUBLE
-  )
+      funcName = "COSH",
+      parameterArray = {Schema.TypeName.DOUBLE},
+      returnType = Schema.TypeName.DOUBLE)
   public Double cosh(Double o) {
     if (o == null) {
       return null;
@@ -49,10 +48,9 @@ public Double cosh(Double o) {
    */
   // TODO: handle overflow
   @UDF(
-    funcName = "SINH",
-    parameterArray = {Schema.TypeName.DOUBLE},
-    returnType = Schema.TypeName.DOUBLE
-  )
+      funcName = "SINH",
+      parameterArray = {Schema.TypeName.DOUBLE},
+      returnType = Schema.TypeName.DOUBLE)
   public Double sinh(Double o) {
     if (o == null) {
       return null;
@@ -66,10 +64,9 @@ public Double sinh(Double o) {
    * <p>Computes hyperbolic tangent of X. Does not fail.
    */
   @UDF(
-    funcName = "TANH",
-    parameterArray = {Schema.TypeName.DOUBLE},
-    returnType = Schema.TypeName.DOUBLE
-  )
+      funcName = "TANH",
+      parameterArray = {Schema.TypeName.DOUBLE},
+      returnType = Schema.TypeName.DOUBLE)
   public Double tanh(Double o) {
     if (o == null) {
       return null;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsInf.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsInf.java
index 9e13cc85630f..c441303d28ef 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsInf.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsInf.java
@@ -33,19 +33,17 @@
   private static final String SQL_FUNCTION_NAME = "IS_INF";
 
   @UDF(
-    funcName = SQL_FUNCTION_NAME,
-    parameterArray = {Schema.TypeName.DOUBLE},
-    returnType = Schema.TypeName.BOOLEAN
-  )
+      funcName = SQL_FUNCTION_NAME,
+      parameterArray = {Schema.TypeName.DOUBLE},
+      returnType = Schema.TypeName.BOOLEAN)
   public Boolean isInf(Double value) {
     return Double.isInfinite(value);
   }
 
   @UDF(
-    funcName = SQL_FUNCTION_NAME,
-    parameterArray = {Schema.TypeName.FLOAT},
-    returnType = Schema.TypeName.BOOLEAN
-  )
+      funcName = SQL_FUNCTION_NAME,
+      parameterArray = {Schema.TypeName.FLOAT},
+      returnType = Schema.TypeName.BOOLEAN)
   public Boolean isInf(Float value) {
     return Float.isInfinite(value);
   }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsNan.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsNan.java
index 6bc1d31a2a68..f2bee2c2ccae 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsNan.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsNan.java
@@ -32,19 +32,17 @@
   private static final String SQL_FUNCTION_NAME = "IS_NAN";
 
   @UDF(
-    funcName = SQL_FUNCTION_NAME,
-    parameterArray = {Schema.TypeName.FLOAT},
-    returnType = Schema.TypeName.BOOLEAN
-  )
+      funcName = SQL_FUNCTION_NAME,
+      parameterArray = {Schema.TypeName.FLOAT},
+      returnType = Schema.TypeName.BOOLEAN)
   public Boolean isNan(Float value) {
     return Float.isNaN(value);
   }
 
   @UDF(
-    funcName = SQL_FUNCTION_NAME,
-    parameterArray = {Schema.TypeName.DOUBLE},
-    returnType = Schema.TypeName.BOOLEAN
-  )
+      funcName = SQL_FUNCTION_NAME,
+      parameterArray = {Schema.TypeName.DOUBLE},
+      returnType = Schema.TypeName.BOOLEAN)
   public Boolean isNan(Double value) {
     return Double.isNaN(value);
   }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index b227f88967c7..cbc23f0dd013 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -202,7 +202,7 @@ private static RelDataType toRelDataType(
    * @return
    */
   public static RelDataType sqlTypeWithAutoCast(RelDataTypeFactory typeFactory, Type rawType) {
-    //For Joda time types, return SQL type for java.util.Date.
+    // For Joda time types, return SQL type for java.util.Date.
     if (rawType instanceof Class && AbstractInstant.class.isAssignableFrom((Class<?>) rawType)) {
       return typeFactory.createJavaType(Date.class);
     } else if (rawType instanceof Class && ByteString.class.isAssignableFrom((Class<?>) rawType)) {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow.java
index 1bb8dee3ba43..d7703808fc6a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow.java
@@ -95,9 +95,7 @@ public void processElement(ProcessContext context) {
    * payload, and attributes.
    */
   private List<Object> getFieldValues(ProcessContext context) {
-    return messageSchema()
-        .getFields()
-        .stream()
+    return messageSchema().getFields().stream()
         .map(field -> getValueForField(field, context.timestamp(), context.element()))
         .collect(toList());
   }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
index 146f5cf8fa8c..3689ba831c50 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
@@ -82,9 +82,7 @@ public void dropTable(String tableName) {
 
   @Override
   public Map<String, Table> getTables() {
-    return tables()
-        .entrySet()
-        .stream()
+    return tables().entrySet().stream()
         .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().table));
   }
 
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableUtils.java
index 9ba719332956..d1336002aad5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableUtils.java
@@ -75,8 +75,7 @@ public static Schema buildBeamSqlSchema(Object... args) {
    * }</pre>
    */
   public static List<Row> buildRows(Schema type, List<?> rowsValues) {
-    return Lists.partition(rowsValues, type.getFieldCount())
-        .stream()
+    return Lists.partition(rowsValues, type.getFieldCount()).stream()
         .map(values -> values.stream().collect(toRow(type)))
         .collect(toList());
   }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
index 3c462952e6ad..28f3f752443d 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
@@ -60,14 +60,14 @@
   static List<Row> rowsOfBytes;
   static List<Row> rowsOfBytesPaddingTest;
 
-  //bounded PCollections
+  // bounded PCollections
   protected PCollection<Row> boundedInput1;
   protected PCollection<Row> boundedInput2;
   protected PCollection<Row> boundedInputFloatDouble;
   protected PCollection<Row> boundedInputBytes;
   protected PCollection<Row> boundedInputBytesPaddingTest;
 
-  //unbounded PCollections
+  // unbounded PCollections
   protected PCollection<Row> unboundedInput1;
   protected PCollection<Row> unboundedInput2;
 
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java
index 04ae5f17e6b8..f69f3b985a4b 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java
@@ -164,9 +164,7 @@ private static SqlOperatorId sqlOperatorId(SqlOperator sqlOperator) {
     Set<SqlOperatorId> declaredOperators = new HashSet<>();
 
     declaredOperators.addAll(
-        SqlStdOperatorTable.instance()
-            .getOperatorList()
-            .stream()
+        SqlStdOperatorTable.instance().getOperatorList().stream()
             .map(operator -> sqlOperatorId(operator.getName(), operator.getKind()))
             .collect(Collectors.toList()));
 
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
index 7582aa4d8f16..a7525d2c61ef 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
@@ -197,8 +197,7 @@ public void testSelectsFromExistingTable() throws Exception {
         connection.createStatement().executeQuery("SELECT id, name FROM person");
 
     List<Row> resultRows =
-        readResultSet(selectResult)
-            .stream()
+        readResultSet(selectResult).stream()
             .map(values -> values.stream().collect(toRow(BASIC_SCHEMA)))
             .collect(Collectors.toList());
 
@@ -322,8 +321,7 @@ public void testSelectsFromExistingComplexTable() throws Exception {
             .executeQuery("SELECT person.nestedRow.id, person.nestedRow.name FROM person");
 
     List<Row> resultRows =
-        readResultSet(selectResult)
-            .stream()
+        readResultSet(selectResult).stream()
             .map(values -> values.stream().collect(toRow(BASIC_SCHEMA)))
             .collect(Collectors.toList());
 
@@ -350,8 +348,7 @@ public void testInsertIntoCreatedTable() throws Exception {
         connection.createStatement().executeQuery("SELECT id, name FROM person");
 
     List<Row> resultRows =
-        readResultSet(selectResult)
-            .stream()
+        readResultSet(selectResult).stream()
             .map(resultValues -> resultValues.stream().collect(toRow(BASIC_SCHEMA)))
             .collect(Collectors.toList());
 
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLNestedTypesTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLNestedTypesTest.java
index acc7633fa4f6..39b5f0df014b 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLNestedTypesTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLNestedTypesTest.java
@@ -119,10 +119,7 @@ private String unparseMap(FieldType fieldType) {
 
   private String unparseRow(FieldType fieldType) {
     return "ROW<"
-        + fieldType
-            .getRowSchema()
-            .getFields()
-            .stream()
+        + fieldType.getRowSchema().getFields().stream()
             .map(field -> field.getName() + " " + unparse(field.getType()))
             .collect(joining(","))
         + ">";
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java
index 7eea6ece2896..02349a558c43 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java
@@ -45,9 +45,7 @@ public void setUp() {
   Map<String, RelDataType> calciteRowTypeFields(Schema schema) {
     final RelDataType dataType = CalciteUtils.toCalciteRowType(schema, dataTypeFactory);
 
-    return dataType
-        .getFieldNames()
-        .stream()
+    return dataType.getFieldNames().stream()
         .collect(
             Collectors.toMap(
                 x -> x,
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
index 062700ba0fe6..e8fa1358600d 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
@@ -340,9 +340,8 @@ private CalciteConnection connect(PipelineOptions options, TableProvider... tabl
     // The actual options are in the "options" field of the converted map
     Map<String, String> argsMap =
         ((Map<String, Object>) MAPPER.convertValue(pipeline.getOptions(), Map.class).get("options"))
-            .entrySet()
-            .stream()
-            .collect(Collectors.toMap(Map.Entry::getKey, entry -> toArg(entry.getValue())));
+            .entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, entry -> toArg(entry.getValue())));
 
     InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore();
     for (TableProvider tableProvider : tableProviders) {
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
index ef5c40eaf7b9..28fc4a406c46 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
@@ -71,7 +71,9 @@
 
   static class Factory<InputT, RestrictionT, OutputT>
       extends DoFnPTransformRunnerFactory<
-          KV<InputT, RestrictionT>, InputT, OutputT,
+          KV<InputT, RestrictionT>,
+          InputT,
+          OutputT,
           SplittableProcessElementsRunner<InputT, RestrictionT, OutputT>> {
 
     @Override
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClientTest.java
index d52ad469ef01..32797fa9da3e 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClientTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClientTest.java
@@ -246,7 +246,7 @@ public void testBasicInboundConsumerBehaviour() throws Exception {
   @Test(timeout = 10000)
   public void testBundleProcessorThrowsExecutionExceptionWhenUserCodeThrows() throws Exception {
     CountDownLatch waitForClientToConnect = new CountDownLatch(1);
-    //Collection<WindowedValue<String>> inboundValuesA = new ConcurrentLinkedQueue<>();
+    // Collection<WindowedValue<String>> inboundValuesA = new ConcurrentLinkedQueue<>();
     Collection<WindowedValue<String>> inboundValuesB = new ConcurrentLinkedQueue<>();
     Collection<BeamFnApi.Elements> inboundServerValues = new ConcurrentLinkedQueue<>();
     AtomicReference<StreamObserver<BeamFnApi.Elements>> outboundServerObserver =
diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java
index 908e31c77f36..a1ac65ecec4a 100644
--- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java
+++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java
@@ -241,10 +241,9 @@ public SSEAwsKeyManagementParams deserialize(JsonParser parser, DeserializationC
   }
 
   @JsonAutoDetect(
-    fieldVisibility = Visibility.NONE,
-    getterVisibility = Visibility.NONE,
-    setterVisibility = Visibility.NONE
-  )
+      fieldVisibility = Visibility.NONE,
+      getterVisibility = Visibility.NONE,
+      setterVisibility = Visibility.NONE)
   interface ClientConfigurationMixin {
     @JsonProperty
     String getProxyHost();
diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
index 50a10a357a3c..1276a79c6ddf 100644
--- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
+++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
@@ -596,8 +596,7 @@ protected void rename(
   @Override
   protected void delete(Collection<S3ResourceId> resourceIds) throws IOException {
     List<S3ResourceId> nonDirectoryPaths =
-        resourceIds
-            .stream()
+        resourceIds.stream()
             .filter(s3ResourceId -> !s3ResourceId.isDirectory())
             .collect(Collectors.toList());
     Multimap<String, String> keysByBucket = ArrayListMultimap.create();
diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/SnsIO.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/SnsIO.java
index df706082d7db..449dd1e3af16 100644
--- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/SnsIO.java
+++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/SnsIO.java
@@ -82,7 +82,7 @@
 @Experimental(Experimental.Kind.SOURCE_SINK)
 public final class SnsIO {
 
-  //Write data tp SNS
+  // Write data tp SNS
   public static Write write() {
     return new AutoValue_SnsIO_Write.Builder().build();
   }
@@ -283,7 +283,7 @@ public PCollectionTuple expand(PCollection<PublishRequest> input) {
 
       @Setup
       public void setup() throws Exception {
-        //Initialize SnsPublisher
+        // Initialize SnsPublisher
         producer = spec.getAWSClientsProvider().createSnsPublisher();
         checkArgument(
             topicExists(producer, spec.getTopicName()),
diff --git a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
index db637a4a2afe..46640e39ab96 100644
--- a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
+++ b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
@@ -696,19 +696,19 @@ public void testWriteAndRead() throws IOException {
     ByteBuffer bb = ByteBuffer.allocate(writtenArray.length);
     bb.put(writtenArray);
 
-    //First create an object and write data to it
+    // First create an object and write data to it
     S3ResourceId path = S3ResourceId.fromUri("s3://testbucket/foo/bar.txt");
     WritableByteChannel writableByteChannel =
         s3FileSystem.create(path, builder().setMimeType("application/text").build());
     writableByteChannel.write(bb);
     writableByteChannel.close();
 
-    //Now read the same object
+    // Now read the same object
     ByteBuffer bb2 = ByteBuffer.allocate(writtenArray.length);
     ReadableByteChannel open = s3FileSystem.open(path);
     open.read(bb2);
 
-    //And compare the content with the one that was written
+    // And compare the content with the one that was written
     byte[] readArray = bb2.array();
     assertArrayEquals(readArray, writtenArray);
     open.close();
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
index e648112820a1..ee31292fe4a9 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
@@ -214,21 +214,14 @@ static long getEstimatedSizeBytes(List<TokenRange> tokenRanges) {
 
     SplitGenerator splitGenerator = new SplitGenerator(cluster.getMetadata().getPartitioner());
     List<BigInteger> tokens =
-        cluster
-            .getMetadata()
-            .getTokenRanges()
-            .stream()
+        cluster.getMetadata().getTokenRanges().stream()
             .map(tokenRange -> new BigInteger(tokenRange.getEnd().getValue().toString()))
             .collect(Collectors.toList());
     List<List<RingRange>> splits = splitGenerator.generateSplits(numSplits, tokens);
     LOG.info("{} splits were actually generated", splits.size());
 
     final String partitionKey =
-        cluster
-            .getMetadata()
-            .getKeyspace(spec.keyspace())
-            .getTable(spec.table())
-            .getPartitionKey()
+        cluster.getMetadata().getKeyspace(spec.keyspace()).getTable(spec.table()).getPartitionKey()
             .stream()
             .map(ColumnMetadata::getName)
             .collect(Collectors.joining(","));
diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
index 0eaca4f58121..da5e54dd0389 100644
--- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
+++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
@@ -366,9 +366,7 @@ private static void set(Properties properties, String param, Object value) {
     @VisibleForTesting
     static String insertSql(TableSchema schema, String table) {
       String columnsStr =
-          schema
-              .columns()
-              .stream()
+          schema.columns().stream()
               .filter(x -> !x.materializedOrAlias())
               .map(x -> quoteIdentifier(x.name()))
               .collect(Collectors.joining(", "));
diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java
index b8b1e3bbb651..75692d2859b4 100644
--- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java
+++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java
@@ -45,9 +45,7 @@ public static TableSchema of(Column... columns) {
    * @return Beam schema
    */
   public static Schema getEquivalentSchema(TableSchema tableSchema) {
-    return tableSchema
-        .columns()
-        .stream()
+    return tableSchema.columns().stream()
         .map(
             x -> {
               if (x.columnType().nullable()) {
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 55788b007bca..9a6da467928b 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -53,7 +53,7 @@
   private static Node node;
   private static RestClient restClient;
   private static ConnectionConfiguration connectionConfiguration;
-  //cannot use inheritance because ES5 test already extends ESIntegTestCase.
+  // cannot use inheritance because ES5 test already extends ESIntegTestCase.
   private static ElasticsearchIOTestCommon elasticsearchIOTestCommon;
 
   @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 24748b9e4637..806b93e307d0 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -73,7 +73,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
   public Settings indexSettings() {
     return Settings.builder()
         .put(super.indexSettings())
-        //useful to have updated sizes for getEstimatedSize
+        // useful to have updated sizes for getEstimatedSize
         .put("index.store.stats_refresh_interval", 0)
         .build();
   }
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index d731f339424a..0579cfa115c0 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -73,7 +73,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
   public Settings indexSettings() {
     return Settings.builder()
         .put(super.indexSettings())
-        //useful to have updated sizes for getEstimatedSize
+        // useful to have updated sizes for getEstimatedSize
         .put("index.store.stats_refresh_interval", 0)
         .build();
   }
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
index 579c968adf46..129619eb8521 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
@@ -185,9 +185,9 @@ void testRead() throws Exception {
         pipeline.apply(
             ElasticsearchIO.read()
                 .withConnectionConfiguration(connectionConfiguration)
-                //set to default value, useful just to test parameter passing.
+                // set to default value, useful just to test parameter passing.
                 .withScrollKeepalive("5m")
-                //set to default value, useful just to test parameter passing.
+                // set to default value, useful just to test parameter passing.
                 .withBatchSize(100L));
     PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(numDocs);
     pipeline.run();
@@ -594,7 +594,8 @@ void testDefaultRetryPredicate(RestClient restClient) throws IOException {
    */
   void testWriteRetry() throws Throwable {
     expectedException.expectCause(isA(IOException.class));
-    // max attempt is 3, but retry is 2 which excludes 1st attempt when error was identified and retry started.
+    // max attempt is 3, but retry is 2 which excludes 1st attempt when error was identified and
+    // retry started.
     expectedException.expectMessage(
         String.format(ElasticsearchIO.Write.WriteFn.RETRY_FAILED_LOG, EXPECTED_RETRIES));
 
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 7c30563709ee..9956684c269b 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -188,7 +188,7 @@ static void checkForErrors(HttpEntity responseEntity, int backendVersion) throws
       StringBuilder errorMessages =
           new StringBuilder("Error writing to Elasticsearch, some elements could not be inserted:");
       JsonNode items = searchResult.path("items");
-      //some items present in bulk might have errors, concatenate error messages
+      // some items present in bulk might have errors, concatenate error messages
       for (JsonNode item : items) {
 
         String errorRootName = "";
@@ -572,7 +572,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
     @Nullable private final Integer numSlices;
     @Nullable private final Integer sliceId;
 
-    //constructor used in split() when we know the backend version
+    // constructor used in split() when we know the backend version
     private BoundedElasticsearchSource(
         Read spec,
         @Nullable String shardPreference,
@@ -727,7 +727,7 @@ public boolean start() throws IOException {
       if ((source.backendVersion == 5 || source.backendVersion == 6)
           && source.numSlices != null
           && source.numSlices > 1) {
-        //if there is more than one slice, add the slice to the user query
+        // if there is more than one slice, add the slice to the user query
         String sliceQuery =
             String.format("\"slice\": {\"id\": %s,\"max\": %s}", source.sliceId, source.numSlices);
         query = query.replaceFirst("\\{", "{" + sliceQuery + ",");
@@ -777,7 +777,7 @@ public boolean advance() throws IOException {
     }
 
     private boolean readNextBatchAndReturnFirstDocument(JsonNode searchResult) {
-      //stop if no more data
+      // stop if no more data
       JsonNode hits = searchResult.path("hits").path("hits");
       if (hits.size() == 0) {
         current = null;
@@ -1300,12 +1300,12 @@ private HttpEntity handleRetry(
         Sleeper sleeper = Sleeper.DEFAULT;
         BackOff backoff = retryBackoff.backoff();
         int attempt = 0;
-        //while retry policy exists
+        // while retry policy exists
         while (BackOffUtils.next(sleeper, backoff)) {
           LOG.warn(String.format(RETRY_ATTEMPT_LOG, ++attempt));
           response = restClient.performRequest(method, endpoint, params, requestBody);
           responseEntity = new BufferedHttpEntity(response.getEntity());
-          //if response has no 429 errors
+          // if response has no 429 errors
           if (!spec.getRetryConfiguration().getRetryPredicate().test(responseEntity)) {
             return responseEntity;
           }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index b4dba1b78bb9..54378fcf3636 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1604,8 +1604,7 @@ public WriteResult expand(PCollection<T> input) {
       checkArgument(
           1
               == Iterables.size(
-                  allToArgs
-                      .stream()
+                  allToArgs.stream()
                       .filter(Predicates.notNull()::apply)
                       .collect(Collectors.toList())),
           "Exactly one of jsonTableRef, tableFunction, or " + "dynamicDestinations must be set");
@@ -1615,8 +1614,7 @@ public WriteResult expand(PCollection<T> input) {
       checkArgument(
           2
               > Iterables.size(
-                  allSchemaArgs
-                      .stream()
+                  allSchemaArgs.stream()
                       .filter(Predicates.notNull()::apply)
                       .collect(Collectors.toList())),
           "No more than one of jsonSchema, schemaFromView, or dynamicDestinations may " + "be set");
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 00d5b570b824..a30465e61b9d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -914,7 +914,8 @@ private static boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws Inte
             Transport.getJsonFactory(),
             chainHttpRequestInitializer(
                 options.getGcpCredential(),
-                // Do not log 404. It clutters the output and is possibly even required by the caller.
+                // Do not log 404. It clutters the output and is possibly even required by the
+                // caller.
                 httpRequestInitializer))
         .setApplicationName(options.getAppName())
         .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index e8fcf210a20b..b6d8b748c16e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -263,9 +263,7 @@ public static Row toBeamRow(Schema rowSchema, TableSchema bqSchema, TableRow jso
             .collect(toMap(i -> bqFields.get(i).getName(), i -> i));
 
     List<Object> rawJsonValues =
-        rowSchema
-            .getFields()
-            .stream()
+        rowSchema.getFields().stream()
             .map(field -> bqFieldIndices.get(field.getName()))
             .map(index -> jsonBqRow.getF().get(index).getV())
             .collect(toList());
@@ -284,9 +282,9 @@ private static Object toBeamValue(FieldType fieldType, Object jsonBQValue) {
     if (jsonBQValue instanceof List) {
       return ((List<Object>) jsonBQValue)
           .stream()
-          .map(v -> ((Map<String, Object>) v).get("v"))
-          .map(v -> toBeamValue(fieldType.getCollectionElementType(), v))
-          .collect(toList());
+              .map(v -> ((Map<String, Object>) v).get("v"))
+              .map(v -> toBeamValue(fieldType.getCollectionElementType(), v))
+              .collect(toList());
     }
 
     throw new UnsupportedOperationException(
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQuery.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQuery.java
index d96e8b3d0810..ed10c21ccb9d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQuery.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQuery.java
@@ -228,8 +228,7 @@ private void pollAndAssert(
       return Collections.emptyList();
     }
 
-    return bqRows
-        .stream()
+    return bqRows.stream()
         .map(bqRow -> toBeamRow(rowSchema, bqSchema, bqRow))
         .collect(Collectors.toList());
   }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
index 633355d8b24d..9277055ace31 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
@@ -75,7 +75,8 @@ public PubsubClient newClient(
                   Transport.getJsonFactory(),
                   chainHttpRequestInitializer(
                       options.getGcpCredential(),
-                      // Do not log 404. It clutters the output and is possibly even required by the caller.
+                      // Do not log 404. It clutters the output and is possibly even required by the
+                      // caller.
                       new RetryHttpRequestInitializer(ImmutableList.of(404))))
               .setRootUrl(options.getPubsubRootUrl())
               .setApplicationName(options.getAppName())
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index ead5cb555b14..d09745dcdfdd 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -672,7 +672,8 @@ public void testInsertOtherRetry() throws Throwable {
     List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>();
     rows.add(wrapValue(new TableRow()));
 
-    // First response is 403 non-{rate-limited, quota-exceeded}, second response has valid payload but should not
+    // First response is 403 non-{rate-limited, quota-exceeded}, second response has valid payload
+    // but should not
     // be invoked.
     when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
     when(response.getStatusCode()).thenReturn(403).thenReturn(200);
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java
index d976366ac4b0..8d6dcff19d04 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java
@@ -149,9 +149,7 @@ private void verifyLegacyQueryRes(String outputTable) throws Exception {
         BQ_CLIENT.queryWithRetries(String.format("SELECT fruit from [%s];", outputTable), project);
     LOG.info("Finished to query result table {}", outputTable);
     List<String> tableResult =
-        response
-            .getRows()
-            .stream()
+        response.getRows().stream()
             .flatMap(row -> row.getF().stream().map(cell -> cell.getV().toString()))
             .sorted()
             .collect(Collectors.toList());
@@ -171,9 +169,7 @@ private void verifyNewTypesQueryRes(String outputTable) throws Exception {
             String.format("SELECT bytes, date, time FROM [%s];", outputTable), project);
     LOG.info("Finished to query result table {}", outputTable);
     List<String> tableResult =
-        response
-            .getRows()
-            .stream()
+        response.getRows().stream()
             .map(
                 row -> {
                   String res = "";
@@ -234,13 +230,13 @@ private void verifyStandardQueryRes(String outputTable) throws Exception {
   @BeforeClass
   public static void setupTestEnvironment() throws Exception {
     PipelineOptionsFactory.register(BigQueryToTableOptions.class);
-		project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+    project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
     // Create one BQ dataset for all test cases.
     BQ_CLIENT.createNewDataset(project, BIG_QUERY_DATASET_ID);
 
     // Create table and insert data for new type query test cases.
     BQ_CLIENT.createNewTable(
-				project,
+        project,
         BIG_QUERY_DATASET_ID,
         new Table()
             .setSchema(BigQueryToTableIT.NEW_TYPES_QUERY_TABLE_SCHEMA)
@@ -265,7 +261,7 @@ public static void cleanup() {
   @Test
   public void testLegacyQueryWithoutReshuffle() throws Exception {
     final String outputTable =
-				project + ":" + BIG_QUERY_DATASET_ID + "." + "testLegacyQueryWithoutReshuffle";
+        project + ":" + BIG_QUERY_DATASET_ID + "." + "testLegacyQueryWithoutReshuffle";
 
     this.runBigQueryToTablePipeline(setupLegacyQueryTest(outputTable));
 
@@ -275,7 +271,7 @@ public void testLegacyQueryWithoutReshuffle() throws Exception {
   @Test
   public void testNewTypesQueryWithoutReshuffle() throws Exception {
     final String outputTable =
-				project + ":" + BIG_QUERY_DATASET_ID + "." + "testNewTypesQueryWithoutReshuffle";
+        project + ":" + BIG_QUERY_DATASET_ID + "." + "testNewTypesQueryWithoutReshuffle";
 
     this.runBigQueryToTablePipeline(setupNewTypesQueryTest(outputTable));
 
@@ -285,7 +281,7 @@ public void testNewTypesQueryWithoutReshuffle() throws Exception {
   @Test
   public void testNewTypesQueryWithReshuffle() throws Exception {
     final String outputTable =
-				project + ":" + BIG_QUERY_DATASET_ID + "." + "testNewTypesQueryWithReshuffle";
+        project + ":" + BIG_QUERY_DATASET_ID + "." + "testNewTypesQueryWithReshuffle";
     BigQueryToTableOptions options = setupNewTypesQueryTest(outputTable);
     options.setReshuffle(true);
 
@@ -297,7 +293,7 @@ public void testNewTypesQueryWithReshuffle() throws Exception {
   @Test
   public void testStandardQueryWithoutCustom() throws Exception {
     final String outputTable =
-				project + ":" + BIG_QUERY_DATASET_ID + "." + "testStandardQueryWithoutCustom";
+        project + ":" + BIG_QUERY_DATASET_ID + "." + "testStandardQueryWithoutCustom";
 
     this.runBigQueryToTablePipeline(setupStandardQueryTest(outputTable));
 
@@ -308,7 +304,7 @@ public void testStandardQueryWithoutCustom() throws Exception {
   @Category(DataflowPortabilityApiUnsupported.class)
   public void testNewTypesQueryWithoutReshuffleWithCustom() throws Exception {
     final String outputTable =
-				project + ":" + BIG_QUERY_DATASET_ID + "." + "testNewTypesQueryWithoutReshuffleWithCustom";
+        project + ":" + BIG_QUERY_DATASET_ID + "." + "testNewTypesQueryWithoutReshuffleWithCustom";
     BigQueryToTableOptions options = this.setupNewTypesQueryTest(outputTable);
     options.setExperiments(
         ImmutableList.of("enable_custom_bigquery_sink", "enable_custom_bigquery_source"));
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 8924733795fc..e995e7af969e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -581,8 +581,7 @@ public void testReadingWithFilter() throws Exception {
     String regex = ".*17.*";
     final KeyMatchesRegex keyPredicate = new KeyMatchesRegex(regex);
     Iterable<Row> filteredRows =
-        testRows
-            .stream()
+        testRows.stream()
             .filter(
                 input -> {
                   verifyNotNull(input, "input");
@@ -710,7 +709,7 @@ public void testReduceSplitsWithSomeNonAdjacentRanges() throws Exception {
     makeTableData(table, numRows);
     service.setupSampleRowKeys(table, numSamples, bytesPerRow);
 
-    //Construct few non contiguous key ranges [..1][1..2][3..4][4..5][6..7][8..9]
+    // Construct few non contiguous key ranges [..1][1..2][3..4][4..5][6..7][8..9]
     List<ByteKeyRange> keyRanges =
         Arrays.asList(
             ByteKeyRange.of(ByteKey.EMPTY, createByteKey(1)),
@@ -720,7 +719,7 @@ public void testReduceSplitsWithSomeNonAdjacentRanges() throws Exception {
             ByteKeyRange.of(createByteKey(6), createByteKey(7)),
             ByteKeyRange.of(createByteKey(8), createByteKey(9)));
 
-    //Expected ranges after split and reduction by maxSplitCount is [..2][3..5][6..7][8..9]
+    // Expected ranges after split and reduction by maxSplitCount is [..2][3..5][6..7][8..9]
     List<ByteKeyRange> expectedKeyRangesAfterReducedSplits =
         Arrays.asList(
             ByteKeyRange.of(ByteKey.EMPTY, createByteKey(2)),
@@ -770,7 +769,7 @@ public void testReduceSplitsWithAllNonAdjacentRange() throws Exception {
     makeTableData(table, numRows);
     service.setupSampleRowKeys(table, numSamples, bytesPerRow);
 
-    //Construct non contiguous key ranges [..1][2..3][4..5][6..7][8..9]
+    // Construct non contiguous key ranges [..1][2..3][4..5][6..7][8..9]
     List<ByteKeyRange> keyRanges =
         Arrays.asList(
             ByteKeyRange.of(ByteKey.EMPTY, createByteKey(1)),
@@ -846,8 +845,8 @@ public void tesReduceSplitsWithAdjacentRanges() throws Exception {
       splits.add(source.withSingleRange(range));
     }
 
-    //Splits Source have ranges [..1][1..2][2..3][3..4][4..5][5..6][6..7][7..8][8..9][9..]
-    //expected reduced Split source ranges are [..4][4..8][8..]
+    // Splits Source have ranges [..1][1..2][2..3][3..4][4..5][5..6][6..7][7..8][8..9][9..]
+    // expected reduced Split source ranges are [..4][4..8][8..]
     List<ByteKeyRange> expectedKeyRangesAfterReducedSplits =
         Arrays.asList(
             ByteKeyRange.of(ByteKey.EMPTY, createByteKey(4)),
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
index ed885555bd1f..a3ed49deeb77 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
@@ -233,7 +233,7 @@ static long countEntities(V1TestOptions options, String project, String ancestor
     private static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
     // Number of times to retry on update failure
     private static final int MAX_RETRIES = 5;
-    //Initial backoff time for exponential backoff for retry attempts.
+    // Initial backoff time for exponential backoff for retry attempts.
     private static final Duration INITIAL_BACKOFF = Duration.standardSeconds(5);
 
     // Returns true if a Datastore key is complete. A key is complete if its last element
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index 13de3b51355c..6fe51d6a95d7 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -354,8 +354,7 @@ public void evaluate() throws Throwable {
 
   private <T> void setupTestClient(List<T> inputs, Coder<T> coder) {
     List<IncomingMessage> messages =
-        inputs
-            .stream()
+        inputs.stream()
             .map(
                 t -> {
                   try {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
index 3b92f140cf0a..5b4a7262a8b3 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
@@ -530,9 +530,7 @@ public void testGatherBundleAndSortFn() throws Exception {
 
     // Verify sorted output... first decode it...
     List<MutationGroup> sorted =
-        byteArrayKvListCaptor
-            .getValue()
-            .stream()
+        byteArrayKvListCaptor.getValue().stream()
             .map(kv -> WriteGrouped.decode(kv.getValue()))
             .collect(Collectors.toList());
     assertThat(
@@ -586,8 +584,7 @@ public void testGatherBundleAndSortFn_flushOversizedBundle() throws Exception {
 
     // decode list of lists of KV to a list of lists of MutationGroup.
     List<List<MutationGroup>> mgListGroups =
-        kvGroups
-            .stream()
+        kvGroups.stream()
             .map(
                 l ->
                     l.stream()
@@ -625,8 +622,7 @@ public void testBatchFn_cells() throws Exception {
             g(m(2L)));
 
     List<KV<byte[], byte[]>> encodedInput =
-        mutationGroups
-            .stream()
+        mutationGroups.stream()
             .map(mg -> KV.of((byte[]) null, WriteGrouped.encode(mg)))
             .collect(Collectors.toList());
 
@@ -671,8 +667,7 @@ public void testBatchFn_size() throws Exception {
             g(m(2L)));
 
     List<KV<byte[], byte[]>> encodedInput =
-        mutationGroups
-            .stream()
+        mutationGroups.stream()
             .map(mg -> KV.of((byte[]) null, WriteGrouped.encode(mg)))
             .collect(Collectors.toList());
 
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
index a85cfd08b730..0bd556f092f8 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
@@ -190,7 +190,8 @@ protected void rename(
       boolean success = fileSystem.rename(src, dest);
 
       // If the failure was due to the file already existing, delete and retry (BEAM-5036).
-      // This should be the exceptional case, so handle here rather than incur the overhead of testing first
+      // This should be the exceptional case, so handle here rather than incur the overhead of
+      // testing first
       if (!success && fileSystem.exists(src) && fileSystem.exists(dest)) {
         LOG.debug(
             String.format(LOG_DELETING_EXISTING_FILE, Path.getPathWithoutSchemeAndAuthority(dest)));
diff --git a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java
index e5a7bf37751a..5a73ea12416c 100644
--- a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java
+++ b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java
@@ -152,7 +152,7 @@ private boolean tryCreateFile(Configuration conf, Path path) {
       } catch (FileAlreadyExistsException | org.apache.hadoop.fs.FileAlreadyExistsException e) {
         return false;
       } catch (RemoteException e) {
-        //remote hdfs exception
+        // remote hdfs exception
         if (e.getClassName().equals(AlreadyBeingCreatedException.class.getName())) {
           return false;
         }
diff --git a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
index 5cce85abd535..1d7fc32c203f 100644
--- a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
+++ b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
@@ -641,8 +641,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
           "Generated {} splits. Size of first split is {} ",
           inputSplits.size(),
           inputSplits.get(0).getSplit().getLength());
-      return inputSplits
-          .stream()
+      return inputSplits.stream()
           .map(
               serializableInputSplit -> {
                 return new HadoopInputFormatBoundedSource<>(
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java
index 04eadeaaf468..4b88e32ece6b 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java
@@ -177,7 +177,7 @@ private static void createCassandraData() {
 
   @BeforeClass
   public static void startCassandra() throws Exception {
-    //Start the Embedded Cassandra Service
+    // Start the Embedded Cassandra Service
     cassandra.start();
     final SocketOptions socketOptions = new SocketOptions();
     // Setting this to 0 disables read timeouts.
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOSequenceFileTest.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOSequenceFileTest.java
index 1a19dd937a5d..509cc46c8980 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOSequenceFileTest.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOSequenceFileTest.java
@@ -93,8 +93,7 @@
               KV.of(new Text(element.getKey()), new LongWritable(element.getValue()));
 
   private static Map<String, Long> computeWordCounts(List<String> sentences) {
-    return sentences
-        .stream()
+    return sentences.stream()
         .flatMap(s -> Stream.of(s.split("\\W+")))
         .map(String::toLowerCase)
         .collect(Collectors.toMap(Function.identity(), s -> 1L, Long::sum));
@@ -298,8 +297,7 @@ public void streamTest() {
   }
 
   private Map<String, Long> loadWrittenDataAsMap(String outputDirPath) {
-    return loadWrittenData(outputDirPath)
-        .stream()
+    return loadWrittenData(outputDirPath).stream()
         .collect(
             Collectors.toMap(
                 kv -> kv.getKey().toString(),
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOTestOptions.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOTestOptions.java
index a40ab46e8871..c938b7df7343 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOTestOptions.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOTestOptions.java
@@ -24,7 +24,7 @@
 /** Properties needed when using HadoopFormatIO with the Beam SDK. */
 public interface HadoopFormatIOTestOptions extends TestPipelineOptions {
 
-  //Cassandra test options
+  // Cassandra test options
   @Description("Cassandra Server IP")
   @Default.String("cassandraServerIp")
   String getCassandraServerIp();
@@ -49,7 +49,7 @@
 
   void setCassandraPassword(String cassandraPassword);
 
-  //Elasticsearch test options
+  // Elasticsearch test options
   @Description("Elasticsearch Server IP")
   @Default.String("elasticServerIp")
   String getElasticServerIp();
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestEmployeeDataSet.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestEmployeeDataSet.java
index 9d62f481add8..98db521b5324 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestEmployeeDataSet.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestEmployeeDataSet.java
@@ -68,11 +68,12 @@
   public static List<KV<Text, Employee>> getEmployeeData() {
     return (data.isEmpty() ? populateEmployeeData() : data)
         .stream()
-        .map(
-            input -> {
-              List<String> empData = Splitter.on('_').splitToList(input.getValue());
-              return KV.of(new Text(input.getKey()), new Employee(empData.get(0), empData.get(1)));
-            })
-        .collect(Collectors.toList());
+            .map(
+                input -> {
+                  List<String> empData = Splitter.on('_').splitToList(input.getValue());
+                  return KV.of(
+                      new Text(input.getKey()), new Employee(empData.get(0), empData.get(1)));
+                })
+            .collect(Collectors.toList());
   }
 }
diff --git a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java
index f2a1eae936d4..02732611b7d7 100644
--- a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java
+++ b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java
@@ -175,7 +175,7 @@ private static void createCassandraData() {
 
   @BeforeClass
   public static void startCassandra() throws Exception {
-    //Start the Embedded Cassandra Service
+    // Start the Embedded Cassandra Service
     cassandra.start();
     final SocketOptions socketOptions = new SocketOptions();
     // Setting this to 0 disables read timeouts.
diff --git a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFITestOptions.java b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFITestOptions.java
index 7c571b10df67..ade544fc22bd 100644
--- a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFITestOptions.java
+++ b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFITestOptions.java
@@ -24,7 +24,7 @@
 /** Properties needed when using HadoopInputFormatIO with the Beam SDK. */
 public interface HIFITestOptions extends TestPipelineOptions {
 
-  //Cassandra test options
+  // Cassandra test options
   @Description("Cassandra Server IP")
   @Default.String("cassandraServerIp")
   String getCassandraServerIp();
@@ -49,7 +49,7 @@
 
   void setCassandraPassword(String cassandraPassword);
 
-  //Elasticsearch test options
+  // Elasticsearch test options
   @Description("Elasticsearch Server IP")
   @Default.String("elasticServerIp")
   String getElasticServerIp();
diff --git a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java
index 65aa5ee12ceb..6f91328f5d13 100644
--- a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java
+++ b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java
@@ -68,11 +68,12 @@
   public static List<KV<Text, Employee>> getEmployeeData() {
     return (data.isEmpty() ? populateEmployeeData() : data)
         .stream()
-        .map(
-            input -> {
-              List<String> empData = Splitter.on('_').splitToList(input.getValue());
-              return KV.of(new Text(input.getKey()), new Employee(empData.get(0), empData.get(1)));
-            })
-        .collect(Collectors.toList());
+            .map(
+                input -> {
+                  List<String> empData = Splitter.on('_').splitToList(input.getValue());
+                  return KV.of(
+                      new Text(input.getKey()), new Employee(empData.get(0), empData.get(1)));
+                })
+            .collect(Collectors.toList());
   }
 }
diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
index d2ff9b9a3c43..c742185ca5ab 100644
--- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
@@ -276,8 +276,8 @@ public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Except
         desiredSplitCount = (int) Math.ceil((double) estimatedSizeBytes / desiredBundleSizeBytes);
       }
       ReaderContext readerContext = getReaderContext(desiredSplitCount);
-      //process the splits returned by native API
-      //this could be different from 'desiredSplitCount' calculated above
+      // process the splits returned by native API
+      // this could be different from 'desiredSplitCount' calculated above
       LOG.info(
           "Splitting into bundles of {} bytes: "
               + "estimated size {}, desired split count {}, actual split count {}",
@@ -486,7 +486,7 @@ private void flush() throws HCatException {
           masterWriter.commit(writerContext);
         } catch (HCatException e) {
           LOG.error("Exception in flush - write/commit data to Hive", e);
-          //abort on exception
+          // abort on exception
           masterWriter.abort(writerContext);
           throw e;
         } finally {
diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HiveDatabaseTestHelper.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HiveDatabaseTestHelper.java
index 5121e39b3f07..fb83c0060f49 100644
--- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HiveDatabaseTestHelper.java
+++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HiveDatabaseTestHelper.java
@@ -22,6 +22,7 @@
 import java.sql.SQLException;
 import java.sql.Statement;
 import org.apache.beam.sdk.io.common.DatabaseTestHelper;
+
 /** Helper for creating connection and test tables on hive database via JDBC driver. */
 class HiveDatabaseTestHelper {
   private static Connection con;


With regards,
Apache Git Services

Mime
View raw message