From commits-return-100515-archive-asf-public=cust-asf.ponee.io@beam.apache.org Wed Jan 16 03:09:52 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id B9137180609 for ; Wed, 16 Jan 2019 03:09:48 +0100 (CET) Received: (qmail 17102 invoked by uid 500); 16 Jan 2019 02:09:47 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 17093 invoked by uid 99); 16 Jan 2019 02:09:47 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Jan 2019 02:09:47 +0000 From: GitBox To: commits@beam.apache.org Subject: [beam] Diff for: [GitHub] reuvenlax merged pull request #7523: Apply spotless across Beam Message-ID: <154760458665.28863.4377135527162660783.gitbox@gitbox.apache.org> Date: Wed, 16 Jan 2019 02:09:46 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java index 4a516bb0547a..100869ae903c 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java @@ -186,7 +186,8 @@ static void runWindowedWordCount(Options options) throws IOException { pipeline /* Read from the GCS file. */ .apply(TextIO.read().from(options.getInputFile())) - // Concept #2: Add an element timestamp, using an artificial time just to show windowing. + // Concept #2: Add an element timestamp, using an artificial time just to show + // windowing. // See AddTimestampFn for more detail on this. .apply(ParDo.of(new AddTimestampFn(minTimestamp, maxTimestamp))); diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java index 6bcae8da28f3..14a11b71b5a8 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java +++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java @@ -224,7 +224,8 @@ private void tearDown() { Transport.getJsonFactory(), chainHttpRequestInitializer( options.getGcpCredential(), - // Do not log 404. It clutters the output and is possibly even required by the caller. + // Do not log 404. It clutters the output and is possibly even required by the + // caller. new RetryHttpRequestInitializer(ImmutableList.of(404)))) .setApplicationName(options.getAppName()) .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); @@ -237,7 +238,8 @@ private void tearDown() { Transport.getJsonFactory(), chainHttpRequestInitializer( options.getGcpCredential(), - // Do not log 404. It clutters the output and is possibly even required by the caller. + // Do not log 404. It clutters the output and is possibly even required by the + // caller. new RetryHttpRequestInitializer(ImmutableList.of(404)))) .setRootUrl(options.getPubsubRootUrl()) .setApplicationName(options.getAppName()) diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java index 9a3bfc26d5b6..bc6541341b8b 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java @@ -122,7 +122,7 @@ * and then exits. */ public class TriggerExample { - //Numeric value of fixed window duration, in minutes + // Numeric value of fixed window duration, in minutes public static final int WINDOW_DURATION = 30; // Constants used in triggers. // Speeding up ONE_MINUTE or FIVE_MINUTES helps you get an early approximation of results. @@ -189,18 +189,22 @@ .apply( "Default", Window - // The default window duration values work well if you're running the default input + // The default window duration values work well if you're running the default + // input // file. You may want to adjust the window duration otherwise. .>into( FixedWindows.of(Duration.standardMinutes(windowDuration))) - // The default trigger first emits output when the system's watermark passes the end + // The default trigger first emits output when the system's watermark passes + // the end // of the window. .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())) // Late data is dropped .withAllowedLateness(Duration.ZERO) // Discard elements after emitting each pane. - // With no allowed lateness and the specified trigger there will only be a single - // pane, so this doesn't have a noticeable effect. See concept 2 for more details. + // With no allowed lateness and the specified trigger there will only be a + // single + // pane, so this doesn't have a noticeable effect. See concept 2 for more + // details. .discardingFiredPanes()) .apply(new TotalFlow("default")); @@ -229,7 +233,8 @@ FixedWindows.of(Duration.standardMinutes(windowDuration))) // Late data is emitted as it arrives .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())) - // Once the output is produced, the pane is dropped and we start preparing the next + // Once the output is produced, the pane is dropped and we start preparing the + // next // pane for the window .discardingFiredPanes() // Late data is handled up to one day @@ -264,8 +269,10 @@ AfterProcessingTime.pastFirstElementInPane() // Speculative every ONE_MINUTE .plusDelayOf(ONE_MINUTE))) - // After emitting each pane, it will continue accumulating the elements so that each - // approximation includes all of the previous data in addition to the newly arrived + // After emitting each pane, it will continue accumulating the elements so + // that each + // approximation includes all of the previous data in addition to the newly + // arrived // data. .accumulatingFiredPanes() .withAllowedLateness(ONE_DAY)) @@ -414,7 +421,7 @@ public void processElement(ProcessContext c) throws Exception { return; } if (laneInfo.length < VALID_NUM_FIELDS) { - //Skip the invalid input. + // Skip the invalid input. return; } String freeway = laneInfo[2]; diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java index 6dd112f5e2d7..962d5fae134e 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java +++ b/examples/java/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java @@ -129,14 +129,16 @@ public void testTeamScoresSpeculative() { .addElements( event(TestUser.BLUE_ONE, 3, Duration.standardSeconds(3)), event(TestUser.BLUE_ONE, 2, Duration.standardMinutes(1))) - // Some time passes within the runner, which causes a speculative pane containing the blue + // Some time passes within the runner, which causes a speculative pane containing the + // blue // team's score to be emitted .advanceProcessingTime(Duration.standardMinutes(10)) .addElements(event(TestUser.RED_TWO, 5, Duration.standardMinutes(3))) // Some additional time passes and we get a speculative pane for the red team .advanceProcessingTime(Duration.standardMinutes(12)) .addElements(event(TestUser.BLUE_TWO, 3, Duration.standardSeconds(22))) - // More time passes and a speculative pane containing a refined value for the blue pane is + // More time passes and a speculative pane containing a refined value for the blue pane + // is // emitted .advanceProcessingTime(Duration.standardMinutes(10)) // Some more events occur @@ -238,7 +240,8 @@ public void testTeamScoresObservablyLate() { event(TestUser.RED_TWO, 2, Duration.ZERO), event(TestUser.RED_TWO, 5, Duration.standardMinutes(1)), event(TestUser.RED_TWO, 3, Duration.standardMinutes(3))) - // A late refinement is emitted due to the advance in processing time, but the window has + // A late refinement is emitted due to the advance in processing time, but the window + // has // not yet closed because the watermark has not advanced .advanceProcessingTime(Duration.standardMinutes(12)) // These elements should appear in the final pane @@ -303,7 +306,8 @@ public void testTeamScoresDroppablyLate() { .plus(ALLOWED_LATENESS) .plus(TEAM_WINDOW_DURATION) .plus(Duration.standardMinutes(1))) - // These elements within the expired window are droppably late, and will not appear in the + // These elements within the expired window are droppably late, and will not appear in + // the // output .addElements( event( diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java index bd10307f7e3d..c53f48f0ff93 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java @@ -33,7 +33,7 @@ private TestApexRunner(ApexPipelineOptions options) { options.setEmbeddedExecution(true); - //options.setEmbeddedExecutionDebugMode(false); + // options.setEmbeddedExecutionDebugMode(false); this.delegate = ApexRunner.fromOptions(options); } diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java index ca1c7ffa24de..eb625e4acc15 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java @@ -77,9 +77,7 @@ public void translate(ParDo.MultiOutput transform, TranslationC List> sideInputs = transform.getSideInputs(); Map, Coder> outputCoders = - outputs - .entrySet() - .stream() + outputs.entrySet().stream() .filter(e -> e.getValue() instanceof PCollection) .collect( Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder())); @@ -138,9 +136,7 @@ public void translate( List> sideInputs = transform.getSideInputs(); Map, Coder> outputCoders = - outputs - .entrySet() - .stream() + outputs.entrySet().stream() .filter(e -> e.getValue() instanceof PCollection) .collect( Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder())); @@ -221,8 +217,8 @@ static void addSideInputs( .getWindowingStrategy() .equals(firstSideInput.getWindowingStrategy())) { // TODO: check how to handle this in stream codec - //String msg = "Multiple side inputs with different window strategies."; - //throw new UnsupportedOperationException(msg); + // String msg = "Multiple side inputs with different window strategies."; + // throw new UnsupportedOperationException(msg); LOG.warn( "Side inputs union with different windowing strategies {} {}", firstSideInput.getWindowingStrategy(), diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index 7a5fbaa8ec86..618b7a79e2ee 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -382,7 +382,7 @@ private void processWatermark(ApexStreamTuple.WatermarkTuple mark) { checkState( minEventTimeTimer >= currentInputWatermark, "Event time timer processing generates new timer(s) behind watermark."); - //LOG.info("Processing time timer {} registered behind watermark {}", minProcessingTimeTimer, + // LOG.info("Processing time timer {} registered behind watermark {}", minProcessingTimeTimer, // currentInputWatermark); // TODO: is this the right way to trigger processing time timers? diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java index 5746dcab4e5d..7df1e2bcf483 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java @@ -56,8 +56,8 @@ public void testGetYarnDeployDependencies() throws Exception { List deps = ApexYarnLauncher.getYarnDeployDependencies(); String depsToString = deps.toString(); // the beam dependencies are not present as jar when running within the Maven build reactor - //assertThat(depsToString, containsString("beam-runners-core-")); - //assertThat(depsToString, containsString("beam-runners-apex-")); + // assertThat(depsToString, containsString("beam-runners-core-")); + // assertThat(depsToString, containsString("beam-runners-apex-")); assertThat(depsToString, containsString("apex-common-")); assertThat(depsToString, not(containsString("hadoop-"))); assertThat(depsToString, not(containsString("zookeeper-"))); diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java index 025c7d642cec..70c92762018b 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java @@ -92,7 +92,8 @@ public FunctionSpec translate( /** Produces a {@link RunnerApi.CombinePayload} from a {@link Combine}. */ static CombinePayload payloadForCombine( final AppliedPTransform< - PCollection>, PCollection>, + PCollection>, + PCollection>, Combine.PerKey> combine, final SdkComponents components) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java index f812de1dbabc..daf9c48dff53 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java @@ -53,7 +53,8 @@ @Deprecated public static PCollectionView getView( AppliedPTransform< - PCollection, PCollection, + PCollection, + PCollection, PTransform, PCollection>> application) throws IOException { diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java index fb04428fd03e..bfa926d1405b 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java @@ -112,10 +112,7 @@ public static String generateNameFromTransformNames( } else { // Enumerate the outer stages with their composite structure, if any. parts = - groupByOuter - .asMap() - .entrySet() - .stream() + groupByOuter.asMap().entrySet().stream() .map( outer -> String.format( diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java index fb4948b5f246..463e51dcbaea 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java @@ -301,10 +301,7 @@ public static TupleTagList getAdditionalOutputTags(AppliedPTransform ap } public static Map, Coder> getOutputCoders(AppliedPTransform application) { - return application - .getOutputs() - .entrySet() - .stream() + return application.getOutputs().entrySet().stream() .filter(e -> e.getValue() instanceof PCollection) .collect(Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder())); } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java index b4a917ca3dce..23a23d16b7dc 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java @@ -100,7 +100,7 @@ public static PipelineOptions fromJson(String optionsJson) { Map probingOptionsMap = MAPPER.readValue(optionsJson, new TypeReference>() {}); if (probingOptionsMap.containsKey("options")) { - //Legacy options. + // Legacy options. return MAPPER.readValue(optionsJson, PipelineOptions.class); } else { // Fn Options with namespace and version. diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java index 94f8c3ba1902..a7a914c54a17 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java @@ -80,8 +80,7 @@ */ public static List prepareFilesForStaging( List resourcesToStage, String tmpJarLocation) { - return resourcesToStage - .stream() + return resourcesToStage.stream() .map(File::new) .filter(File::exists) .map( diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java index afd3d7012a38..8863c66cfe2e 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java @@ -152,9 +152,7 @@ public void visitPrimitiveTransform(Node node) { // throws UnsupportedOperationException. transformBuilder.clearSubtransforms(); transformBuilder.addAllSubtransforms( - transform - .getSubtransformsList() - .stream() + transform.getSubtransformsList().stream() .filter(id -> !viewTransforms.contains(id)) .collect(Collectors.toList())); newTransforms.put(transformId, transformBuilder.build()); @@ -168,9 +166,7 @@ public void visitPrimitiveTransform(Node node) { viewOutputsToInputs.keySet().forEach(newPipeline.getComponentsBuilder()::removePcollections); newPipeline.clearRootTransformIds(); newPipeline.addAllRootTransformIds( - pipeline - .getRootTransformIdsList() - .stream() + pipeline.getRootTransformIdsList().stream() .filter(id -> !viewTransforms.contains(id)) .collect(Collectors.toList())); return newPipeline.build(); diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java index 3197c694b62d..67b788764521 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java @@ -60,14 +60,16 @@ /** Overrides a {@link ProcessKeyedElements} into {@link SplittableProcessNaive}. */ public static class OverrideFactory implements PTransformOverrideFactory< - PCollection>>, PCollectionTuple, + PCollection>>, + PCollectionTuple, ProcessKeyedElements> { @Override public PTransformReplacement< PCollection>>, PCollectionTuple> getReplacementTransform( AppliedPTransform< - PCollection>>, PCollectionTuple, + PCollection>>, + PCollectionTuple, ProcessKeyedElements> transform) { checkArgument( diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java index c40b4ce005bb..e42487ce692c 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java @@ -132,8 +132,7 @@ public void validate() { return ImmutableList.of(this); } List> splits = boundedSource.split(desiredBundleSize, options); - return splits - .stream() + return splits.stream() .map(input -> new BoundedToUnboundedSourceAdapter<>(input)) .collect(Collectors.toList()); } catch (Exception e) { diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java index 4eb6bb0949e5..817f2d23c1bf 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java @@ -130,7 +130,8 @@ private static SdkFunctionSpec toProto(String urn, Serializable serializable) { public static FileBasedSink getSink( AppliedPTransform< - PCollection, WriteFilesResult, + PCollection, + WriteFilesResult, ? extends PTransform, WriteFilesResult>> transform) throws IOException { @@ -140,7 +141,8 @@ private static SdkFunctionSpec toProto(String urn, Serializable serializable) { public static List> getDynamicDestinationSideInputs( AppliedPTransform< - PCollection, WriteFilesResult, + PCollection, + WriteFilesResult, ? extends PTransform, WriteFilesResult>> transform) throws IOException { @@ -167,7 +169,8 @@ private static SdkFunctionSpec toProto(String urn, Serializable serializable) { public static boolean isWindowedWrites( AppliedPTransform< - PCollection, WriteFilesResult, + PCollection, + WriteFilesResult, ? extends PTransform, WriteFilesResult>> transform) throws IOException { @@ -176,7 +179,8 @@ private static SdkFunctionSpec toProto(String urn, Serializable serializable) { public static boolean isRunnerDeterminedSharding( AppliedPTransform< - PCollection, WriteFilesResult, + PCollection, + WriteFilesResult, ? extends PTransform, WriteFilesResult>> transform) throws IOException { @@ -185,7 +189,8 @@ private static SdkFunctionSpec toProto(String urn, Serializable serializable) { private static WriteFilesPayload getWriteFilesPayload( AppliedPTransform< - PCollection, WriteFilesResult, + PCollection, + WriteFilesResult, ? extends PTransform, WriteFilesResult>> transform) throws IOException { diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java index db8c63463568..8bdd71890574 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java @@ -183,8 +183,7 @@ default PTransform toPTransform(String uniqueName) { .toBuilder() .clearTransforms() .putAllTransforms( - getTransforms() - .stream() + getTransforms().stream() .collect( Collectors.toMap(PTransformNode::getId, PTransformNode::getTransform)))); @@ -214,33 +213,23 @@ static ExecutableStage fromPayload(ExecutableStagePayload payload) { PipelineNode.pCollection( payload.getInput(), components.getPcollectionsOrThrow(payload.getInput())); List sideInputs = - payload - .getSideInputsList() - .stream() + payload.getSideInputsList().stream() .map(sideInputId -> SideInputReference.fromSideInputId(sideInputId, components)) .collect(Collectors.toList()); List userStates = - payload - .getUserStatesList() - .stream() + payload.getUserStatesList().stream() .map(userStateId -> UserStateReference.fromUserStateId(userStateId, components)) .collect(Collectors.toList()); List timers = - payload - .getTimersList() - .stream() + payload.getTimersList().stream() .map(timerId -> TimerReference.fromTimerId(timerId, components)) .collect(Collectors.toList()); List transforms = - payload - .getTransformsList() - .stream() + payload.getTransformsList().stream() .map(id -> PipelineNode.pTransform(id, components.getTransformsOrThrow(id))) .collect(Collectors.toList()); List outputs = - payload - .getOutputsList() - .stream() + payload.getOutputsList().stream() .map(id -> PipelineNode.pCollection(id, components.getPcollectionsOrThrow(id))) .collect(Collectors.toList()); return ImmutableExecutableStage.of( diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java index facac67d57a7..e0d68289dd49 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java @@ -65,8 +65,7 @@ static FusedPipeline of( Set executableTransformIds = Sets.union( executableStageTransforms.keySet(), - getRunnerExecutedTransforms() - .stream() + getRunnerExecutedTransforms().stream() .map(PTransformNode::getId) .collect(Collectors.toSet())); diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java index b52c83c5d8c8..e1c50916b705 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java @@ -214,9 +214,11 @@ private static boolean parDoCompatibility( // upstream of any of the side inputs. || (pipeline.getSideInputs(parDo).isEmpty() // We purposefully break fusion here to provide runners the opportunity to insert a - // grouping operation to simplify implementing support for ParDo's that contain user state. + // grouping operation to simplify implementing support for ParDo's that contain user + // state. // We would not need to do this if we had the ability to mark upstream transforms as - // key preserving or if runners could execute ParDos containing user state in a distributed + // key preserving or if runners could execute ParDos containing user state in a + // distributed // fashion for a single key. && pipeline.getUserStates(parDo).isEmpty() // We purposefully break fusion here to provide runners the opportunity to insert a diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java index 640d402214d2..34c57e33af01 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java @@ -165,15 +165,13 @@ private FusedPipeline fusePipeline( // as can compatible producers/consumers if a PCollection is only materialized once. return FusedPipeline.of( deduplicated.getDeduplicatedComponents(), - stages - .stream() + stages.stream() .map(stage -> deduplicated.getDeduplicatedStages().getOrDefault(stage, stage)) .map(GreedyPipelineFuser::sanitizeDanglingPTransformInputs) .collect(Collectors.toSet()), Sets.union( deduplicated.getIntroducedTransforms(), - unfusedTransforms - .stream() + unfusedTransforms.stream() .map( transform -> deduplicated @@ -306,8 +304,7 @@ static DescendantConsumers of( pipeline.getEnvironment(newConsumer.consumingTransform()).get()); boolean foundSiblings = false; for (Set existingConsumers : compatibleConsumers.get(key)) { - if (existingConsumers - .stream() + if (existingConsumers.stream() .allMatch( // The two consume the same PCollection and can exist in the same stage. collectionConsumer -> @@ -340,8 +337,7 @@ private ExecutableStage fuseSiblings(Set mutuallyCompatible) return GreedyStageFuser.forGrpcPortRead( pipeline, rootCollection, - mutuallyCompatible - .stream() + mutuallyCompatible.stream() .map(CollectionConsumer::consumingTransform) .collect(Collectors.toSet())); } @@ -359,27 +355,19 @@ private static ExecutableStage sanitizeDanglingPTransformInputs(ExecutableStage Set possibleInputs = new HashSet<>(); possibleInputs.add(stage.getInputPCollection().getId()); possibleInputs.addAll( - stage - .getOutputPCollections() - .stream() + stage.getOutputPCollections().stream() .map(PCollectionNode::getId) .collect(Collectors.toSet())); possibleInputs.addAll( - stage - .getSideInputs() - .stream() + stage.getSideInputs().stream() .map(s -> s.collection().getId()) .collect(Collectors.toSet())); possibleInputs.addAll( - stage - .getTransforms() - .stream() + stage.getTransforms().stream() .flatMap(t -> t.getTransform().getOutputsMap().values().stream()) .collect(Collectors.toSet())); Set danglingInputs = - stage - .getTransforms() - .stream() + stage.getTransforms().stream() .flatMap(t -> t.getTransform().getInputsMap().values().stream()) .filter(in -> !possibleInputs.contains(in)) .collect(Collectors.toSet()); @@ -388,10 +376,7 @@ private static ExecutableStage sanitizeDanglingPTransformInputs(ExecutableStage for (PTransformNode transformNode : stage.getTransforms()) { PTransform transform = transformNode.getTransform(); Map validInputs = - transform - .getInputsMap() - .entrySet() - .stream() + transform.getInputsMap().entrySet().stream() .filter(e -> !danglingInputs.contains(e.getValue())) .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); @@ -411,15 +396,10 @@ private static ExecutableStage sanitizeDanglingPTransformInputs(ExecutableStage componentBuilder .clearTransforms() .putAllTransforms( - pTransformNodes - .stream() + pTransformNodes.stream() .collect(Collectors.toMap(PTransformNode::getId, PTransformNode::getTransform))); Map validPCollectionMap = - stage - .getComponents() - .getPcollectionsMap() - .entrySet() - .stream() + stage.getComponents().getPcollectionsMap().entrySet().stream() .filter(e -> !danglingInputs.contains(e.getKey())) .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStage.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStage.java index 1f1c3832725c..9546d973c1ba 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStage.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStage.java @@ -43,8 +43,7 @@ public static ImmutableExecutableStage ofFullComponents( .toBuilder() .clearTransforms() .putAllTransforms( - transforms - .stream() + transforms.stream() .collect(Collectors.toMap(PTransformNode::getId, PTransformNode::getTransform))) .build(); return of( diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/Networks.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/Networks.java index b2d277e4a462..c179372c05bb 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/Networks.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/Networks.java @@ -208,11 +208,14 @@ public final NodeT apply(NodeT input) { // // The only edges that are ignored by the algorithm are back edges. // The algorithm (while there are still nodes in the graph): - // 1) Removes all sinks from the graph adding them to the beginning of "s2". Continue to do this till there + // 1) Removes all sinks from the graph adding them to the beginning of "s2". Continue to do + // this till there // are no more sinks. - // 2) Removes all source from the graph adding them to the end of "s1". Continue to do this till there + // 2) Removes all source from the graph adding them to the end of "s1". Continue to do this + // till there // are no more sources. - // 3) Remote a single node with the highest delta within the graph and add it to the end of "s1". + // 3) Remote a single node with the highest delta within the graph and add it to the end of + // "s1". // // The topological order is then the s1 concatenated with s2. diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicator.java index 5a481901c8f3..f64d94d6f5a0 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicator.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicator.java @@ -293,13 +293,10 @@ private static ExecutableStage deduplicateStageOutput( .toBuilder() .clearTransforms() .putAllTransforms( - updatedTransforms - .stream() + updatedTransforms.stream() .collect(Collectors.toMap(PTransformNode::getId, PTransformNode::getTransform))) .putAllPcollections( - originalToPartial - .values() - .stream() + originalToPartial.values().stream() .collect( Collectors.toMap(PCollectionNode::getId, PCollectionNode::getPCollection))) .build(); diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java index 710fbe2e4570..c4b12d75ed0a 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java @@ -132,11 +132,15 @@ private QueryablePipeline(Collection transformIds, Components components PTransform transform = transformEntry.getValue(); boolean isPrimitive = isPrimitiveTransform(transform); if (isPrimitive) { - // Sometimes "primitive" transforms have sub-transforms (and even deeper-nested descendents), due to runners - // either rewriting them in terms of runner-specific transforms, or SDKs constructing them in terms of other + // Sometimes "primitive" transforms have sub-transforms (and even deeper-nested + // descendents), due to runners + // either rewriting them in terms of runner-specific transforms, or SDKs constructing them + // in terms of other // underlying transforms (see https://issues.apache.org/jira/browse/BEAM-5441). - // We consider any "leaf" descendents of these "primitive" transforms to be the true "primitives" that we - // preserve here; in the common case, this is just the "primitive" itself, which has no descendents). + // We consider any "leaf" descendents of these "primitive" transforms to be the true + // "primitives" that we + // preserve here; in the common case, this is just the "primitive" itself, which has no + // descendents). Deque transforms = new ArrayDeque<>(); transforms.push(transformEntry.getKey()); while (!transforms.isEmpty()) { @@ -229,9 +233,7 @@ private static boolean isPrimitiveTransform(PTransform transform) { } public Collection getTransforms() { - return pipelineNetwork - .nodes() - .stream() + return pipelineNetwork.nodes().stream() .filter(PTransformNode.class::isInstance) .map(PTransformNode.class::cast) .collect(Collectors.toList()); @@ -252,9 +254,7 @@ private static boolean isPrimitiveTransform(PTransform transform) { * have no input {@link PCollection}. */ public Set getRootTransforms() { - return pipelineNetwork - .nodes() - .stream() + return pipelineNetwork.nodes().stream() .filter(pipelineNode -> pipelineNetwork.inEdges(pipelineNode).isEmpty()) .map(pipelineNode -> (PTransformNode) pipelineNode) .collect(Collectors.toSet()); @@ -276,14 +276,10 @@ public PTransformNode getProducer(PCollectionNode pcollection) { * does consume the {@link PCollectionNode} on a per-element basis. */ public Set getPerElementConsumers(PCollectionNode pCollection) { - return pipelineNetwork - .successors(pCollection) - .stream() + return pipelineNetwork.successors(pCollection).stream() .filter( consumer -> - pipelineNetwork - .edgesConnecting(pCollection, consumer) - .stream() + pipelineNetwork.edgesConnecting(pCollection, consumer).stream() .anyMatch(PipelineEdge::isPerElement)) .map(pipelineNode -> (PTransformNode) pipelineNode) .collect(Collectors.toSet()); @@ -294,14 +290,10 @@ public PTransformNode getProducer(PCollectionNode pcollection) { * the collection as a singleton. */ public Set getSingletonConsumers(PCollectionNode pCollection) { - return pipelineNetwork - .successors(pCollection) - .stream() + return pipelineNetwork.successors(pCollection).stream() .filter( consumer -> - pipelineNetwork - .edgesConnecting(pCollection, consumer) - .stream() + pipelineNetwork.edgesConnecting(pCollection, consumer).stream() .anyMatch(edge -> !edge.isPerElement())) .map(pipelineNode -> (PTransformNode) pipelineNode) .collect(Collectors.toSet()); @@ -312,18 +304,14 @@ public PTransformNode getProducer(PCollectionNode pcollection) { * per-element basis. */ public Set getPerElementInputPCollections(PTransformNode ptransform) { - return pipelineNetwork - .inEdges(ptransform) - .stream() + return pipelineNetwork.inEdges(ptransform).stream() .filter(PipelineEdge::isPerElement) .map(edge -> (PCollectionNode) pipelineNetwork.incidentNodes(edge).source()) .collect(Collectors.toSet()); } public Set getOutputPCollections(PTransformNode ptransform) { - return pipelineNetwork - .successors(ptransform) - .stream() + return pipelineNetwork.successors(ptransform).stream() .map(pipelineNode -> (PCollectionNode) pipelineNode) .collect(Collectors.toSet()); } @@ -337,8 +325,7 @@ public Components getComponents() { * as side inputs. */ public Collection getSideInputs(PTransformNode transform) { - return getLocalSideInputNames(transform.getTransform()) - .stream() + return getLocalSideInputNames(transform.getTransform()).stream() .map( localName -> { String transformId = transform.getId(); @@ -354,8 +341,7 @@ public Components getComponents() { } public Collection getUserStates(PTransformNode transform) { - return getLocalUserStateNames(transform.getTransform()) - .stream() + return getLocalUserStateNames(transform.getTransform()).stream() .map( localName -> { String transformId = transform.getId(); @@ -382,8 +368,7 @@ public Components getComponents() { } public Collection getTimers(PTransformNode transform) { - return getLocalTimerNames(transform.getTransform()) - .stream() + return getLocalTimerNames(transform.getTransform()).stream() .map( localName -> { String transformId = transform.getId(); diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExecutableStageTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExecutableStageTranslationTest.java index 60525e991f87..ed1286ce6d35 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExecutableStageTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExecutableStageTranslationTest.java @@ -69,9 +69,7 @@ public void processElement( })); ExecutableStage firstEnvStage = - GreedyPipelineFuser.fuse(PipelineTranslation.toProto(p)) - .getFusedStages() - .stream() + GreedyPipelineFuser.fuse(PipelineTranslation.toProto(p)).getFusedStages().stream() .findFirst() .get(); RunnerApi.ExecutableStagePayload basePayload = diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java index 25e7253a671d..13a954913a81 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java @@ -51,13 +51,15 @@ PCollection, PCollection, MapElements> factory = new SingleInputOutputOverrideFactory< - PCollection, PCollection, + PCollection, + PCollection, MapElements>() { @Override public PTransformReplacement, PCollection> getReplacementTransform( AppliedPTransform< - PCollection, PCollection, + PCollection, + PCollection, MapElements> transform) { return PTransformReplacement.of( diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageMatcher.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageMatcher.java index 1535d03e9e0f..3c477184717a 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageMatcher.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageMatcher.java @@ -105,8 +105,7 @@ protected boolean matchesSafely(ExecutableStage item) { return item.getInputPCollection().getId().equals(inputPCollectionId) && containsInAnyOrder(sideInputIds.toArray()) .matches( - item.getSideInputs() - .stream() + item.getSideInputs().stream() .map( ref -> SideInputId.newBuilder() @@ -115,14 +114,12 @@ protected boolean matchesSafely(ExecutableStage item) { .build()) .collect(Collectors.toSet())) && materializedPCollection.matches( - item.getOutputPCollections() - .stream() + item.getOutputPCollections().stream() .map(PCollectionNode::getId) .collect(Collectors.toSet())) && containsInAnyOrder(fusedTransforms.toArray(new String[0])) .matches( - item.getTransforms() - .stream() + item.getTransforms().stream() .map(PTransformNode::getId) .collect(Collectors.toSet())); } diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java index 5e681f2bb7cd..e329c00c2cf0 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java @@ -536,9 +536,7 @@ public void flattenWithHeterogenousInputsAndOutputsEntirelyMaterialized() { .withNoOutputs() .withTransforms("pyParDo"))); Set materializedStageOutputs = - fused - .getFusedStages() - .stream() + fused.getFusedStages().stream() .flatMap(executableStage -> executableStage.getOutputPCollections().stream()) .map(PCollectionNode::getId) .collect(Collectors.toSet()); @@ -1316,16 +1314,11 @@ public void sanitizedTransforms() throws Exception { ExecutableStageMatcher.withInput(impulse2Output.getUniqueName()) .withTransforms(flattenTransform.getUniqueName(), read2Transform.getUniqueName()))); assertThat( - fused - .getFusedStages() - .stream() + fused.getFusedStages().stream() .flatMap( s -> - s.getComponents() - .getTransformsOrThrow(flattenTransform.getUniqueName()) - .getInputsMap() - .values() - .stream()) + s.getComponents().getTransformsOrThrow(flattenTransform.getUniqueName()) + .getInputsMap().values().stream()) .collect(Collectors.toList()), containsInAnyOrder(read1Output.getUniqueName(), read2Output.getUniqueName())); } diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java index 1e20c73feb37..94ad3e8abd68 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java @@ -1165,9 +1165,7 @@ public void materializesWithGroupByKeyConsumer() { protected boolean matchesSafely(ExecutableStage executableStage) { // NOTE: Transform names must be unique, so it's fine to throw here if this does not hold. Set stageTransforms = - executableStage - .getTransforms() - .stream() + executableStage.getTransforms().stream() .map(PTransformNode::getId) .collect(Collectors.toSet()); return stageTransforms.containsAll(expectedTransforms) diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/NetworksTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/NetworksTest.java index 065ad4e71361..a2a9991e2d47 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/NetworksTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/NetworksTest.java @@ -224,18 +224,14 @@ public void testNodeReplacement() { MutableNetwork originalNetwork = createNetwork(); for (String node : originalNetwork.nodes()) { assertEquals( - originalNetwork - .successors(node) - .stream() + originalNetwork.successors(node).stream() .map(function) .collect(Collectors.toCollection(HashSet::new)), network.successors(function.apply(node))); } assertEquals( network.nodes(), - originalNetwork - .nodes() - .stream() + originalNetwork.nodes().stream() .map(function) .collect(Collectors.toCollection(HashSet::new))); } diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicatorTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicatorTest.java index 624eff331402..023cc27cc7a6 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicatorTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicatorTest.java @@ -271,10 +271,7 @@ public void duplicateOverStages() { assertThat(result.getDeduplicatedStages().keySet(), hasSize(2)); List stageOutputs = - result - .getDeduplicatedStages() - .values() - .stream() + result.getDeduplicatedStages().values().stream() .flatMap(stage -> stage.getOutputPCollections().stream().map(PCollectionNode::getId)) .collect(Collectors.toList()); assertThat( @@ -398,11 +395,7 @@ public void duplicateOverStagesAndTransforms() { introducedOutputs.addAll( result.getDeduplicatedTransforms().get("shared").getTransform().getOutputsMap().values()); introducedOutputs.addAll( - result - .getDeduplicatedStages() - .get(oneStage) - .getOutputPCollections() - .stream() + result.getDeduplicatedStages().get(oneStage).getOutputPCollections().stream() .map(PCollectionNode::getId) .collect(Collectors.toList())); assertThat( @@ -588,16 +581,11 @@ public void multipleDuplicatesInStages() { assertThat(result.getDeduplicatedTransforms().keySet(), empty()); Collection introducedIds = - result - .getIntroducedTransforms() - .stream() + result.getIntroducedTransforms().stream() .flatMap(pt -> pt.getTransform().getInputsMap().values().stream()) .collect(Collectors.toList()); String[] stageOutputs = - result - .getDeduplicatedStages() - .values() - .stream() + result.getDeduplicatedStages().values().stream() .flatMap(s -> s.getOutputPCollections().stream().map(PCollectionNode::getId)) .toArray(String[]::new); assertThat(introducedIds, containsInAnyOrder(stageOutputs)); @@ -608,9 +596,7 @@ public void multipleDuplicatesInStages() { assertThat( result.getDeduplicatedComponents().getTransformsMap().entrySet(), hasItems( - result - .getIntroducedTransforms() - .stream() + result.getIntroducedTransforms().stream() .collect(Collectors.toMap(PTransformNode::getId, PTransformNode::getTransform)) .entrySet() .toArray(new Map.Entry[0]))); diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java index 055b66594386..c2399545ce57 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java @@ -210,10 +210,7 @@ public void transformWithSideAndMainInputs() { PTransform parDoTransform = components.getTransformsOrThrow("par_do"); String sideInputLocalName = getOnlyElement( - parDoTransform - .getInputsMap() - .entrySet() - .stream() + parDoTransform.getInputsMap().entrySet().stream() .filter(entry -> !entry.getValue().equals(mainInputName)) .map(Map.Entry::getKey) .collect(Collectors.toSet())); diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java index 790274833666..eafc9b82bfc0 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java @@ -116,9 +116,7 @@ public LateDataFilter( StreamSupport.stream(elements.spliterator(), false) .map( input -> - input - .getWindows() - .stream() + input.getWindows().stream() .map( window -> WindowedValue.of( diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java index 45c847edae57..93b9a7fddab3 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java @@ -104,14 +104,16 @@ public String getUrn() { /** Overrides a {@link ProcessKeyedElements} into {@link SplittableProcessViaKeyedWorkItems}. */ public static class OverrideFactory implements PTransformOverrideFactory< - PCollection>>, PCollectionTuple, + PCollection>>, + PCollectionTuple, ProcessKeyedElements> { @Override public PTransformReplacement< PCollection>>, PCollectionTuple> getReplacementTransform( AppliedPTransform< - PCollection>>, PCollectionTuple, + PCollection>>, + PCollectionTuple, ProcessKeyedElements> transform) { return PTransformReplacement.of( diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index d5d10de83707..f03f71f1c619 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -540,8 +540,7 @@ public final void injectElements(List> values) throws E } Iterable> inputs = - values - .stream() + values.stream() .map( input -> { try { diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java index 0a9ec4935190..2bab79b1cb3d 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java @@ -28,14 +28,16 @@ /** Provides an implementation of {@link GBKIntoKeyedWorkItems} for the Direct Runner. */ class DirectGBKIntoKeyedWorkItemsOverrideFactory extends SingleInputOutputOverrideFactory< - PCollection>, PCollection>, + PCollection>, + PCollection>, GBKIntoKeyedWorkItems> { @Override public PTransformReplacement< PCollection>, PCollection>> getReplacementTransform( AppliedPTransform< - PCollection>, PCollection>, + PCollection>, + PCollection>, GBKIntoKeyedWorkItems> transform) { return PTransformReplacement.of( diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java index 49528160fa81..873eb1f4c884 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java @@ -30,13 +30,15 @@ /** A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms. */ final class DirectGroupByKeyOverrideFactory extends SingleInputOutputOverrideFactory< - PCollection>, PCollection>>, + PCollection>, + PCollection>>, PTransform>, PCollection>>>> { @Override public PTransformReplacement>, PCollection>>> getReplacementTransform( AppliedPTransform< - PCollection>, PCollection>>, + PCollection>, + PCollection>>, PTransform>, PCollection>>>> transform) { diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index a5a1c7658e1e..5b026cba3943 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -324,8 +324,7 @@ private void shutdownIfNecessary(State newState) { "Error" + (errors.size() == 1 ? "" : "s") + " during executor shutdown:\n" - + errors - .stream() + + errors.stream() .map(Exception::getMessage) .collect(Collectors.joining("\n- ", "- ", ""))); visibleUpdates.failed(exception); diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index 0a2d42f67c05..77aa9fc1826a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -81,7 +81,8 @@ public void cleanup() {} private TransformEvaluator> createEvaluator( AppliedPTransform< - PCollection>, PCollection>>, + PCollection>, + PCollection>>, DirectGroupAlsoByWindow> application, CommittedBundle> inputBundle) { @@ -100,7 +101,8 @@ public void cleanup() {} private final EvaluationContext evaluationContext; private final PipelineOptions options; private final AppliedPTransform< - PCollection>, PCollection>>, + PCollection>, + PCollection>>, DirectGroupAlsoByWindow> application; @@ -120,7 +122,8 @@ public GroupAlsoByWindowEvaluator( PipelineOptions options, CommittedBundle> inputBundle, final AppliedPTransform< - PCollection>, PCollection>>, + PCollection>, + PCollection>>, DirectGroupAlsoByWindow> application) { this.evaluationContext = evaluationContext; diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java index d22ddb7fdebd..4f0594a4291a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java @@ -118,7 +118,8 @@ public boolean matches(AppliedPTransform application) { static class Factory extends SingleInputOutputOverrideFactory< - PCollection>, PCollection>, + PCollection>, + PCollection>, PTransform>, PCollection>>> { public static PTransformOverrideFactory create() { return new Factory<>(); @@ -130,7 +131,8 @@ private Factory() {} public PTransformReplacement>, PCollection>> getReplacementTransform( AppliedPTransform< - PCollection>, PCollection>, + PCollection>, + PCollection>, PTransform>, PCollection>>> transform) { GlobalCombineFn globalFn = ((Combine.PerKey) transform.getTransform()).getFn(); @@ -366,7 +368,8 @@ public MergeAndExtractAccumulatorOutputEvaluatorFactory(EvaluationContext ctxt) private TransformEvaluator>> createEvaluator( AppliedPTransform< - PCollection>>, PCollection>, + PCollection>>, + PCollection>, MergeAndExtractAccumulatorOutput> application, CommittedBundle>> inputBundle) { @@ -380,7 +383,8 @@ public void cleanup() throws Exception {} private static class MergeAccumulatorsAndExtractOutputEvaluator implements TransformEvaluator>> { private final AppliedPTransform< - PCollection>>, PCollection>, + PCollection>>, + PCollection>, MergeAndExtractAccumulatorOutput> application; private final CombineFn combineFn; @@ -389,7 +393,8 @@ public void cleanup() throws Exception {} public MergeAccumulatorsAndExtractOutputEvaluator( EvaluationContext ctxt, AppliedPTransform< - PCollection>>, PCollection>, + PCollection>>, + PCollection>, MergeAndExtractAccumulatorOutput> application) { this.application = application; diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index 01bb26dfaa4c..c8ad2383659e 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -112,9 +112,7 @@ evaluationContext.createSideInputReader(sideInputs); Map, Coder> outputCoders = - outputs - .entrySet() - .stream() + outputs.entrySet().stream() .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().getCoder())); PushbackSideInputDoFnRunner runner = diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java index 112487851e8b..a11b384d7215 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java @@ -65,13 +65,15 @@ @VisibleForTesting public class ParDoMultiOverrideFactory implements PTransformOverrideFactory< - PCollection, PCollectionTuple, + PCollection, + PCollectionTuple, PTransform, PCollectionTuple>> { @Override public PTransformReplacement, PCollectionTuple> getReplacementTransform( AppliedPTransform< - PCollection, PCollectionTuple, + PCollection, + PCollectionTuple, PTransform, PCollectionTuple>> application) { @@ -87,7 +89,8 @@ @SuppressWarnings("unchecked") private PTransform, PCollectionTuple> getReplacementForApplication( AppliedPTransform< - PCollection, PCollectionTuple, + PCollection, + PCollectionTuple, PTransform, PCollectionTuple>> application) throws IOException { diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java index 737098f44738..86820ef2457b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java @@ -103,7 +103,8 @@ public void cleanup() throws Exception { @SuppressWarnings({"unchecked", "rawtypes"}) private TransformEvaluator>> createEvaluator( AppliedPTransform< - PCollection>>, PCollectionTuple, + PCollection>>, + PCollectionTuple, ProcessElements> application, CommittedBundle inputBundle) diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java index a12d71b865f6..bd05a5bcaca9 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java @@ -106,7 +106,8 @@ public void cleanup() throws Exception { @SuppressWarnings({"unchecked", "rawtypes"}) private TransformEvaluator>> createEvaluator( AppliedPTransform< - PCollection>>, PCollectionTuple, + PCollection>>, + PCollectionTuple, StatefulParDo> application, CommittedBundle>> inputBundle) @@ -203,7 +204,8 @@ public Runnable load( @AutoValue abstract static class AppliedPTransformOutputKeyAndWindow { abstract AppliedPTransform< - PCollection>>, PCollectionTuple, + PCollection>>, + PCollectionTuple, StatefulParDo> getTransform(); @@ -213,7 +215,8 @@ public Runnable load( static AppliedPTransformOutputKeyAndWindow create( AppliedPTransform< - PCollection>>, PCollectionTuple, + PCollection>>, + PCollectionTuple, StatefulParDo> transform, StructuralKey key, diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java index 6f5eff614a7d..397b0675f640 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java @@ -42,13 +42,15 @@ */ class ViewOverrideFactory implements PTransformOverrideFactory< - PCollection, PCollection, + PCollection, + PCollection, PTransform, PCollection>> { @Override public PTransformReplacement, PCollection> getReplacementTransform( AppliedPTransform< - PCollection, PCollection, + PCollection, + PCollection, PTransform, PCollection>> transform) { diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java index 4d5a2d9f2123..62cc673f1953 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java @@ -51,7 +51,8 @@ */ class WriteWithShardingFactory implements PTransformOverrideFactory< - PCollection, WriteFilesResult, + PCollection, + WriteFilesResult, PTransform, WriteFilesResult>> { static final int MAX_RANDOM_EXTRA_SHARDS = 3; @VisibleForTesting static final int MIN_SHARDS_FOR_LOG = 3; @@ -60,7 +61,8 @@ public PTransformReplacement, WriteFilesResult> getReplacementTransform( AppliedPTransform< - PCollection, WriteFilesResult, + PCollection, + WriteFilesResult, PTransform, WriteFilesResult>> transform) { try { diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java index 55282534e40c..e4442108f123 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java @@ -317,8 +317,7 @@ private void shutdownIfNecessary(State newState) { "Error" + (errors.size() == 1 ? "" : "s") + " during executor shutdown:\n" - + errors - .stream() + + errors.stream() .map(Exception::getMessage) .collect(Collectors.joining("\n- ", "- ", ""))); visibleUpdates.failed(exception); diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java index 8c6873a422e7..2c2c9f07425c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java @@ -479,8 +479,7 @@ private static Pipeline foldFeedSDFIntoExecutableStage(Pipeline p) { QueryablePipeline q = QueryablePipeline.forPipeline(p); String feedSdfUrn = SplittableRemoteStageEvaluatorFactory.FEED_SDF_URN; List feedSDFNodes = - q.getTransforms() - .stream() + q.getTransforms().stream() .filter(node -> node.getTransform().getSpec().getUrn().equals(feedSdfUrn)) .collect(Collectors.toList()); Map stageToFeeder = Maps.newHashMap(); diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServer.java index 77b1028844c2..8e1514f2fd62 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServer.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServer.java @@ -124,10 +124,9 @@ public void stop() { private static class ServerConfiguration { @Option( - name = "-p", - aliases = {"--port"}, - usage = "The local port to expose the server on. 0 to use a dynamic port. (Default: 8099)" - ) + name = "-p", + aliases = {"--port"}, + usage = "The local port to expose the server on. 0 to use a dynamic port. (Default: 8099)") private int port = 8099; } } diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java index eac8554804b7..d3ede31814e9 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java @@ -326,7 +326,8 @@ public void onElement(final ProcessContext ctx) { @Teardown public void teardown() { - // just to not have a fast execution hiding an issue until we have a shutdown callback + // just to not have a fast execution hiding an issue until we have a shutdown + // callback try { Thread.sleep(1000); } catch (final InterruptedException e) { diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java index a9673bf716ca..54018cfdbf46 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java @@ -148,7 +148,8 @@ public void process(ProcessContext c) {} new StatefulParDoEvaluatorFactory<>(mockEvaluationContext, options); AppliedPTransform< - PCollection>>, PCollectionTuple, + PCollection>>, + PCollectionTuple, StatefulParDo> producingTransform = (AppliedPTransform) DirectGraphs.getProducer(produced); @@ -256,7 +257,8 @@ public void process(ProcessContext c) {} // This will be the stateful ParDo from the expansion AppliedPTransform< - PCollection>>, PCollectionTuple, + PCollection>>, + PCollectionTuple, StatefulParDo> producingTransform = (AppliedPTransform) DirectGraphs.getProducer(produced); diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java index 5ec2fd9c28f1..684ac0223ba9 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java @@ -151,7 +151,8 @@ public void withNoShardingSpecifiedReturnsNewTransform() { PCollection objs = (PCollection) p.apply(Create.empty(VoidCoder.of())); AppliedPTransform< - PCollection, WriteFilesResult, + PCollection, + WriteFilesResult, PTransform, WriteFilesResult>> originalApplication = AppliedPTransform.of("write", objs.expand(), Collections.emptyMap(), original, p); diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java index f16aaea6f0ac..fd55e85d4c43 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java @@ -157,9 +157,7 @@ public void proc(ProcessContext ctxt) { PTransformNode impulseTransform = getOnlyElement(fusedQP.getRootTransforms()); PCollectionNode impulseOutput = getOnlyElement(fusedQP.getOutputPCollections(impulseTransform)); PTransformNode stage = - fusedPipeline - .getRootTransformIdsList() - .stream() + fusedPipeline.getRootTransformIdsList().stream() .map( id -> PipelineNode.pTransform( @@ -212,9 +210,7 @@ public void process(ProcessContext ctxt) { checkState(leftRoot != null); checkState(rightRoot != null); PTransformNode stage = - fusedPipeline - .getRootTransformIdsList() - .stream() + fusedPipeline.getRootTransformIdsList().stream() .map( id -> PipelineNode.pTransform( diff --git a/runners/extensions-java/metrics/src/main/java/org/apache/beam/runners/extensions/metrics/MetricsGraphiteSink.java b/runners/extensions-java/metrics/src/main/java/org/apache/beam/runners/extensions/metrics/MetricsGraphiteSink.java index 2d1d94b16d76..c3af72c3ec80 100644 --- a/runners/extensions-java/metrics/src/main/java/org/apache/beam/runners/extensions/metrics/MetricsGraphiteSink.java +++ b/runners/extensions-java/metrics/src/main/java/org/apache/beam/runners/extensions/metrics/MetricsGraphiteSink.java @@ -128,9 +128,8 @@ private CounterMetricMessage( } @SuppressFBWarnings( - value = "VA_FORMAT_STRING_USES_NEWLINE", - justification = "\\n is part of graphite protocol" - ) + value = "VA_FORMAT_STRING_USES_NEWLINE", + justification = "\\n is part of graphite protocol") @Override protected String createCommittedMessage() { String metricMessage = @@ -145,9 +144,8 @@ protected String createCommittedMessage() { } @SuppressFBWarnings( - value = "VA_FORMAT_STRING_USES_NEWLINE", - justification = "\\n is part of graphite protocol" - ) + value = "VA_FORMAT_STRING_USES_NEWLINE", + justification = "\\n is part of graphite protocol") @Override protected String createAttemptedMessage() { String metricMessage = @@ -172,9 +170,8 @@ private GaugeMetricMessage(MetricResult gauge, String valueType) { } @SuppressFBWarnings( - value = "VA_FORMAT_STRING_USES_NEWLINE", - justification = "\\n is part of graphite protocol" - ) + value = "VA_FORMAT_STRING_USES_NEWLINE", + justification = "\\n is part of graphite protocol") @Override protected String createCommittedMessage() { String metricMessage = @@ -188,9 +185,8 @@ protected String createCommittedMessage() { } @SuppressFBWarnings( - value = "VA_FORMAT_STRING_USES_NEWLINE", - justification = "\\n is part of graphite protocol" - ) + value = "VA_FORMAT_STRING_USES_NEWLINE", + justification = "\\n is part of graphite protocol") @Override protected String createAttemptedMessage() { String metricMessage = @@ -218,9 +214,8 @@ public DistributionMetricMessage( } @SuppressFBWarnings( - value = "VA_FORMAT_STRING_USES_NEWLINE", - justification = "\\n is part of graphite protocol" - ) + value = "VA_FORMAT_STRING_USES_NEWLINE", + justification = "\\n is part of graphite protocol") @Override protected String createCommittedMessage() { Number value = null; @@ -255,9 +250,8 @@ protected String createCommittedMessage() { } @SuppressFBWarnings( - value = "VA_FORMAT_STRING_USES_NEWLINE", - justification = "\\n is part of graphite protocol" - ) + value = "VA_FORMAT_STRING_USES_NEWLINE", + justification = "\\n is part of graphite protocol") @Override protected String createAttemptedMessage() { Number value = null; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java index 96f4b5b15f94..df067077d8bd 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java @@ -135,14 +135,16 @@ private CreateFlinkPCollectionView(PCollectionView view) { public static class Factory implements PTransformOverrideFactory< - PCollection, PCollection, + PCollection, + PCollection, PTransform, PCollection>> { public Factory() {} @Override public PTransformReplacement, PCollection> getReplacementTransform( AppliedPTransform< - PCollection, PCollection, + PCollection, + PCollection, PTransform, PCollection>> transform) { PCollection collection = diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java index d2557e8b239e..8f9fcb9ade4d 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java @@ -296,7 +296,7 @@ public boolean test(RunnerApi.PTransform pTransform) { } catch (InvalidProtocolBufferException e) { throw new IllegalArgumentException(e); } - //TODO: https://issues.apache.org/jira/browse/BEAM-4296 + // TODO: https://issues.apache.org/jira/browse/BEAM-4296 // This only works for well known window fns, we should defer this execution to the SDK // if the WindowFn can't be parsed or just defer it all the time. WindowFn windowFn = diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java index a688bee5f3cc..271ef0bcc4d3 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java @@ -707,11 +707,13 @@ public void translateNode( @SuppressWarnings("unchecked") AppliedPTransform< - PCollection, PCollection, + PCollection, + PCollection, PTransform, PCollection>> application = (AppliedPTransform< - PCollection, PCollection, + PCollection, + PCollection, PTransform, PCollection>>) context.getCurrentTransform(); PCollectionView input; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java index 7222a3037a38..466069f8ea31 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java @@ -110,10 +110,7 @@ public void setCurrentTransform(AppliedPTransform currentTransform) { } public Map, Coder> getOutputCoders() { - return currentTransform - .getOutputs() - .entrySet() - .stream() + return currentTransform.getOutputs().entrySet().stream() .filter(e -> e.getValue() instanceof PCollection) .collect(Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder())); } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java index 37dfb0c0ecb0..3c69817fa5ee 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java @@ -99,7 +99,8 @@ static ExecutionEnvironment createBatchExecutionEnvironment( if (options.getParallelism() != -1 && !(flinkBatchEnv instanceof CollectionEnvironment)) { flinkBatchEnv.setParallelism(options.getParallelism()); } - // Set the correct parallelism, required by UnboundedSourceWrapper to generate consistent splits. + // Set the correct parallelism, required by UnboundedSourceWrapper to generate consistent + // splits. final int parallelism; if (flinkBatchEnv instanceof CollectionEnvironment) { parallelism = 1; @@ -267,7 +268,8 @@ private static int determineParallelism( return pipelineOptionsParallelism; } if (envParallelism > 0) { - // If the user supplies a parallelism on the command-line, this is set on the execution environment during creation + // If the user supplies a parallelism on the command-line, this is set on the execution + // environment during creation return envParallelism; } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java index c00f14c8f234..efaa40aa4ea2 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java @@ -282,8 +282,7 @@ private static boolean hasUnboundedPCollections(RunnerApi.Pipeline pipeline) { Collection pCollecctions = pipeline.getComponents().getPcollectionsMap().values(); // Assume that all PCollections are consumed at some point in the pipeline. - return pCollecctions - .stream() + return pCollecctions.stream() .anyMatch(pc -> pc.getIsBounded() == RunnerApi.IsBounded.Enum.UNBOUNDED); } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java index 40e815640581..e40b91f80539 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java @@ -58,15 +58,13 @@ String host = "localhost"; @Option( - name = "--job-port", - usage = "The job service port. 0 to use a dynamic port. (Default: 8099)" - ) + name = "--job-port", + usage = "The job service port. 0 to use a dynamic port. (Default: 8099)") int port = 8099; @Option( - name = "--artifact-port", - usage = "The artifact service port. 0 to use a dynamic port. (Default: 8098)" - ) + name = "--artifact-port", + usage = "The artifact service port. 0 to use a dynamic port. (Default: 8098)") int artifactPort = 8098; @Option(name = "--artifacts-dir", usage = "The location to store staged artifact files") @@ -74,9 +72,8 @@ Paths.get(System.getProperty("java.io.tmpdir"), "beam-artifact-staging").toString(); @Option( - name = "--clean-artifacts-per-job", - usage = "When true, remove each job's staged artifacts when it completes" - ) + name = "--clean-artifacts-per-job", + usage = "When true, remove each job's staged artifacts when it completes") boolean cleanArtifactsPerJob = false; @Option(name = "--flink-master-url", usage = "Flink master url to submit job.") @@ -87,9 +84,8 @@ String getFlinkMasterUrl() { } @Option( - name = "--sdk-worker-parallelism", - usage = "Default parallelism for SDK worker processes (see portable pipeline options)" - ) + name = "--sdk-worker-parallelism", + usage = "Default parallelism for SDK worker processes (see portable pipeline options)") Long sdkWorkerParallelism = 1L; Long getSdkWorkerParallelism() { @@ -97,12 +93,11 @@ Long getSdkWorkerParallelism() { } @Option( - name = "--flink-conf-dir", - usage = - "Directory containing Flink YAML configuration files. " - + "These properties will be set to all jobs submitted to Flink and take precedence " - + "over configurations in FLINK_CONF_DIR." - ) + name = "--flink-conf-dir", + usage = + "Directory containing Flink YAML configuration files. " + + "These properties will be set to all jobs submitted to Flink and take precedence " + + "over configurations in FLINK_CONF_DIR.") String flinkConfDir = null; @Nullable @@ -112,7 +107,7 @@ String getFlinkConfDir() { } public static void main(String[] args) throws Exception { - //TODO: Expose the fileSystem related options. + // TODO: Expose the fileSystem related options. // Register standard file systems. FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create()); fromParams(args).run(); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java index 6126af5303f5..83776e170864 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java @@ -172,7 +172,8 @@ boolean canTranslate(T transform, FlinkStreamingTranslationContext context) { @VisibleForTesting static class StreamingShardedWriteFactory implements PTransformOverrideFactory< - PCollection, WriteFilesResult, + PCollection, + WriteFilesResult, WriteFiles> { FlinkPipelineOptions options; @@ -184,7 +185,8 @@ boolean canTranslate(T transform, FlinkStreamingTranslationContext context) { public PTransformReplacement, WriteFilesResult> getReplacementTransform( AppliedPTransform< - PCollection, WriteFilesResult, + PCollection, + WriteFilesResult, WriteFiles> transform) { // By default, if numShards is not set WriteFiles will produce one file per bundle. In diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java index 80ed3601fe26..8cd195b53103 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java @@ -489,7 +489,7 @@ private void translateStreamingImpulse( } catch (InvalidProtocolBufferException e) { throw new IllegalArgumentException(e); } - //TODO: https://issues.apache.org/jira/browse/BEAM-4296 + // TODO: https://issues.apache.org/jira/browse/BEAM-4296 // This only works for well known window fns, we should defer this execution to the SDK // if the WindowFn can't be parsed or just defer it all the time. WindowFn windowFn = @@ -695,7 +695,8 @@ private void translateStreamingImpulse( for (RunnerApi.ExecutableStagePayload.SideInputId sideInputId : stagePayload.getSideInputsList()) { - // TODO: local name is unique as long as only one transform with side input can be within a stage + // TODO: local name is unique as long as only one transform with side input can be within a + // stage String sideInputTag = sideInputId.getLocalName(); String collectionId = components diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index c770cdf790eb..e23e67fa03ac 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -992,7 +992,8 @@ public void translateNode( // allowed to have only one input keyed, normally. TwoInputTransformation< - WindowedValue>, RawUnionValue, + WindowedValue>, + RawUnionValue, WindowedValue>> rawFlinkTransform = new TwoInputTransformation<>( diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java index 00d1ea867feb..027b07c36156 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java @@ -103,10 +103,7 @@ public void setCurrentTransform(AppliedPTransform currentTransform) { } public Map, Coder> getOutputCoders() { - return currentTransform - .getOutputs() - .entrySet() - .stream() + return currentTransform.getOutputs().entrySet().stream() .filter(e -> e.getValue() instanceof PCollection) .collect(Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder())); } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationModeOptimizer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationModeOptimizer.java index 32ec4ca6229b..7eb8b23ef139 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationModeOptimizer.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationModeOptimizer.java @@ -59,10 +59,7 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) { } private boolean hasUnboundedOutput(AppliedPTransform transform) { - return transform - .getOutputs() - .values() - .stream() + return transform.getOutputs().values().stream() .filter(value -> value instanceof PCollection) .map(value -> (PCollection) value) .anyMatch(collection -> collection.isBounded() == IsBounded.UNBOUNDED); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java index 995936ecb191..ccad6742e7bb 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java @@ -85,7 +85,8 @@ private JobFactoryState(int maxFactories) { Preconditions.checkArgument(maxFactories >= 0, "sdk_worker_parallelism must be >= 0"); if (maxFactories == 0) { - // if this is 0, use the auto behavior of num_cores - 1 so that we leave some resources available for the java process + // if this is 0, use the auto behavior of num_cores - 1 so that we leave some resources + // available for the java process this.maxFactories = Math.max(Runtime.getRuntime().availableProcessors() - 1, 1); } else { this.maxFactories = maxFactories; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java index bc51a106d7bd..e7dafa8968f8 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java @@ -332,7 +332,7 @@ public void close() throws Exception { // close may be called multiple times when an exception is thrown if (stageContext != null) { try (AutoCloseable bundleFactoryCloser = stageBundleFactory; - AutoCloseable closable = stageContext) { + AutoCloseable closable = stageContext) { } catch (Exception e) { LOG.error("Error in close: ", e); throw e; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java index c211403c2d4e..e404298d9ae9 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java @@ -86,9 +86,7 @@ public SideInputInitializer(PCollectionView view) { InMemoryMultimapSideInputView.fromIterable( keyCoder, (Iterable) - elements - .getValue() - .stream() + elements.getValue().stream() .map(WindowedValue::getValue) .collect(Collectors.toList())))); } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java index d1ef494fc2e1..eef86d7728e8 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java @@ -354,7 +354,8 @@ private void setTimer(WindowedValue timerElement, TimerInternals.TimerDa try { Object key = keySelector.getKey(timerElement); sdkHarnessRunner.setCurrentTimerKey(key); - // We have to synchronize to ensure the state backend is not concurrently accessed by the state requests + // We have to synchronize to ensure the state backend is not concurrently accessed by the + // state requests try { stateBackendLock.lock(); getKeyedStateBackend().setCurrentKey(key); @@ -385,7 +386,8 @@ public void fireTimer(InternalTimer timer) { } // Prepare the SdkHarnessRunner with the key for the timer sdkHarnessRunner.setCurrentTimerKey(decodedKey); - // We have to synchronize to ensure the state backend is not concurrently accessed by the state requests + // We have to synchronize to ensure the state backend is not concurrently accessed by the state + // requests try { stateBackendLock.lock(); getKeyedStateBackend().setCurrentKey(encodedKey); @@ -399,9 +401,10 @@ public void fireTimer(InternalTimer timer) { public void dispose() throws Exception { // may be called multiple times when an exception is thrown if (stageContext != null) { - // Remove the reference to stageContext and make stageContext available for garbage collection. + // Remove the reference to stageContext and make stageContext available for garbage + // collection. try (AutoCloseable bundleFactoryCloser = stageBundleFactory; - AutoCloseable closable = stageContext) { + AutoCloseable closable = stageContext) { // DoFnOperator generates another "bundle" for the final watermark // https://issues.apache.org/jira/browse/BEAM-5816 super.dispose(); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java index a1ccca2fd2e0..ede1b5cb2b57 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java @@ -303,9 +303,7 @@ public void close() {} sourceThread.start(); - while (flinkWrapper - .getLocalReaders() - .stream() + while (flinkWrapper.getLocalReaders().stream() .anyMatch(reader -> reader.getWatermark().getMillis() == 0)) { // readers haven't been initialized Thread.sleep(50); @@ -631,7 +629,8 @@ private static void testSourceDoesNotShutdown(boolean shouldHaveReaders) throws SourceFunction.SourceContext sourceContext = Mockito.mock(SourceFunction.SourceContext.class); Object checkpointLock = new Object(); Mockito.when(sourceContext.getCheckpointLock()).thenReturn(checkpointLock); - // Initialize source context early to avoid concurrency issues with its initialization in the run + // Initialize source context early to avoid concurrency issues with its initialization in the + // run // method and the onProcessingTime call on the wrapper. sourceWrapper.setSourceContext(sourceContext); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java index ffe92fa5d05f..ff0590b112a4 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java @@ -69,7 +69,8 @@ */ public static PTransformOverrideFactory< - PCollection>, PCollection, + PCollection>, + PCollection, ParDo.SingleOutput, OutputT>> singleOutputOverrideFactory(DataflowPipelineOptions options) { return new SingleOutputOverrideFactory<>(isFnApi(options)); @@ -81,7 +82,8 @@ */ public static PTransformOverrideFactory< - PCollection>, PCollectionTuple, + PCollection>, + PCollectionTuple, ParDo.MultiOutput, OutputT>> multiOutputOverrideFactory(DataflowPipelineOptions options) { return new MultiOutputOverrideFactory<>(isFnApi(options)); @@ -94,7 +96,8 @@ private static boolean isFnApi(DataflowPipelineOptions options) { private static class SingleOutputOverrideFactory implements PTransformOverrideFactory< - PCollection>, PCollection, + PCollection>, + PCollection, ParDo.SingleOutput, OutputT>> { private final boolean isFnApi; @@ -107,7 +110,8 @@ private SingleOutputOverrideFactory(boolean isFnApi) { public PTransformReplacement>, PCollection> getReplacementTransform( AppliedPTransform< - PCollection>, PCollection, + PCollection>, + PCollection, SingleOutput, OutputT>> transform) { return PTransformReplacement.of( @@ -136,7 +140,8 @@ private MultiOutputOverrideFactory(boolean isFnApi) { public PTransformReplacement>, PCollectionTuple> getReplacementTransform( AppliedPTransform< - PCollection>, PCollectionTuple, + PCollection>, + PCollectionTuple, MultiOutput, OutputT>> transform) { return PTransformReplacement.of( diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 0db5892c656d..e93a15c893e0 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -882,10 +882,7 @@ public void translate(ParDo.MultiOutput transform, TranslationContext context) { ParDo.MultiOutput transform, TranslationContext context) { StepTranslationContext stepContext = context.addStep(transform, "ParallelDo"); Map, Coder> outputCoders = - context - .getOutputs(transform) - .entrySet() - .stream() + context.getOutputs(transform).entrySet().stream() .collect( Collectors.toMap( Map.Entry::getKey, e -> ((PCollection) e.getValue()).getCoder())); @@ -920,10 +917,7 @@ public void translate(ParDoSingle transform, TranslationContext context) { StepTranslationContext stepContext = context.addStep(transform, "ParallelDo"); Map, Coder> outputCoders = - context - .getOutputs(transform) - .entrySet() - .stream() + context.getOutputs(transform).entrySet().stream() .collect( Collectors.toMap( Map.Entry::getKey, e -> ((PCollection) e.getValue()).getCoder())); @@ -991,10 +985,7 @@ public void translate( StepTranslationContext stepContext = context.addStep(transform, "SplittableProcessKeyed"); Map, Coder> outputCoders = - context - .getOutputs(transform) - .entrySet() - .stream() + context.getOutputs(transform).entrySet().stream() .collect( Collectors.toMap( Map.Entry::getKey, e -> ((PCollection) e.getValue()).getCoder())); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 774614180783..1a7ae3c28fcb 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -788,8 +788,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { if (!isNullOrEmpty(dataflowOptions.getMinCpuPlatform())) { List minCpuFlags = - experiments - .stream() + experiments.stream() .filter(p -> p.startsWith("min_cpu_platform")) .collect(Collectors.toList()); @@ -1677,13 +1676,15 @@ private String getJobIdFromName(String jobName) { private static class PrimitiveCombineGroupedValuesOverrideFactory implements PTransformOverrideFactory< - PCollection>>, PCollection>, + PCollection>>, + PCollection>, Combine.GroupedValues> { @Override public PTransformReplacement>>, PCollection>> getReplacementTransform( AppliedPTransform< - PCollection>>, PCollection>, + PCollection>>, + PCollection>, GroupedValues> transform) { return PTransformReplacement.of( @@ -1726,7 +1727,8 @@ private StreamingPubsubIOWriteOverrideFactory(DataflowRunner runner) { @VisibleForTesting static class StreamingShardedWriteFactory implements PTransformOverrideFactory< - PCollection, WriteFilesResult, + PCollection, + WriteFilesResult, WriteFiles> { // We pick 10 as a a default, as it works well with the default number of workers started // by Dataflow. @@ -1741,7 +1743,8 @@ private StreamingPubsubIOWriteOverrideFactory(DataflowRunner runner) { public PTransformReplacement, WriteFilesResult> getReplacementTransform( AppliedPTransform< - PCollection, WriteFilesResult, + PCollection, + WriteFilesResult, WriteFiles> transform) { // By default, if numShards is not set WriteFiles will produce one file per bundle. In diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java index 29c1699c75ed..d9758c625aee 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java @@ -66,7 +66,8 @@ public PTransformReplacement, PCollection> getReplacementTransform( AppliedPTransform< - PCollection, PCollection, + PCollection, + PCollection, SingleOutput> transform) { return PTransformReplacement.of( diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java index ef238b26cb2f..40a05f3f9330 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java @@ -49,7 +49,7 @@ private static final Logger LOG = LoggerFactory.getLogger(CustomSources.class); private static int getDesiredNumUnboundedSourceSplits(DataflowPipelineOptions options) { - int cores = 4; //TODO: decide at runtime? + int cores = 4; // TODO: decide at runtime? if (options.getMaxNumWorkers() > 0) { return options.getMaxNumWorkers() * cores; } else if (options.getNumWorkers() > 0) { diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java index 4d17b6f74d60..6b62fac6094c 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java @@ -75,7 +75,8 @@ private static ApiComponents apiComponentsFromUrl(String urlString) { getJsonFactory(), chainHttpRequestInitializer( options.getGcpCredential(), - // Do not log 404. It clutters the output and is possibly even required by the caller. + // Do not log 404. It clutters the output and is possibly even required by the + // caller. new RetryHttpRequestInitializer(ImmutableList.of(404)))) .setApplicationName(options.getAppName()) .setRootUrl(components.rootUrl) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java index 7e9a1d2f32e8..e52c021884f9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java @@ -339,8 +339,7 @@ public Node typedApply(ExecutableStageNode input) { Iterables.filter(network.successors(input), OutputReceiverNode.class); Map outputReceiverMap = new HashMap<>(); - Lists.newArrayList(outputReceiverNodes) - .stream() + Lists.newArrayList(outputReceiverNodes).stream() .forEach( outputReceiverNode -> outputReceiverMap.put( diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java index 392941f8bf05..aefffee3f301 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java @@ -74,6 +74,7 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * A {@link WorkExecutor} that processes a list of {@link Operation}s. * @@ -489,8 +490,7 @@ private void updateMetrics(List monitoringInfos) { bundleProcessOperation.getPtransformIdToUserStepContext()); counterUpdates = - monitoringInfos - .stream() + monitoringInfos.stream() .map(monitoringInfoToCounterUpdateTransformer::monitoringInfoToCounterUpdate) .filter(Objects::nonNull) .collect(Collectors.toList()); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java index dfb2047a5764..73b995bf754f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java @@ -236,7 +236,8 @@ public Node apply(MutableNetwork input) { nodesToPCollections.put(node, pcollectionId); componentsBuilder.putPcollections(pcollectionId, pCollection); - // Check whether this output collection has consumers from worker side when "use_executable_stage_bundle_execution" + // Check whether this output collection has consumers from worker side when + // "use_executable_stage_bundle_execution" // is set if (input.successors(node).stream().anyMatch(RemoteGrpcPortNode.class::isInstance)) { executableStageOutputs.add(PipelineNode.pCollection(pcollectionId, pCollection)); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/ThreadzServlet.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/ThreadzServlet.java index b2913c92962e..9ad594ec65bf 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/ThreadzServlet.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/ThreadzServlet.java @@ -109,9 +109,7 @@ public void captureData(PrintWriter writer) { // Then, print out each stack along with the threads that share it. Stacks with more threads // are printed first. - stacks - .entrySet() - .stream() + stacks.entrySet().stream() .sorted(Comparator.comparingInt(e -> -e.getValue().size())) .forEachOrdered( entry -> { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NetworksTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NetworksTest.java index cf82391b5317..79d92ba8378f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NetworksTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NetworksTest.java @@ -150,9 +150,7 @@ public String apply(String input) { MutableNetwork originalNetwork = createNetwork(); for (String node : originalNetwork.nodes()) { assertEquals( - originalNetwork - .successors(node) - .stream() + originalNetwork.successors(node).stream() .map(function) .collect(Collectors.toCollection(HashSet::new)), network.successors(function.apply(node))); @@ -161,9 +159,7 @@ public String apply(String input) { } assertEquals( network.nodes(), - originalNetwork - .nodes() - .stream() + originalNetwork.nodes().stream() .map(function) .collect(Collectors.toCollection(HashSet::new))); } diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java index 1b2d16187307..967f8fc0f808 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java @@ -45,8 +45,7 @@ public Server allocateAddressAndCreate( String name = String.format("InProcessServer_%s", serviceNameUniqifier.getAndIncrement()); builder.setUrl(name); InProcessServerBuilder serverBuilder = InProcessServerBuilder.forName(name); - services - .stream() + services.stream() .forEach( service -> serverBuilder.addService( @@ -59,8 +58,7 @@ public Server allocateAddressAndCreate( public Server create(List services, ApiServiceDescriptor serviceDescriptor) throws IOException { InProcessServerBuilder builder = InProcessServerBuilder.forName(serviceDescriptor.getUrl()); - services - .stream() + services.stream() .forEach( service -> builder.addService( diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java index 38cae631a975..298f054921e4 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java @@ -150,8 +150,7 @@ private static Server createServer(List services, InetSocketAdd // buffer size in the layers above. .maxMessageSize(Integer.MAX_VALUE) .permitKeepAliveTime(KEEP_ALIVE_TIME_SEC, TimeUnit.SECONDS); - services - .stream() + services.stream() .forEach( service -> builder.addService( diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java index 1ffe4390e74c..0d8372725928 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java @@ -104,9 +104,7 @@ public void getArtifact( ArtifactApi.ProxyManifest proxyManifest = MANIFEST_CACHE.get(request.getRetrievalToken()); // look for file at URI specified by proxy manifest location ArtifactApi.ProxyManifest.Location location = - proxyManifest - .getLocationList() - .stream() + proxyManifest.getLocationList().stream() .filter(loc -> loc.getName().equals(name)) .findFirst() .orElseThrow( @@ -117,8 +115,7 @@ public void getArtifact( List existingArtifacts = proxyManifest.getManifest().getArtifactList(); ArtifactMetadata metadata = - existingArtifacts - .stream() + existingArtifacts.stream() .filter(meta -> meta.getName().equals(name)) .findFirst() .orElseThrow( diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java index e477cd9f7f54..04a883dc4603 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java @@ -110,9 +110,7 @@ private static ExecutableProcessBundleDescriptor fromExecutableStageInternal( // Create with all of the processing transforms, and all of the components. // TODO: Remove the unreachable subcomponents if the size of the descriptor matters. Map stageTransforms = - stage - .getTransforms() - .stream() + stage.getTransforms().stream() .collect(Collectors.toMap(PTransformNode::getId, PTransformNode::getTransform)); Components.Builder components = @@ -388,7 +386,8 @@ private static TargetEncoding addStageOutput( outputTargetCodersBuilder.put(targetEncoding.getTarget(), targetEncoding.getCoder()); components.putTransforms( timerReference.transform().getId(), - // Since a transform can have more then one timer, update the transform inside components and not the original + // Since a transform can have more then one timer, update the transform inside components + // and not the original components .getTransformsOrThrow(timerReference.transform().getId()) .toBuilder() diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java index fb858dc784c6..94834fcbb471 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java @@ -128,7 +128,8 @@ public RemoteEnvironment createEnvironment(Environment environment) throws Excep .addAll(gcsCredentialArgs()) // NOTE: Host networking does not work on Mac, but the command line flag is accepted. .add("--network=host") - // We need to pass on the information about Docker-on-Mac environment (due to missing host networking on Mac) + // We need to pass on the information about Docker-on-Mac environment (due to missing + // host networking on Mac) .add("--env=DOCKER_MAC_CONTAINER=" + System.getenv("DOCKER_MAC_CONTAINER")); if (!retainDockerContainer) { @@ -206,7 +207,8 @@ public RemoteEnvironment createEnvironment(Environment environment) throws Excep * likely only support the latest version at any time. */ private static class DockerOnMac { - // TODO: This host name seems to change with every other Docker release. Do we attempt to keep up + // TODO: This host name seems to change with every other Docker release. Do we attempt to keep + // up // or attempt to document the supported Docker version(s)? private static final String DOCKER_FOR_MAC_HOST = "host.docker.internal"; @@ -280,7 +282,8 @@ private static Platform getPlatform() { String osName = System.getProperty("os.name").toLowerCase(); // TODO: Make this more robust? // The DOCKER_MAC_CONTAINER environment variable is necessary to detect whether we run on - // a container on MacOs. MacOs internally uses a Linux VM which makes it indistinguishable from Linux. + // a container on MacOs. MacOs internally uses a Linux VM which makes it indistinguishable + // from Linux. // We still need to apply port mapping due to missing host networking. if (osName.startsWith("mac") || DockerOnMac.RUNNING_INSIDE_DOCKER_ON_MAC) { return Platform.MAC; diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java index 259a69394d8f..05dd45b1f451 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java @@ -445,9 +445,7 @@ private void assertFiles(Set files, String retrievalToken) throws Except Assert.assertEquals( "Files in locations does not match actual file list.", files, - proxyManifest - .getLocationList() - .stream() + proxyManifest.getLocationList().stream() .map(Location::getName) .collect(Collectors.toSet())); Assert.assertEquals( diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java index 958d242a7567..75d2bb675c9f 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java @@ -103,9 +103,7 @@ public void closeShutsDownEnvironments() throws Exception { p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride())); ExecutableStage stage = - GreedyPipelineFuser.fuse(PipelineTranslation.toProto(p)) - .getFusedStages() - .stream() + GreedyPipelineFuser.fuse(PipelineTranslation.toProto(p)).getFusedStages().stream() .findFirst() .get(); RemoteEnvironment remoteEnv = mock(RemoteEnvironment.class); @@ -124,9 +122,7 @@ public void closeShutsDownEnvironmentsWhenSomeFail() throws Exception { p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride())); ExecutableStage firstEnvStage = - GreedyPipelineFuser.fuse(PipelineTranslation.toProto(p)) - .getFusedStages() - .stream() + GreedyPipelineFuser.fuse(PipelineTranslation.toProto(p)).getFusedStages().stream() .findFirst() .get(); ExecutableStagePayload basePayload = diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java index 7c5e9f9e7a0b..78a88fbe01b0 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java @@ -105,8 +105,7 @@ public Admin(BoundedSource source, SamzaPipelineOptions pipelineOptions) { @Override public Map getSystemStreamMetadata(Set streamNames) { - return streamNames - .stream() + return streamNames.stream() .collect( Collectors.toMap( Function.identity(), diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java index 8defed92a7b1..88affe4c0d4d 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java @@ -118,8 +118,7 @@ public Admin(UnboundedSource source, SamzaPipelineOptions pi @Override public Map getSystemStreamMetadata(Set streamNames) { - return streamNames - .stream() + return streamNames.stream() .collect( Collectors.toMap( Function.identity(), @@ -313,7 +312,7 @@ public void run() { updateWatermark(); if (!elementAvailable) { - //TODO: make poll interval configurable + // TODO: make poll interval configurable Thread.sleep(50); } } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricsContainer.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricsContainer.java index 25ebe609743f..0b00e89dac91 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricsContainer.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricsContainer.java @@ -69,7 +69,7 @@ public void updateMetrics() { final GaugeUpdater updateGauge = new GaugeUpdater(); results.getGauges().forEach(updateGauge); - //TODO: add distribution metrics to Samza + // TODO: add distribution metrics to Samza } private class CounterUpdater implements Consumer> { diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java index 1d2907447d8a..401cd117b949 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java @@ -82,9 +82,8 @@ // This is derivable from pushbackValues which is persisted to a store. // TODO: eagerly initialize the hold in init @edu.umd.cs.findbugs.annotations.SuppressWarnings( - justification = "No bug", - value = "SE_TRANSIENT_FIELD_NOT_RESTORED" - ) + justification = "No bug", + value = "SE_TRANSIENT_FIELD_NOT_RESTORED") private transient Instant pushbackWatermarkHold; // TODO: add this to checkpointable state diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java index 67c343cbb78f..94e319cd916a 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java @@ -536,7 +536,7 @@ public void closeIterators() { private class SamzaMapStateImpl extends AbstractSamzaState implements SamzaMapState, KeyValueIteratorState { - private static final int MAX_KEY_SIZE = 100000; //100K bytes + private static final int MAX_KEY_SIZE = 100000; // 100K bytes private final Coder keyCoder; private final byte[] maxKey; private final int storeKeySize; diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/WindowAssignOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/WindowAssignOp.java index b9692a221a49..48fb96917cb3 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/WindowAssignOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/WindowAssignOp.java @@ -39,8 +39,7 @@ public void processElement(WindowedValue inputElement, OpEmitter emitter) throw new RuntimeException(e); } - windows - .stream() + windows.stream() .map( window -> WindowedValue.of( diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java index c144428a13ef..7e7457dfc604 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java @@ -71,10 +71,7 @@ public void translate( TranslationContext ctx) { final PCollection input = ctx.getInput(transform); final Map, Coder> outputCoders = - ctx.getCurrentTransform() - .getOutputs() - .entrySet() - .stream() + ctx.getCurrentTransform().getOutputs().entrySet().stream() .filter(e -> e.getValue() instanceof PCollection) .collect( Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder())); @@ -93,9 +90,7 @@ public void translate( final MessageStream> inputStream = ctx.getMessageStream(input); final List>> sideInputStreams = - transform - .getSideInputs() - .stream() + transform.getSideInputs().stream() .map(ctx::getViewStream) .collect(Collectors.toList()); final Map, Integer> tagToIdMap = new HashMap<>(); diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestBoundedSource.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestBoundedSource.java index 82e772ab5441..928d2887eec2 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestBoundedSource.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestBoundedSource.java @@ -52,8 +52,7 @@ private TestBoundedSource(List>> events) { @Override public List> split( long desiredBundleSizeBytes, PipelineOptions options) throws Exception { - return events - .stream() + return events.stream() .map(ev -> new TestBoundedSource<>(Collections.singletonList(ev))) .collect(Collectors.toList()); } diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestCheckpointMark.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestCheckpointMark.java index 0d0710e044d3..d526d859dea4 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestCheckpointMark.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestCheckpointMark.java @@ -31,7 +31,7 @@ private TestCheckpointMark(int checkpoint) { @Override public void finalizeCheckpoint() throws IOException { - //DO NOTHING + // DO NOTHING } static TestCheckpointMark of(int checkpoint) { diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestUnboundedSource.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestUnboundedSource.java index a41adbc4e891..59845e0ffacb 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestUnboundedSource.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestUnboundedSource.java @@ -62,8 +62,7 @@ private TestUnboundedSource(List>> events) { @Override public List> split( int desiredNumSplits, PipelineOptions options) throws Exception { - return events - .stream() + return events.stream() .map(ev -> new TestUnboundedSource<>(Collections.singletonList(ev))) .collect(Collectors.toList()); } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java index 49a4c1aa6fe2..3ed6480a8eef 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java @@ -77,8 +77,7 @@ private boolean knownComposite(Class> transform) { private boolean shouldDebug(final TransformHierarchy.Node node) { return node == null - || (!transforms - .stream() + || (!transforms.stream() .anyMatch( debugTransform -> debugTransform.getNode().equals(node) && debugTransform.isComposite()) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java index 5e73f52a692d..8acf897884e2 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java @@ -98,8 +98,7 @@ private CoderHelpers() {} */ public static Iterable fromByteArrays( Collection serialized, final Coder coder) { - return serialized - .stream() + return serialized.stream() .map(bytes -> fromByteArray(checkNotNull(bytes, "Cannot decode null values."), coder)) .collect(Collectors.toList()); } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java index 9d207550684f..ce460a3d848a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java @@ -85,7 +85,7 @@ * * @param The type of the element in this stream. */ -//TODO: write a proper Builder enforcing all those rules mentioned. +// TODO: write a proper Builder enforcing all those rules mentioned. public final class CreateStream extends PTransform> { public static final String TRANSFORM_URN = "beam:transform:spark:createstream:v1"; @@ -96,7 +96,7 @@ private Instant initialSystemTime; private final boolean forceWatermarkSync; - private Instant lowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; //for test purposes. + private Instant lowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; // for test purposes. private CreateStream( Duration batchDuration, diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java index b327a9b25fc3..abc8244e10bf 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java @@ -167,7 +167,7 @@ int getNumPartitions() { return numPartitions; } - //---- Bound by time. + // ---- Bound by time. // return the largest between the proportional read time (%batchDuration dedicated for read) // and the min. read time set. @@ -184,7 +184,7 @@ private Duration boundReadDuration(double readTimePercentage, long minReadTimeMi return readDuration; } - //---- Bound by records. + // ---- Bound by records. private scala.Option rateControlledMaxRecords() { final scala.Option rateControllerOption = rateController(); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java index d88e2b8f044e..e61a088a59c3 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java @@ -99,7 +99,7 @@ /** State and Timers wrapper. */ public static class StateAndTimers implements Serializable { - //Serializable state for internals (namespace to state tag to coded value). + // Serializable state for internals (namespace to state tag to coded value). private final Table state; private final Collection serTimers; @@ -151,7 +151,8 @@ public void outputWindowedValue( extends AbstractFunction1< Iterator< Tuple3< - /*K*/ ByteArray, Seq>*/ byte[]>, + /*K*/ ByteArray, + Seq>*/ byte[]>, Option>>*/ List>>>>, Iterator< Tuple2>>*/ List>>>> @@ -412,10 +413,11 @@ public void outputWindowedValue( apply( final Iterator< Tuple3< - /*K*/ ByteArray, Seq>*/ byte[]>, + /*K*/ ByteArray, + Seq>*/ byte[]>, Option>>*/ List>>>> input) { - //--- ACTUAL STATEFUL OPERATION: + // --- ACTUAL STATEFUL OPERATION: // // Input Iterator: the partition (~bundle) of a co-grouping of the input // and the previous state (if exists). diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java index 8cb0c0af0a7b..41dbbfa3ce86 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java @@ -49,7 +49,7 @@ class SparkStateInternals implements StateInternals { private final K key; - //Serializable state for internals (namespace to state tag to coded value). + // Serializable state for internals (namespace to state tag to coded value). private final Table stateTable; private SparkStateInternals(K key) { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java index 2d21ba566a28..bafe6bf69765 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java @@ -93,12 +93,16 @@ */ public static scala.Function3< - Source, Option, State>, + Source, + Option, + State>, Tuple2, Metadata>> mapSourceFunction(final SerializablePipelineOptions options, final String stepName) { return new SerializableFunction3< - Source, Option, State>, + Source, + Option, + State>, Tuple2, Metadata>>() { @Override diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java index 33e12e1da37e..c860906e2a8e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java @@ -83,8 +83,7 @@ JavaRDDLike bytesRDD = rdd.map(CoderHelpers.toByteFunction(windowedValueCoder)); List clientBytes = bytesRDD.collect(); windowedValues = - clientBytes - .stream() + clientBytes.stream() .map(bytes -> CoderHelpers.fromByteArray(bytes, windowedValueCoder)) .collect(Collectors.toList()); } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java index a1151c662d5b..88f706f256c2 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java @@ -131,10 +131,7 @@ public void setCurrentTransform(AppliedPTransform transform) { } public Map, Coder> getOutputCoders() { - return currentTransform - .getOutputs() - .entrySet() - .stream() + return currentTransform.getOutputs().entrySet().stream() .filter(e -> e.getValue() instanceof PCollection) .collect(Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder())); } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java index 67672a2ed95e..8336350cb43f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java @@ -113,7 +113,7 @@ public PipelineOptions getPipelineOptions() { @Override public T sideInput(PCollectionView view) { checkNotNull(input, "Input in SparkCombineContext must not be null!"); - //validate element window. + // validate element window. final Collection elementWindows = input.getWindows(); checkState( elementWindows.size() == 1, diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java index b9227f40206c..d5326d076021 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java @@ -71,7 +71,7 @@ public SparkGlobalCombineFn( TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner(); WindowFn windowFn = windowingStrategy.getWindowFn(); - //--- inputs iterator, by window order. + // --- inputs iterator, by window order. final Iterator> iterator = sortedInputs.iterator(); WindowedValue currentInput = iterator.next(); BoundedWindow currentWindow = Iterables.getFirst(currentInput.getWindows(), null); @@ -170,7 +170,7 @@ public SparkGlobalCombineFn( @SuppressWarnings("unchecked") TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner(); - //--- accumulators iterator, by window order. + // --- accumulators iterator, by window order. final Iterator> iterator = sortedAccumulators.iterator(); // get the first accumulator and assign it to the current window's accumulators. diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java index 83189a750532..d66467091652 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java @@ -74,7 +74,7 @@ public SparkGroupAlsoByWindowViaOutputBufferFn( K key = windowedValue.getValue().getKey(); Iterable> values = windowedValue.getValue().getValue(); - //------ based on GroupAlsoByWindowsViaOutputBufferDoFn ------// + // ------ based on GroupAlsoByWindowsViaOutputBufferDoFn ------// // Used with Batch, we know that all the data is available for this key. We can't use the // timer manager from the context because it doesn't exist. So we create one and emulate the diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java index 81541276512b..879786ddb9e3 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java @@ -72,7 +72,7 @@ public OutputT apply(WindowedValue>> windowedKv) { TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner(); WindowFn windowFn = windowingStrategy.getWindowFn(); - //--- inputs iterator, by window order. + // --- inputs iterator, by window order. final Iterator>> iterator = sortedInputs.iterator(); WindowedValue> currentInput = iterator.next(); BoundedWindow currentWindow = Iterables.getFirst(currentInput.getWindows(), null); @@ -181,7 +181,7 @@ public OutputT apply(WindowedValue>> windowedKv) { @SuppressWarnings("unchecked") TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner(); - //--- accumulators iterator, by window order. + // --- accumulators iterator, by window order. final Iterator>> iterator = sortedAccumulators.iterator(); // get the first accumulator and assign it to the current window's accumulators. diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java index b64587ded403..163ba55e6d1b 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java @@ -68,7 +68,7 @@ SideInputBroadcast getPCollectionView(PCollectionView view, JavaSparkContext } } - //lazily broadcast views + // lazily broadcast views SideInputBroadcast helper = broadcastHelperMap.get(view); if (helper == null) { synchronized (SparkPCollectionView.class) { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java index fc5c5a6baf3f..e654aebf390b 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java @@ -80,7 +80,7 @@ public JavaStreamingContext call() throws Exception { // We must first init accumulators since translators expect them to be instantiated. SparkRunner.initAccumulators(options, jsc); - //do not need to create a MetricsPusher instance here because if is called in SparkRunner.run() + // do not need to create a MetricsPusher instance here because if is called in SparkRunner.run() EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, options, jssc); // update cache candidates diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java index debab3427daf..f9253dcfbed7 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java @@ -49,16 +49,16 @@ public SparkSideInputReader( @Nullable @Override public T get(PCollectionView view, BoundedWindow window) { - //--- validate sideInput. + // --- validate sideInput. checkNotNull(view, "The PCollectionView passed to sideInput cannot be null "); KV, SideInputBroadcast> windowedBroadcastHelper = sideInputs.get(view.getTagInternal()); checkNotNull(windowedBroadcastHelper, "SideInput for view " + view + " is not available."); - //--- sideInput window + // --- sideInput window final BoundedWindow sideInputWindow = view.getWindowMappingFn().getSideInputWindow(window); - //--- match the appropriate sideInput window. + // --- match the appropriate sideInput window. // a tag will point to all matching sideInputs, that is all windows. // now that we've obtained the appropriate sideInputWindow, all that's left is to filter by it. Iterable>> availableSideInputs = diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java index d166d8dabbba..7b72b546e2ef 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java @@ -46,16 +46,10 @@ public InMemoryMetrics( // this might fail in case we have multiple aggregators with the same suffix after // the last dot, but it should be good enough for tests. if (extendedMetricsRegistry != null - && extendedMetricsRegistry - .getGauges() - .keySet() - .stream() + && extendedMetricsRegistry.getGauges().keySet().stream() .anyMatch(Predicates.containsPattern(name + "$")::apply)) { String key = - extendedMetricsRegistry - .getGauges() - .keySet() - .stream() + extendedMetricsRegistry.getGauges().keySet().stream() .filter(Predicates.containsPattern(name + "$")::apply) .findFirst() .get(); diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java index ba204a97f0d0..ceb87a17b30b 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java @@ -133,9 +133,8 @@ private static void produce(Map messages) { Serializer stringSerializer = new StringSerializer(); Serializer instantSerializer = new InstantSerializer(); - try ( - KafkaProducer kafkaProducer = - new KafkaProducer(producerProps, stringSerializer, instantSerializer)) { + try (KafkaProducer kafkaProducer = + new KafkaProducer(producerProps, stringSerializer, instantSerializer)) { for (Map.Entry en : messages.entrySet()) { kafkaProducer.send(new ProducerRecord<>(TOPIC, en.getKey(), en.getValue())); } @@ -179,12 +178,12 @@ public void testWithResume() throws Exception { "EOFShallNotPassFn", 4L))); - //--- between executions: + // --- between executions: - //- clear state. + // - clear state. clean(); - //- write a bit more. + // - write a bit more. produce( ImmutableMap.of( "k5", new Instant(499), diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java index 05bdee080ea1..77dedb33511d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java @@ -40,7 +40,8 @@ }) /** - * You can indicate a category for the experimental feature. This is unused and serves only as a hint to the reader. + * You can indicate a category for the experimental feature. This is unused and serves only as a + * hint to the reader. */ @Documented public @interface Experimental { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java index ad20d63c86ac..bdb971b06143 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java @@ -126,9 +126,7 @@ private void verifyDeterministic(Schema schema) throws org.apache.beam.sdk.coders.Coder.NonDeterministicException { List> coders = - schema - .getFields() - .stream() + schema.getFields().stream() .map(Field::getType) .map(RowCoder::coderForFieldType) .collect(Collectors.toList()); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index fb786f5caa55..1a5459229d79 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -944,8 +944,7 @@ private CreateTextSourceFn(byte[] delimiter) { checkArgument( 1 == Iterables.size( - allToArgs - .stream() + allToArgs.stream() .filter(Predicates.notNull()::apply) .collect(Collectors.toList())), "Exactly one of filename policy, dynamic destinations, filename prefix, or destination " diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java index b712ab46f917..1629b7a3283a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java @@ -354,8 +354,7 @@ static boolean printHelpUsageAndExitIfNeeded( } catch (ClassNotFoundException e) { // If we didn't find an exact match, look for any that match the class name. Iterable> matches = - getRegisteredOptions() - .stream() + getRegisteredOptions().stream() .filter( input -> { if (helpOption.contains(".")) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java index 9b76a4cf4d8d..df2df863b24d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java @@ -746,8 +746,7 @@ public PipelineOptions deserialize(JsonParser jp, DeserializationContext ctxt) if (rawOptionsNode != null && !rawOptionsNode.isNull()) { ObjectNode optionsNode = (ObjectNode) rawOptionsNode; for (Iterator> iterator = optionsNode.fields(); - iterator != null && iterator.hasNext(); - ) { + iterator != null && iterator.hasNext(); ) { Map.Entry field = iterator.next(); fields.put(field.getKey(), field.getValue()); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java index 4df0a9313ca2..91659c814f4f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java @@ -42,8 +42,7 @@ public List get(Class clazz) { // If the generated class is passed in, we want to look at the base class to find the getters. Class targetClass = AutoValueUtils.getBaseAutoValueClass(clazz); - return ReflectUtils.getMethods(targetClass) - .stream() + return ReflectUtils.getMethods(targetClass).stream() .filter(ReflectUtils::isGetter) // All AutoValue getters are marked abstract. .filter(m -> Modifier.isAbstract(m.getModifiers())) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java index d2ee4fdec11d..d54aaab25680 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java @@ -263,17 +263,13 @@ private FieldAccessDescriptor resolvedNestedFieldsHelper( } nestedFields.putAll( - getNestedFieldsAccessedByName() - .entrySet() - .stream() + getNestedFieldsAccessedByName().entrySet().stream() .collect( Collectors.toMap( e -> schema.indexOf(e.getKey()), e -> resolvedNestedFieldsHelper(schema.getField(e.getKey()), e.getValue())))); nestedFields.putAll( - getNestedFieldsAccessedById() - .entrySet() - .stream() + getNestedFieldsAccessedById().entrySet().stream() .collect( Collectors.toMap( e -> validateFieldId(schema, e.getKey()), diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldTypeDescriptors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldTypeDescriptors.java index 5ff50460b800..881b1cec75f1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldTypeDescriptors.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldTypeDescriptors.java @@ -30,6 +30,7 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.BiMap; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableBiMap; import org.joda.time.Instant; + /** * Utilities for converting between {@link Schema} field types and {@link TypeDescriptor}s that * define Java objects which can represent these field types. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java index 61fca2ecb262..176f97dc1f14 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java @@ -53,8 +53,7 @@ @Override public List get(Class clazz) { - return ReflectUtils.getMethods(clazz) - .stream() + return ReflectUtils.getMethods(clazz).stream() .filter(ReflectUtils::isGetter) .filter(m -> !m.isAnnotationPresent(SchemaIgnore.class)) .map(FieldValueTypeInformation::forGetter) @@ -74,8 +73,7 @@ @Override public List get(Class clazz) { - return ReflectUtils.getMethods(clazz) - .stream() + return ReflectUtils.getMethods(clazz).stream() .filter(ReflectUtils::isSetter) .filter(m -> !m.isAnnotationPresent(SchemaIgnore.class)) .map(FieldValueTypeInformation::forSetter) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java index 88986d4875c4..cba5448ca22b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java @@ -57,8 +57,7 @@ @Override public List get(Class clazz) { List types = - ReflectUtils.getFields(clazz) - .stream() + ReflectUtils.getFields(clazz).stream() .filter(f -> !f.isAnnotationPresent(SchemaIgnore.class)) .map(FieldValueTypeInformation::forField) .map( @@ -73,8 +72,7 @@ if (ReflectUtils.getAnnotatedCreateMethod(clazz) == null && ReflectUtils.getAnnotatedConstructor(clazz) == null) { Optional finalField = - types - .stream() + types.stream() .map(FieldValueTypeInformation::getField) .filter(f -> Modifier.isFinal(f.getModifiers())) .findAny(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java index 5592d1385beb..84e89d6be55a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java @@ -256,14 +256,11 @@ private boolean equivalent(Schema other, EquivalenceNullablePolicy nullablePolic } List otherFields = - other - .getFields() - .stream() + other.getFields().stream() .sorted(Comparator.comparing(Field::getName)) .collect(Collectors.toList()); List actualFields = - getFields() - .stream() + getFields().stream() .sorted(Comparator.comparing(Field::getName)) .collect(Collectors.toList()); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Cast.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Cast.java index 1059aa16775e..024131efe44c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Cast.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Cast.java @@ -272,8 +272,7 @@ public void verifyCompatibility(Schema inputSchema) { if (!errors.isEmpty()) { String reason = - errors - .stream() + errors.stream() .map(x -> Joiner.on('.').join(x.path()) + ": " + x.message()) .collect(Collectors.joining("\n\t")); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java index 5804d4116248..1ee6fadf756c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java @@ -242,10 +242,7 @@ private FieldAccessDescriptor getFieldAccessDescriptor(TupleTag tag) { KeyedPCollectionTuple keyedPCollectionTuple = KeyedPCollectionTuple.empty(input.getPipeline()); List> sortedTags = - input - .getAll() - .keySet() - .stream() + input.getAll().keySet().stream() .sorted(Comparator.comparing(TupleTag::getId)) .map(t -> new TupleTag(t.getId() + "_ROW")) .collect(Collectors.toList()); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java index c7716855efd9..5f5626110226 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java @@ -125,9 +125,7 @@ for (String fieldName : Sets.union( fieldNameFilters.keySet(), - fieldNamesFilters - .keySet() - .stream() + fieldNamesFilters.keySet().stream() .flatMap(List::stream) .collect(Collectors.toSet()))) { schema.getField(fieldName); @@ -135,9 +133,7 @@ for (int fieldIndex : Sets.union( fieldIdFilters.keySet(), - fieldIdsFilters - .keySet() - .stream() + fieldIdsFilters.keySet().stream() .flatMap(List::stream) .collect(Collectors.toSet()))) { if (fieldIndex >= schema.getFieldCount() || fieldIndex < 0) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java index 2fe8698740fb..6462e980026e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java @@ -145,8 +145,7 @@ /** Once the schema is known, this function is called by the {@link Group} transform. */ Inner withSchema(Schema inputSchema, SerializableFunction toRowFunction) { List fieldAggregations = - getFieldAggregations() - .stream() + getFieldAggregations().stream() .map(f -> f.resolve(inputSchema)) .collect(Collectors.toList()); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java index 65f398b37ba2..a94e77e3ef35 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists; + /** * A {@link PTransform} to unnest nested rows. * diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java index c5c29418ba53..0dec26095343 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java @@ -125,8 +125,7 @@ private static boolean matchConstructor( } Map typeMap = - getterTypes - .stream() + getterTypes.stream() .collect( Collectors.toMap( f -> ReflectUtils.stripGetterPrefix(f.getMethod().getName()), @@ -151,8 +150,7 @@ public static SchemaUserTypeCreator getBuilderCreator( } Map setterTypes = - ReflectUtils.getMethods(builderClass) - .stream() + ReflectUtils.getMethods(builderClass).stream() .filter(ReflectUtils::isSetter) .map(FieldValueTypeInformation::forSetter) .collect(Collectors.toMap(FieldValueTypeInformation::getName, Function.identity())); @@ -176,8 +174,7 @@ public static SchemaUserTypeCreator getBuilderCreator( } Method buildMethod = - ReflectUtils.getMethods(builderClass) - .stream() + ReflectUtils.getMethods(builderClass).stream() .filter(m -> m.getName().equals("build")) .findAny() .orElseThrow(() -> new RuntimeException("No build method in builder")); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java index 84278bef0789..39d6323209f2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java @@ -88,8 +88,7 @@ // don't need recursion because nested unions aren't supported in AVRO List nonNullTypes = - types - .stream() + types.stream() .filter(x -> x.getType() != org.apache.avro.Schema.Type.NULL) .collect(Collectors.toList()); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java index a435d45e4fb9..8aaa200ba4e4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java @@ -68,8 +68,7 @@ public static Schema schemaFromJavaBeanClass( public static void validateJavaBean( List getters, List setters) { Map setterMap = - setters - .stream() + setters.stream() .collect(Collectors.toMap(FieldValueTypeInformation::getName, Function.identity())); for (FieldValueTypeInformation type : getters) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java index 5d62e82dec27..073ead1e0bd1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java @@ -39,12 +39,9 @@ public static List sortBySchema( List types, Schema schema) { Map typeMap = - types - .stream() + types.stream() .collect(Collectors.toMap(FieldValueTypeInformation::getName, Function.identity())); - return schema - .getFields() - .stream() + return schema.getFields().stream() .map(f -> typeMap.get(f.getName())) .collect(Collectors.toList()); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 2cd71f5907d8..3f8b22be7358 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -332,8 +332,7 @@ public void finishBundle() throws Exception { /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */ @Deprecated public List peekOutputElements() { - return peekOutputElementsWithTimestamp() - .stream() + return peekOutputElementsWithTimestamp().stream() .map(TimestampedValue::getValue) .collect(Collectors.toList()); } @@ -342,8 +341,7 @@ public void finishBundle() throws Exception { @Deprecated public List> peekOutputElementsWithTimestamp() { // TODO: Should we return an unmodifiable list? - return getImmutableOutput(mainOutputTag) - .stream() + return getImmutableOutput(mainOutputTag).stream() .map(input -> TimestampedValue.of(input.getValue(), input.getTimestamp())) .collect(Collectors.toList()); } @@ -394,8 +392,7 @@ public void clearOutputElements() { @Deprecated public List peekOutputElements(TupleTag tag) { // TODO: Should we return an unmodifiable list? - return getImmutableOutput(tag) - .stream() + return getImmutableOutput(tag).stream() .map(ValueInSingleWindow::getValue) .collect(Collectors.toList()); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java index 19ff713ef0e8..8c2a11bdaa9e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java @@ -173,7 +173,7 @@ public void processElement( numElementsInBatch.add(1L); Long num = numElementsInBatch.read(); if (num % prefetchFrequency == 0) { - //prefetch data and modify batch state (readLater() modifies this) + // prefetch data and modify batch state (readLater() modifies this) batch.readLater(); } if (num >= batchSize) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java index 4c47ba98e848..878c970ae841 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java @@ -679,8 +679,7 @@ static ProcessElementMethod create( * each scoped to a single window. */ public boolean observesWindow() { - return extraParameters() - .stream() + return extraParameters().stream() .anyMatch( Predicates.or( Predicates.instanceOf(WindowParameter.class), @@ -696,8 +695,7 @@ public boolean observesWindow() { @Nullable public RowParameter getRowParameter() { Optional parameter = - extraParameters() - .stream() + extraParameters().stream() .filter(Predicates.instanceOf(RowParameter.class)::apply) .findFirst(); return parameter.isPresent() ? ((RowParameter) parameter.get()) : null; @@ -707,8 +705,7 @@ public RowParameter getRowParameter() { @Nullable public OutputReceiverParameter getMainOutputReceiver() { Optional parameter = - extraParameters() - .stream() + extraParameters().stream() .filter(Predicates.instanceOf(OutputReceiverParameter.class)::apply) .findFirst(); return parameter.isPresent() ? ((OutputReceiverParameter) parameter.get()) : null; @@ -718,8 +715,7 @@ public OutputReceiverParameter getMainOutputReceiver() { * Whether this {@link DoFn} is splittable. */ public boolean isSplittable() { - return extraParameters() - .stream() + return extraParameters().stream() .anyMatch(Predicates.instanceOf(RestrictionTrackerParameter.class)::apply); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java index 61089df347b9..33e6a7646c9f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java @@ -226,8 +226,7 @@ private MethodAnalysisContext() {} /** Indicates whether a {@link RestrictionTrackerParameter} is known in this context. */ public boolean hasRestrictionTrackerParameter() { - return extraParameters - .stream() + return extraParameters.stream() .anyMatch(Predicates.instanceOf(RestrictionTrackerParameter.class)::apply); } @@ -238,8 +237,7 @@ public boolean hasWindowParameter() { /** Indicates whether a {@link Parameter.PipelineOptionsParameter} is known in this context. */ public boolean hasPipelineOptionsParamter() { - return extraParameters - .stream() + return extraParameters.stream() .anyMatch(Predicates.instanceOf(Parameter.PipelineOptionsParameter.class)::apply); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFile.java index 3101732bba84..ea231c2c0a54 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFile.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFile.java @@ -83,8 +83,7 @@ public FilePatternMatchingShardedFile(String filePattern) { Collection files = FileSystems.match(filePattern).metadata(); LOG.debug( "Found file(s) {} by matching the path: {}", - files - .stream() + files.stream() .map(Metadata::resourceId) .map(ResourceId::getFilename) .collect(Collectors.joining(",")), diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java index b5e724d5af3f..b359bb532e21 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java @@ -166,8 +166,7 @@ public static boolean isCancelled(CompletionStage future) { return blockAndDiscard.thenApply( nothing -> - futures - .stream() + futures.stream() .map(future -> future.toCompletableFuture().join()) .collect(Collectors.toList())); } @@ -179,9 +178,8 @@ public static boolean isCancelled(CompletionStage future) { * #allAsList(Collection)}. */ @SuppressWarnings( - value = "NM_CLASS_NOT_EXCEPTION", - justification = "The class does hold an exception; its name is accurate." - ) + value = "NM_CLASS_NOT_EXCEPTION", + justification = "The class does hold an exception; its name is accurate.") @AutoValue public abstract static class ExceptionOrResult { @@ -218,8 +216,7 @@ public static boolean isCancelled(CompletionStage future) { return blockAndDiscard.thenApply( nothing -> - futures - .stream() + futures.stream() .map( future -> { // The limited scope of the exceptions wrapped allows CancellationException diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonDeserializer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonDeserializer.java index a6f20a0b3b54..3b1805f1d206 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonDeserializer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonDeserializer.java @@ -145,10 +145,7 @@ private static Row jsonObjectToRow(FieldValue rowFieldValue) { + "can be parsed to Beam Rows"); } - return rowFieldValue - .rowSchema() - .getFields() - .stream() + return rowFieldValue.rowSchema().getFields().stream() .map( schemaField -> extractJsonNodeValue( diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java index 65d37c82e54c..4bc8048135cc 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java @@ -209,12 +209,12 @@ public void testTransientFieldInitialization() throws Exception { Pojo value = new Pojo("Hello", 42); AvroCoder coder = AvroCoder.of(Pojo.class); - //Serialization of object + // Serialization of object ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream out = new ObjectOutputStream(bos); out.writeObject(coder); - //De-serialization of object + // De-serialization of object ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray()); ObjectInputStream in = new ObjectInputStream(bis); AvroCoder copied = (AvroCoder) in.readObject(); @@ -232,11 +232,11 @@ public void testKryoSerialization() throws Exception { Pojo value = new Pojo("Hello", 42); AvroCoder coder = AvroCoder.of(Pojo.class); - //Kryo instantiation + // Kryo instantiation Kryo kryo = new Kryo(); kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()); - //Serialization of object without any memoization + // Serialization of object without any memoization ByteArrayOutputStream coderWithoutMemoizationBos = new ByteArrayOutputStream(); try (Output output = new Output(coderWithoutMemoizationBos)) { kryo.writeObject(output, coder); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java index 4e0621a6374e..1163b6e89bce 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java @@ -293,8 +293,7 @@ public void testDynamicDefaultFilenamePolicy() throws Exception { String[] aElements = Iterables.toArray( StreamSupport.stream( - elements - .stream() + elements.stream() .filter( Predicates.compose(new StartsWith("a"), new ExtractWriteDestination()) ::apply) @@ -307,8 +306,7 @@ public void testDynamicDefaultFilenamePolicy() throws Exception { String[] bElements = Iterables.toArray( StreamSupport.stream( - elements - .stream() + elements.stream() .filter( Predicates.compose(new StartsWith("b"), new ExtractWriteDestination()) ::apply) @@ -321,8 +319,7 @@ public void testDynamicDefaultFilenamePolicy() throws Exception { String[] cElements = Iterables.toArray( StreamSupport.stream( - elements - .stream() + elements.stream() .filter( Predicates.compose(new StartsWith("c"), new ExtractWriteDestination()) ::apply) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/FieldTypeDescriptorsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/FieldTypeDescriptorsTest.java index 131c1779109a..6d26ef570e54 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/FieldTypeDescriptorsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/FieldTypeDescriptorsTest.java @@ -27,6 +27,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; + /** test for {@link FieldTypeDescriptors}. */ public class FieldTypeDescriptorsTest { @Test diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/UnnestTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/UnnestTest.java index 5cdf56ddfafe..462fb05d9b19 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/UnnestTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/UnnestTest.java @@ -34,6 +34,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; + /** Tests for {@link org.apache.beam.sdk.schemas.transforms.Unnest}. */ public class UnnestTest implements Serializable { @Rule public final transient TestPipeline pipeline = TestPipeline.create(); @@ -78,16 +79,14 @@ public void testSimpleUnnesting() { .mapToObj(i -> Row.withSchema(SIMPLE_SCHEMA).addValues(i, Integer.toString(i)).build()) .collect(Collectors.toList()); List rows = - bottomRow - .stream() + bottomRow.stream() .map(r -> Row.withSchema(NESTED_SCHEMA).addValues(r, r).build()) .collect(Collectors.toList()); PCollection unnested = pipeline.apply(Create.of(rows).withRowSchema(NESTED_SCHEMA)).apply(Unnest.create()); assertEquals(UNNESTED_SCHEMA, unnested.getSchema()); List expected = - bottomRow - .stream() + bottomRow.stream() .map( r -> Row.withSchema(UNNESTED_SCHEMA) @@ -116,8 +115,7 @@ public void testAlternateNamePolicy() { .mapToObj(i -> Row.withSchema(SIMPLE_SCHEMA).addValues(i, Integer.toString(i)).build()) .collect(Collectors.toList()); List rows = - bottomRow - .stream() + bottomRow.stream() .map(r -> Row.withSchema(NESTED_SCHEMA2).addValues(r).build()) .collect(Collectors.toList()); PCollection unnested = @@ -126,8 +124,7 @@ public void testAlternateNamePolicy() { .apply(Unnest.create().withFieldNameFunction(Unnest.KEEP_NESTED_NAME)); assertEquals(UNNESTED2_SCHEMA_ALTERNATE, unnested.getSchema()); List expected = - bottomRow - .stream() + bottomRow.stream() .map( r -> Row.withSchema(UNNESTED2_SCHEMA_ALTERNATE) @@ -148,8 +145,7 @@ public void testClashingNamePolicy() { .collect(Collectors.toList()); thrown.expect(IllegalArgumentException.class); List rows = - bottomRow - .stream() + bottomRow.stream() .map(r -> Row.withSchema(NESTED_SCHEMA).addValues(r, r).build()) .collect(Collectors.toList()); PCollection unnested = diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java index e753ba388840..8fd8f859f62e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java @@ -63,8 +63,10 @@ public void testFlatMapSimpleFunction() throws Exception { pipeline .apply(Create.of(1, 2, 3)) - // Note that FlatMapElements takes an InferableFunction> - // so the use of List here (as opposed to Iterable) deliberately exercises + // Note that FlatMapElements takes an InferableFunction> + // so the use of List here (as opposed to Iterable) deliberately + // exercises // the use of an upper bound. .apply( FlatMapElements.via( diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java index b1d00e6c5f98..a36010dbcdf2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java @@ -85,9 +85,7 @@ private void method( assertTrue(signature.isSplittable()); assertTrue( - signature - .extraParameters() - .stream() + signature.extraParameters().stream() .anyMatch( Predicates.instanceOf(DoFnSignature.Parameter.RestrictionTrackerParameter.class) ::apply)); @@ -310,7 +308,8 @@ public CoderT getRestrictionCoder() { DoFnSignature signature = DoFnSignatures.getSignature( new GoodGenericSplittableDoFn< - SomeRestriction, RestrictionTracker, + SomeRestriction, + RestrictionTracker, SomeRestrictionCoder>() {}.getClass()); assertEquals(RestrictionTracker.class, signature.processElement().trackerT().getRawType()); assertTrue(signature.processElement().isSplittable()); diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java index 9de0408efeae..a516b5987865 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java @@ -372,7 +372,8 @@ static String getRegionFromZone(String zone) { Transport.getJsonFactory(), chainHttpRequestInitializer( credentials, - // Do not log 404. It clutters the output and is possibly even required by the caller. + // Do not log 404. It clutters the output and is possibly even required by the + // caller. new RetryHttpRequestInitializer(ImmutableList.of(404)))) .setApplicationName(options.getAppName()) .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/Transport.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/Transport.java index 4e0066067dca..0e1b6bde641f 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/Transport.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/Transport.java @@ -96,7 +96,8 @@ private static ApiComponents apiComponentsFromUrl(String urlString) { getJsonFactory(), chainHttpRequestInitializer( options.getGcpCredential(), - // Do not log the code 404. Code up the stack will deal with 404's if needed, and + // Do not log the code 404. Code up the stack will deal with 404's if needed, + // and // logging it by default clutters the output during file staging. new RetryHttpRequestInitializer( ImmutableList.of(404), new UploadIdResponseInterceptor()))) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java index 51937ad95442..70867508e929 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java @@ -51,7 +51,7 @@ public static void main(String[] args) { PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class); Pipeline p = Pipeline.create(options); - //define the input row format + // define the input row format Schema type = Schema.builder().addInt32Field("c1").addStringField("c2").addDoubleField("c3").build(); @@ -59,7 +59,7 @@ public static void main(String[] args) { Row row2 = Row.withSchema(type).addValues(2, "row", 2.0).build(); Row row3 = Row.withSchema(type).addValues(3, "row", 3.0).build(); - //create a source PCollection with Create.of(); + // create a source PCollection with Create.of(); PCollection inputTable = PBegin.in(p) .apply( @@ -67,7 +67,7 @@ public static void main(String[] args) { .withSchema( type, SerializableFunctions.identity(), SerializableFunctions.identity())); - //Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery; + // Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery; PCollection outputStream = inputTable.apply(SqlTransform.query("select c1, c2, c3 from PCOLLECTION where c1 > 1")); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java index f3327d857347..318d90df15e7 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java @@ -75,10 +75,7 @@ private JdbcConnection(CalciteConnection connection) throws SQLException { * to a map of pipeline options. */ private static Map extractPipelineOptions(CalciteConnection calciteConnection) { - return calciteConnection - .getProperties() - .entrySet() - .stream() + return calciteConnection.getProperties().entrySet().stream() .map(entry -> KV.of(entry.getKey().toString(), entry.getValue().toString())) .filter(kv -> kv.getKey().startsWith(PIPELINE_OPTION_PREFIX)) .map(kv -> KV.of(kv.getKey().substring(PIPELINE_OPTION_PREFIX.length()), kv.getValue())) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java index cbdbbb6aa9c9..70aa89d3d6ee 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java @@ -54,7 +54,7 @@ new FunctionParameter() { @Override public int getOrdinal() { - return 0; //up to one parameter is supported in UDAF. + return 0; // up to one parameter is supported in UDAF. } @Override diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java index 050aeec189d5..419cef17881e 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java @@ -116,8 +116,7 @@ public RelWriter explainTerms(RelWriter pw) { public PTransform, PCollection> buildPTransform() { Schema outputSchema = CalciteUtils.toSchema(getRowType()); List aggregationAdapters = - getNamedAggCalls() - .stream() + getNamedAggCalls().stream() .map(aggCall -> new FieldAggregation(aggCall.getKey(), aggCall.getValue())) .collect(toList()); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java index 3a76a615f461..408922be836f 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java @@ -284,16 +284,16 @@ private static Object fieldToAvatica(Schema.FieldType type, Object beamValue) { case ARRAY: return ((List) beamValue) .stream() - .map(elem -> fieldToAvatica(type.getCollectionElementType(), elem)) - .collect(Collectors.toList()); + .map(elem -> fieldToAvatica(type.getCollectionElementType(), elem)) + .collect(Collectors.toList()); case MAP: return ((Map) beamValue) - .entrySet() - .stream() - .collect( - Collectors.toMap( - entry -> entry.getKey(), - entry -> fieldToAvatica(type.getCollectionElementType(), entry.getValue()))); + .entrySet().stream() + .collect( + Collectors.toMap( + entry -> entry.getKey(), + entry -> + fieldToAvatica(type.getCollectionElementType(), entry.getValue()))); case ROW: // TODO: needs to be a Struct return beamValue; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java index 5f060ec5567a..29e53d1a2e6c 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java @@ -35,8 +35,7 @@ * @return bounded if and only if all PCollection inputs are bounded */ default PCollection.IsBounded isBounded() { - return getPCollectionInputs() - .stream() + return getPCollectionInputs().stream() .allMatch( rel -> BeamSqlRelUtils.getBeamRelInput(rel).isBounded() diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java index 760e37bf1ed2..bdc95724240d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java @@ -157,7 +157,8 @@ public int getCount() { PCollection upstream = pinput.get(0); // There is a need to separate ORDER BY LIMIT and LIMIT: - // - GroupByKey (used in Top) is not allowed on unbounded data in global window so ORDER BY ... LIMIT + // - GroupByKey (used in Top) is not allowed on unbounded data in global window so ORDER BY + // ... LIMIT // works only on bounded data. // - Just LIMIT operates on unbounded data, but across windows. if (fieldIndices.isEmpty()) { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java index d48a6979e92a..9f0fd5547b32 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java @@ -43,8 +43,7 @@ return PCollectionList.empty(pipeline); } else { return PCollectionList.of( - inputRels - .stream() + inputRels.stream() .map(input -> BeamSqlRelUtils.toPCollection(pipeline, (BeamRelNode) input, cache)) .collect(Collectors.toList())); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java index 798f0e6e651c..beb27fc68a1a 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java @@ -92,8 +92,8 @@ public static String beamRow2CsvLine(Row row, CSVFormat csvFormat) { /** * Attempt to cast an object to a specified Schema.Field.Type. - * @throws IllegalArgumentException if the value cannot be cast to that type. * + * @throws IllegalArgumentException if the value cannot be cast to that type. * @return The casted object in Schema.Field.Type. */ public static Object autoCastField(Schema.Field field, Object rawObj) { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java index 735315224616..1b0532c179a5 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java @@ -86,7 +86,8 @@ public void processElement(ProcessContext ctx) { // Say for Row R, there are m instances on left and n instances on right, // INTERSECT ALL outputs MIN(m, n) instances of R. - Iterator iter = (leftCount <= rightCount) ? leftRows.iterator() : rightRows.iterator(); + Iterator iter = + (leftCount <= rightCount) ? leftRows.iterator() : rightRows.iterator(); while (iter.hasNext()) { ctx.output(iter.next()); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamBuiltinFunctionProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamBuiltinFunctionProvider.java index e338ea9cd9d5..195ac9604a48 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamBuiltinFunctionProvider.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamBuiltinFunctionProvider.java @@ -27,8 +27,7 @@ public abstract class BeamBuiltinFunctionProvider { public Map> getBuiltinMethods() { List methods = Arrays.asList(getClass().getMethods()); - return methods - .stream() + return methods.stream() .filter(BeamBuiltinFunctionProvider::isUDF) .collect( Collectors.groupingBy(method -> method.getDeclaredAnnotation(UDF.class).funcName())); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinStringFunctions.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinStringFunctions.java index 0919ea72a549..97d3ffc76d29 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinStringFunctions.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinStringFunctions.java @@ -35,10 +35,9 @@ // return null for boolean is not allowed. // TODO: handle null input. @UDF( - funcName = "ENDS_WITH", - parameterArray = {TypeName.STRING}, - returnType = TypeName.STRING - ) + funcName = "ENDS_WITH", + parameterArray = {TypeName.STRING}, + returnType = TypeName.STRING) public Boolean endsWith(String str1, String str2) { return str1.endsWith(str2); } @@ -47,19 +46,17 @@ public Boolean endsWith(String str1, String str2) { // return null for boolean is not allowed. // TODO: handle null input. @UDF( - funcName = "STARTS_WITH", - parameterArray = {TypeName.STRING}, - returnType = TypeName.STRING - ) + funcName = "STARTS_WITH", + parameterArray = {TypeName.STRING}, + returnType = TypeName.STRING) public Boolean startsWith(String str1, String str2) { return str1.startsWith(str2); } @UDF( - funcName = "LENGTH", - parameterArray = {TypeName.STRING}, - returnType = TypeName.INT64 - ) + funcName = "LENGTH", + parameterArray = {TypeName.STRING}, + returnType = TypeName.INT64) public Long lengthString(String str) { if (str == null) { return null; @@ -68,10 +65,9 @@ public Long lengthString(String str) { } @UDF( - funcName = "LENGTH", - parameterArray = {TypeName.BYTES}, - returnType = TypeName.INT64 - ) + funcName = "LENGTH", + parameterArray = {TypeName.BYTES}, + returnType = TypeName.INT64) public Long lengthBytes(byte[] bytes) { if (bytes == null) { return null; @@ -80,10 +76,9 @@ public Long lengthBytes(byte[] bytes) { } @UDF( - funcName = "REVERSE", - parameterArray = {TypeName.STRING}, - returnType = TypeName.STRING - ) + funcName = "REVERSE", + parameterArray = {TypeName.STRING}, + returnType = TypeName.STRING) public String reverseString(String str) { if (str == null) { return null; @@ -92,10 +87,9 @@ public String reverseString(String str) { } @UDF( - funcName = "REVERSE", - parameterArray = {TypeName.BYTES}, - returnType = TypeName.BYTES - ) + funcName = "REVERSE", + parameterArray = {TypeName.BYTES}, + returnType = TypeName.BYTES) public byte[] reverseBytes(byte[] bytes) { if (bytes == null) { return null; @@ -106,10 +100,9 @@ public String reverseString(String str) { } @UDF( - funcName = "FROM_HEX", - parameterArray = {TypeName.STRING}, - returnType = TypeName.BYTES - ) + funcName = "FROM_HEX", + parameterArray = {TypeName.STRING}, + returnType = TypeName.BYTES) public byte[] fromHex(String str) { if (str == null) { return null; @@ -123,10 +116,9 @@ public String reverseString(String str) { } @UDF( - funcName = "TO_HEX", - parameterArray = {TypeName.BYTES}, - returnType = TypeName.STRING - ) + funcName = "TO_HEX", + parameterArray = {TypeName.BYTES}, + returnType = TypeName.STRING) public String toHex(byte[] bytes) { if (bytes == null) { return null; @@ -136,19 +128,17 @@ public String toHex(byte[] bytes) { } @UDF( - funcName = "LPAD", - parameterArray = {TypeName.STRING, TypeName.INT64}, - returnType = TypeName.STRING - ) + funcName = "LPAD", + parameterArray = {TypeName.STRING, TypeName.INT64}, + returnType = TypeName.STRING) public String lpad(String originalValue, Long returnLength) { return lpad(originalValue, returnLength, " "); } @UDF( - funcName = "LPAD", - parameterArray = {TypeName.STRING, TypeName.INT64, TypeName.STRING}, - returnType = TypeName.STRING - ) + funcName = "LPAD", + parameterArray = {TypeName.STRING, TypeName.INT64, TypeName.STRING}, + returnType = TypeName.STRING) public String lpad(String originalValue, Long returnLength, String pattern) { if (originalValue == null || returnLength == null || pattern == null) { return null; @@ -169,19 +159,17 @@ public String lpad(String originalValue, Long returnLength, String pattern) { } @UDF( - funcName = "LPAD", - parameterArray = {TypeName.BYTES, TypeName.INT64}, - returnType = TypeName.BYTES - ) + funcName = "LPAD", + parameterArray = {TypeName.BYTES, TypeName.INT64}, + returnType = TypeName.BYTES) public byte[] lpad(byte[] originalValue, Long returnLength) { return lpad(originalValue, returnLength, " ".getBytes(UTF_8)); } @UDF( - funcName = "LPAD", - parameterArray = {TypeName.BYTES, TypeName.INT64, TypeName.BYTES}, - returnType = TypeName.BYTES - ) + funcName = "LPAD", + parameterArray = {TypeName.BYTES, TypeName.INT64, TypeName.BYTES}, + returnType = TypeName.BYTES) public byte[] lpad(byte[] originalValue, Long returnLength, byte[] pattern) { if (originalValue == null || returnLength == null || pattern == null) { return null; @@ -214,19 +202,17 @@ public String lpad(String originalValue, Long returnLength, String pattern) { } @UDF( - funcName = "RPAD", - parameterArray = {TypeName.STRING, TypeName.INT64}, - returnType = TypeName.STRING - ) + funcName = "RPAD", + parameterArray = {TypeName.STRING, TypeName.INT64}, + returnType = TypeName.STRING) public String rpad(String originalValue, Long returnLength) { return lpad(originalValue, returnLength, " "); } @UDF( - funcName = "RPAD", - parameterArray = {TypeName.STRING, TypeName.INT64, TypeName.STRING}, - returnType = TypeName.STRING - ) + funcName = "RPAD", + parameterArray = {TypeName.STRING, TypeName.INT64, TypeName.STRING}, + returnType = TypeName.STRING) public String rpad(String originalValue, Long returnLength, String pattern) { if (originalValue == null || returnLength == null || pattern == null) { return null; @@ -247,19 +233,17 @@ public String rpad(String originalValue, Long returnLength, String pattern) { } @UDF( - funcName = "RPAD", - parameterArray = {TypeName.BYTES, TypeName.INT64}, - returnType = TypeName.BYTES - ) + funcName = "RPAD", + parameterArray = {TypeName.BYTES, TypeName.INT64}, + returnType = TypeName.BYTES) public byte[] rpad(byte[] originalValue, Long returnLength) { return lpad(originalValue, returnLength, " ".getBytes(UTF_8)); } @UDF( - funcName = "RPAD", - parameterArray = {TypeName.BYTES, TypeName.INT64, TypeName.BYTES}, - returnType = TypeName.BYTES - ) + funcName = "RPAD", + parameterArray = {TypeName.BYTES, TypeName.INT64, TypeName.BYTES}, + returnType = TypeName.BYTES) public byte[] rpad(byte[] originalValue, Long returnLength, byte[] pattern) { if (originalValue == null || returnLength == null || pattern == null) { return null; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinTrigonometricFunctions.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinTrigonometricFunctions.java index 9049284d4db7..9a5f8ab1a550 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinTrigonometricFunctions.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinTrigonometricFunctions.java @@ -31,10 +31,9 @@ */ // TODO: handle overflow @UDF( - funcName = "COSH", - parameterArray = {Schema.TypeName.DOUBLE}, - returnType = Schema.TypeName.DOUBLE - ) + funcName = "COSH", + parameterArray = {Schema.TypeName.DOUBLE}, + returnType = Schema.TypeName.DOUBLE) public Double cosh(Double o) { if (o == null) { return null; @@ -49,10 +48,9 @@ public Double cosh(Double o) { */ // TODO: handle overflow @UDF( - funcName = "SINH", - parameterArray = {Schema.TypeName.DOUBLE}, - returnType = Schema.TypeName.DOUBLE - ) + funcName = "SINH", + parameterArray = {Schema.TypeName.DOUBLE}, + returnType = Schema.TypeName.DOUBLE) public Double sinh(Double o) { if (o == null) { return null; @@ -66,10 +64,9 @@ public Double sinh(Double o) { *

Computes hyperbolic tangent of X. Does not fail. */ @UDF( - funcName = "TANH", - parameterArray = {Schema.TypeName.DOUBLE}, - returnType = Schema.TypeName.DOUBLE - ) + funcName = "TANH", + parameterArray = {Schema.TypeName.DOUBLE}, + returnType = Schema.TypeName.DOUBLE) public Double tanh(Double o) { if (o == null) { return null; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsInf.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsInf.java index 9e13cc85630f..c441303d28ef 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsInf.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsInf.java @@ -33,19 +33,17 @@ private static final String SQL_FUNCTION_NAME = "IS_INF"; @UDF( - funcName = SQL_FUNCTION_NAME, - parameterArray = {Schema.TypeName.DOUBLE}, - returnType = Schema.TypeName.BOOLEAN - ) + funcName = SQL_FUNCTION_NAME, + parameterArray = {Schema.TypeName.DOUBLE}, + returnType = Schema.TypeName.BOOLEAN) public Boolean isInf(Double value) { return Double.isInfinite(value); } @UDF( - funcName = SQL_FUNCTION_NAME, - parameterArray = {Schema.TypeName.FLOAT}, - returnType = Schema.TypeName.BOOLEAN - ) + funcName = SQL_FUNCTION_NAME, + parameterArray = {Schema.TypeName.FLOAT}, + returnType = Schema.TypeName.BOOLEAN) public Boolean isInf(Float value) { return Float.isInfinite(value); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsNan.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsNan.java index 6bc1d31a2a68..f2bee2c2ccae 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsNan.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsNan.java @@ -32,19 +32,17 @@ private static final String SQL_FUNCTION_NAME = "IS_NAN"; @UDF( - funcName = SQL_FUNCTION_NAME, - parameterArray = {Schema.TypeName.FLOAT}, - returnType = Schema.TypeName.BOOLEAN - ) + funcName = SQL_FUNCTION_NAME, + parameterArray = {Schema.TypeName.FLOAT}, + returnType = Schema.TypeName.BOOLEAN) public Boolean isNan(Float value) { return Float.isNaN(value); } @UDF( - funcName = SQL_FUNCTION_NAME, - parameterArray = {Schema.TypeName.DOUBLE}, - returnType = Schema.TypeName.BOOLEAN - ) + funcName = SQL_FUNCTION_NAME, + parameterArray = {Schema.TypeName.DOUBLE}, + returnType = Schema.TypeName.BOOLEAN) public Boolean isNan(Double value) { return Double.isNaN(value); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java index b227f88967c7..cbc23f0dd013 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java @@ -202,7 +202,7 @@ private static RelDataType toRelDataType( * @return */ public static RelDataType sqlTypeWithAutoCast(RelDataTypeFactory typeFactory, Type rawType) { - //For Joda time types, return SQL type for java.util.Date. + // For Joda time types, return SQL type for java.util.Date. if (rawType instanceof Class && AbstractInstant.class.isAssignableFrom((Class) rawType)) { return typeFactory.createJavaType(Date.class); } else if (rawType instanceof Class && ByteString.class.isAssignableFrom((Class) rawType)) { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow.java index 1bb8dee3ba43..d7703808fc6a 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow.java @@ -95,9 +95,7 @@ public void processElement(ProcessContext context) { * payload, and attributes. */ private List getFieldValues(ProcessContext context) { - return messageSchema() - .getFields() - .stream() + return messageSchema().getFields().stream() .map(field -> getValueForField(field, context.timestamp(), context.element())) .collect(toList()); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java index 146f5cf8fa8c..3689ba831c50 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java @@ -82,9 +82,7 @@ public void dropTable(String tableName) { @Override public Map getTables() { - return tables() - .entrySet() - .stream() + return tables().entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().table)); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableUtils.java index 9ba719332956..d1336002aad5 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableUtils.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableUtils.java @@ -75,8 +75,7 @@ public static Schema buildBeamSqlSchema(Object... args) { * } */ public static List buildRows(Schema type, List rowsValues) { - return Lists.partition(rowsValues, type.getFieldCount()) - .stream() + return Lists.partition(rowsValues, type.getFieldCount()).stream() .map(values -> values.stream().collect(toRow(type))) .collect(toList()); } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java index 3c462952e6ad..28f3f752443d 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java @@ -60,14 +60,14 @@ static List rowsOfBytes; static List rowsOfBytesPaddingTest; - //bounded PCollections + // bounded PCollections protected PCollection boundedInput1; protected PCollection boundedInput2; protected PCollection boundedInputFloatDouble; protected PCollection boundedInputBytes; protected PCollection boundedInputBytesPaddingTest; - //unbounded PCollections + // unbounded PCollections protected PCollection unboundedInput1; protected PCollection unboundedInput2; diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java index 04ae5f17e6b8..f69f3b985a4b 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java @@ -164,9 +164,7 @@ private static SqlOperatorId sqlOperatorId(SqlOperator sqlOperator) { Set declaredOperators = new HashSet<>(); declaredOperators.addAll( - SqlStdOperatorTable.instance() - .getOperatorList() - .stream() + SqlStdOperatorTable.instance().getOperatorList().stream() .map(operator -> sqlOperatorId(operator.getName(), operator.getKind())) .collect(Collectors.toList())); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java index 7582aa4d8f16..a7525d2c61ef 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java @@ -197,8 +197,7 @@ public void testSelectsFromExistingTable() throws Exception { connection.createStatement().executeQuery("SELECT id, name FROM person"); List resultRows = - readResultSet(selectResult) - .stream() + readResultSet(selectResult).stream() .map(values -> values.stream().collect(toRow(BASIC_SCHEMA))) .collect(Collectors.toList()); @@ -322,8 +321,7 @@ public void testSelectsFromExistingComplexTable() throws Exception { .executeQuery("SELECT person.nestedRow.id, person.nestedRow.name FROM person"); List resultRows = - readResultSet(selectResult) - .stream() + readResultSet(selectResult).stream() .map(values -> values.stream().collect(toRow(BASIC_SCHEMA))) .collect(Collectors.toList()); @@ -350,8 +348,7 @@ public void testInsertIntoCreatedTable() throws Exception { connection.createStatement().executeQuery("SELECT id, name FROM person"); List resultRows = - readResultSet(selectResult) - .stream() + readResultSet(selectResult).stream() .map(resultValues -> resultValues.stream().collect(toRow(BASIC_SCHEMA))) .collect(Collectors.toList()); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLNestedTypesTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLNestedTypesTest.java index acc7633fa4f6..39b5f0df014b 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLNestedTypesTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLNestedTypesTest.java @@ -119,10 +119,7 @@ private String unparseMap(FieldType fieldType) { private String unparseRow(FieldType fieldType) { return "ROW<" - + fieldType - .getRowSchema() - .getFields() - .stream() + + fieldType.getRowSchema().getFields().stream() .map(field -> field.getName() + " " + unparse(field.getType())) .collect(joining(",")) + ">"; diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java index 7eea6ece2896..02349a558c43 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java @@ -45,9 +45,7 @@ public void setUp() { Map calciteRowTypeFields(Schema schema) { final RelDataType dataType = CalciteUtils.toCalciteRowType(schema, dataTypeFactory); - return dataType - .getFieldNames() - .stream() + return dataType.getFieldNames().stream() .collect( Collectors.toMap( x -> x, diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java index 062700ba0fe6..e8fa1358600d 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java @@ -340,9 +340,8 @@ private CalciteConnection connect(PipelineOptions options, TableProvider... tabl // The actual options are in the "options" field of the converted map Map argsMap = ((Map) MAPPER.convertValue(pipeline.getOptions(), Map.class).get("options")) - .entrySet() - .stream() - .collect(Collectors.toMap(Map.Entry::getKey, entry -> toArg(entry.getValue()))); + .entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> toArg(entry.getValue()))); InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore(); for (TableProvider tableProvider : tableProviders) { diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java index ef5c40eaf7b9..28fc4a406c46 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java @@ -71,7 +71,9 @@ static class Factory extends DoFnPTransformRunnerFactory< - KV, InputT, OutputT, + KV, + InputT, + OutputT, SplittableProcessElementsRunner> { @Override diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClientTest.java index d52ad469ef01..32797fa9da3e 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClientTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClientTest.java @@ -246,7 +246,7 @@ public void testBasicInboundConsumerBehaviour() throws Exception { @Test(timeout = 10000) public void testBundleProcessorThrowsExecutionExceptionWhenUserCodeThrows() throws Exception { CountDownLatch waitForClientToConnect = new CountDownLatch(1); - //Collection> inboundValuesA = new ConcurrentLinkedQueue<>(); + // Collection> inboundValuesA = new ConcurrentLinkedQueue<>(); Collection> inboundValuesB = new ConcurrentLinkedQueue<>(); Collection inboundServerValues = new ConcurrentLinkedQueue<>(); AtomicReference> outboundServerObserver = diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java index 908e31c77f36..a1ac65ecec4a 100644 --- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java +++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java @@ -241,10 +241,9 @@ public SSEAwsKeyManagementParams deserialize(JsonParser parser, DeserializationC } @JsonAutoDetect( - fieldVisibility = Visibility.NONE, - getterVisibility = Visibility.NONE, - setterVisibility = Visibility.NONE - ) + fieldVisibility = Visibility.NONE, + getterVisibility = Visibility.NONE, + setterVisibility = Visibility.NONE) interface ClientConfigurationMixin { @JsonProperty String getProxyHost(); diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java index 50a10a357a3c..1276a79c6ddf 100644 --- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java +++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java @@ -596,8 +596,7 @@ protected void rename( @Override protected void delete(Collection resourceIds) throws IOException { List nonDirectoryPaths = - resourceIds - .stream() + resourceIds.stream() .filter(s3ResourceId -> !s3ResourceId.isDirectory()) .collect(Collectors.toList()); Multimap keysByBucket = ArrayListMultimap.create(); diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/SnsIO.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/SnsIO.java index df706082d7db..449dd1e3af16 100644 --- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/SnsIO.java +++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/SnsIO.java @@ -82,7 +82,7 @@ @Experimental(Experimental.Kind.SOURCE_SINK) public final class SnsIO { - //Write data tp SNS + // Write data tp SNS public static Write write() { return new AutoValue_SnsIO_Write.Builder().build(); } @@ -283,7 +283,7 @@ public PCollectionTuple expand(PCollection input) { @Setup public void setup() throws Exception { - //Initialize SnsPublisher + // Initialize SnsPublisher producer = spec.getAWSClientsProvider().createSnsPublisher(); checkArgument( topicExists(producer, spec.getTopicName()), diff --git a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java index db637a4a2afe..46640e39ab96 100644 --- a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java +++ b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java @@ -696,19 +696,19 @@ public void testWriteAndRead() throws IOException { ByteBuffer bb = ByteBuffer.allocate(writtenArray.length); bb.put(writtenArray); - //First create an object and write data to it + // First create an object and write data to it S3ResourceId path = S3ResourceId.fromUri("s3://testbucket/foo/bar.txt"); WritableByteChannel writableByteChannel = s3FileSystem.create(path, builder().setMimeType("application/text").build()); writableByteChannel.write(bb); writableByteChannel.close(); - //Now read the same object + // Now read the same object ByteBuffer bb2 = ByteBuffer.allocate(writtenArray.length); ReadableByteChannel open = s3FileSystem.open(path); open.read(bb2); - //And compare the content with the one that was written + // And compare the content with the one that was written byte[] readArray = bb2.array(); assertArrayEquals(readArray, writtenArray); open.close(); diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java index e648112820a1..ee31292fe4a9 100644 --- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java +++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java @@ -214,21 +214,14 @@ static long getEstimatedSizeBytes(List tokenRanges) { SplitGenerator splitGenerator = new SplitGenerator(cluster.getMetadata().getPartitioner()); List tokens = - cluster - .getMetadata() - .getTokenRanges() - .stream() + cluster.getMetadata().getTokenRanges().stream() .map(tokenRange -> new BigInteger(tokenRange.getEnd().getValue().toString())) .collect(Collectors.toList()); List> splits = splitGenerator.generateSplits(numSplits, tokens); LOG.info("{} splits were actually generated", splits.size()); final String partitionKey = - cluster - .getMetadata() - .getKeyspace(spec.keyspace()) - .getTable(spec.table()) - .getPartitionKey() + cluster.getMetadata().getKeyspace(spec.keyspace()).getTable(spec.table()).getPartitionKey() .stream() .map(ColumnMetadata::getName) .collect(Collectors.joining(",")); diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java index 0eaca4f58121..da5e54dd0389 100644 --- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java +++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java @@ -366,9 +366,7 @@ private static void set(Properties properties, String param, Object value) { @VisibleForTesting static String insertSql(TableSchema schema, String table) { String columnsStr = - schema - .columns() - .stream() + schema.columns().stream() .filter(x -> !x.materializedOrAlias()) .map(x -> quoteIdentifier(x.name())) .collect(Collectors.joining(", ")); diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java index b8b1e3bbb651..75692d2859b4 100644 --- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java +++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java @@ -45,9 +45,7 @@ public static TableSchema of(Column... columns) { * @return Beam schema */ public static Schema getEquivalentSchema(TableSchema tableSchema) { - return tableSchema - .columns() - .stream() + return tableSchema.columns().stream() .map( x -> { if (x.columnType().nullable()) { diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java index 55788b007bca..9a6da467928b 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -53,7 +53,7 @@ private static Node node; private static RestClient restClient; private static ConnectionConfiguration connectionConfiguration; - //cannot use inheritance because ES5 test already extends ESIntegTestCase. + // cannot use inheritance because ES5 test already extends ESIntegTestCase. private static ElasticsearchIOTestCommon elasticsearchIOTestCommon; @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java index 24748b9e4637..806b93e307d0 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -73,7 +73,7 @@ protected Settings nodeSettings(int nodeOrdinal) { public Settings indexSettings() { return Settings.builder() .put(super.indexSettings()) - //useful to have updated sizes for getEstimatedSize + // useful to have updated sizes for getEstimatedSize .put("index.store.stats_refresh_interval", 0) .build(); } diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java index d731f339424a..0579cfa115c0 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -73,7 +73,7 @@ protected Settings nodeSettings(int nodeOrdinal) { public Settings indexSettings() { return Settings.builder() .put(super.indexSettings()) - //useful to have updated sizes for getEstimatedSize + // useful to have updated sizes for getEstimatedSize .put("index.store.stats_refresh_interval", 0) .build(); } diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java index 579c968adf46..129619eb8521 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java @@ -185,9 +185,9 @@ void testRead() throws Exception { pipeline.apply( ElasticsearchIO.read() .withConnectionConfiguration(connectionConfiguration) - //set to default value, useful just to test parameter passing. + // set to default value, useful just to test parameter passing. .withScrollKeepalive("5m") - //set to default value, useful just to test parameter passing. + // set to default value, useful just to test parameter passing. .withBatchSize(100L)); PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(numDocs); pipeline.run(); @@ -594,7 +594,8 @@ void testDefaultRetryPredicate(RestClient restClient) throws IOException { */ void testWriteRetry() throws Throwable { expectedException.expectCause(isA(IOException.class)); - // max attempt is 3, but retry is 2 which excludes 1st attempt when error was identified and retry started. + // max attempt is 3, but retry is 2 which excludes 1st attempt when error was identified and + // retry started. expectedException.expectMessage( String.format(ElasticsearchIO.Write.WriteFn.RETRY_FAILED_LOG, EXPECTED_RETRIES)); diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index 7c30563709ee..9956684c269b 100644 --- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -188,7 +188,7 @@ static void checkForErrors(HttpEntity responseEntity, int backendVersion) throws StringBuilder errorMessages = new StringBuilder("Error writing to Elasticsearch, some elements could not be inserted:"); JsonNode items = searchResult.path("items"); - //some items present in bulk might have errors, concatenate error messages + // some items present in bulk might have errors, concatenate error messages for (JsonNode item : items) { String errorRootName = ""; @@ -572,7 +572,7 @@ public void populateDisplayData(DisplayData.Builder builder) { @Nullable private final Integer numSlices; @Nullable private final Integer sliceId; - //constructor used in split() when we know the backend version + // constructor used in split() when we know the backend version private BoundedElasticsearchSource( Read spec, @Nullable String shardPreference, @@ -727,7 +727,7 @@ public boolean start() throws IOException { if ((source.backendVersion == 5 || source.backendVersion == 6) && source.numSlices != null && source.numSlices > 1) { - //if there is more than one slice, add the slice to the user query + // if there is more than one slice, add the slice to the user query String sliceQuery = String.format("\"slice\": {\"id\": %s,\"max\": %s}", source.sliceId, source.numSlices); query = query.replaceFirst("\\{", "{" + sliceQuery + ","); @@ -777,7 +777,7 @@ public boolean advance() throws IOException { } private boolean readNextBatchAndReturnFirstDocument(JsonNode searchResult) { - //stop if no more data + // stop if no more data JsonNode hits = searchResult.path("hits").path("hits"); if (hits.size() == 0) { current = null; @@ -1300,12 +1300,12 @@ private HttpEntity handleRetry( Sleeper sleeper = Sleeper.DEFAULT; BackOff backoff = retryBackoff.backoff(); int attempt = 0; - //while retry policy exists + // while retry policy exists while (BackOffUtils.next(sleeper, backoff)) { LOG.warn(String.format(RETRY_ATTEMPT_LOG, ++attempt)); response = restClient.performRequest(method, endpoint, params, requestBody); responseEntity = new BufferedHttpEntity(response.getEntity()); - //if response has no 429 errors + // if response has no 429 errors if (!spec.getRetryConfiguration().getRetryPredicate().test(responseEntity)) { return responseEntity; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index b4dba1b78bb9..54378fcf3636 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -1604,8 +1604,7 @@ public WriteResult expand(PCollection input) { checkArgument( 1 == Iterables.size( - allToArgs - .stream() + allToArgs.stream() .filter(Predicates.notNull()::apply) .collect(Collectors.toList())), "Exactly one of jsonTableRef, tableFunction, or " + "dynamicDestinations must be set"); @@ -1615,8 +1614,7 @@ public WriteResult expand(PCollection input) { checkArgument( 2 > Iterables.size( - allSchemaArgs - .stream() + allSchemaArgs.stream() .filter(Predicates.notNull()::apply) .collect(Collectors.toList())), "No more than one of jsonSchema, schemaFromView, or dynamicDestinations may " + "be set"); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 00d5b570b824..a30465e61b9d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -914,7 +914,8 @@ private static boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws Inte Transport.getJsonFactory(), chainHttpRequestInitializer( options.getGcpCredential(), - // Do not log 404. It clutters the output and is possibly even required by the caller. + // Do not log 404. It clutters the output and is possibly even required by the + // caller. httpRequestInitializer)) .setApplicationName(options.getAppName()) .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index e8fcf210a20b..b6d8b748c16e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -263,9 +263,7 @@ public static Row toBeamRow(Schema rowSchema, TableSchema bqSchema, TableRow jso .collect(toMap(i -> bqFields.get(i).getName(), i -> i)); List rawJsonValues = - rowSchema - .getFields() - .stream() + rowSchema.getFields().stream() .map(field -> bqFieldIndices.get(field.getName())) .map(index -> jsonBqRow.getF().get(index).getV()) .collect(toList()); @@ -284,9 +282,9 @@ private static Object toBeamValue(FieldType fieldType, Object jsonBQValue) { if (jsonBQValue instanceof List) { return ((List) jsonBQValue) .stream() - .map(v -> ((Map) v).get("v")) - .map(v -> toBeamValue(fieldType.getCollectionElementType(), v)) - .collect(toList()); + .map(v -> ((Map) v).get("v")) + .map(v -> toBeamValue(fieldType.getCollectionElementType(), v)) + .collect(toList()); } throw new UnsupportedOperationException( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQuery.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQuery.java index d96e8b3d0810..ed10c21ccb9d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQuery.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQuery.java @@ -228,8 +228,7 @@ private void pollAndAssert( return Collections.emptyList(); } - return bqRows - .stream() + return bqRows.stream() .map(bqRow -> toBeamRow(rowSchema, bqSchema, bqRow)) .collect(Collectors.toList()); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java index 633355d8b24d..9277055ace31 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java @@ -75,7 +75,8 @@ public PubsubClient newClient( Transport.getJsonFactory(), chainHttpRequestInitializer( options.getGcpCredential(), - // Do not log 404. It clutters the output and is possibly even required by the caller. + // Do not log 404. It clutters the output and is possibly even required by the + // caller. new RetryHttpRequestInitializer(ImmutableList.of(404)))) .setRootUrl(options.getPubsubRootUrl()) .setApplicationName(options.getAppName()) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java index ead5cb555b14..d09745dcdfdd 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java @@ -672,7 +672,8 @@ public void testInsertOtherRetry() throws Throwable { List> rows = new ArrayList<>(); rows.add(wrapValue(new TableRow())); - // First response is 403 non-{rate-limited, quota-exceeded}, second response has valid payload but should not + // First response is 403 non-{rate-limited, quota-exceeded}, second response has valid payload + // but should not // be invoked. when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); when(response.getStatusCode()).thenReturn(403).thenReturn(200); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java index d976366ac4b0..8d6dcff19d04 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java @@ -149,9 +149,7 @@ private void verifyLegacyQueryRes(String outputTable) throws Exception { BQ_CLIENT.queryWithRetries(String.format("SELECT fruit from [%s];", outputTable), project); LOG.info("Finished to query result table {}", outputTable); List tableResult = - response - .getRows() - .stream() + response.getRows().stream() .flatMap(row -> row.getF().stream().map(cell -> cell.getV().toString())) .sorted() .collect(Collectors.toList()); @@ -171,9 +169,7 @@ private void verifyNewTypesQueryRes(String outputTable) throws Exception { String.format("SELECT bytes, date, time FROM [%s];", outputTable), project); LOG.info("Finished to query result table {}", outputTable); List tableResult = - response - .getRows() - .stream() + response.getRows().stream() .map( row -> { String res = ""; @@ -234,13 +230,13 @@ private void verifyStandardQueryRes(String outputTable) throws Exception { @BeforeClass public static void setupTestEnvironment() throws Exception { PipelineOptionsFactory.register(BigQueryToTableOptions.class); - project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(); + project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(); // Create one BQ dataset for all test cases. BQ_CLIENT.createNewDataset(project, BIG_QUERY_DATASET_ID); // Create table and insert data for new type query test cases. BQ_CLIENT.createNewTable( - project, + project, BIG_QUERY_DATASET_ID, new Table() .setSchema(BigQueryToTableIT.NEW_TYPES_QUERY_TABLE_SCHEMA) @@ -265,7 +261,7 @@ public static void cleanup() { @Test public void testLegacyQueryWithoutReshuffle() throws Exception { final String outputTable = - project + ":" + BIG_QUERY_DATASET_ID + "." + "testLegacyQueryWithoutReshuffle"; + project + ":" + BIG_QUERY_DATASET_ID + "." + "testLegacyQueryWithoutReshuffle"; this.runBigQueryToTablePipeline(setupLegacyQueryTest(outputTable)); @@ -275,7 +271,7 @@ public void testLegacyQueryWithoutReshuffle() throws Exception { @Test public void testNewTypesQueryWithoutReshuffle() throws Exception { final String outputTable = - project + ":" + BIG_QUERY_DATASET_ID + "." + "testNewTypesQueryWithoutReshuffle"; + project + ":" + BIG_QUERY_DATASET_ID + "." + "testNewTypesQueryWithoutReshuffle"; this.runBigQueryToTablePipeline(setupNewTypesQueryTest(outputTable)); @@ -285,7 +281,7 @@ public void testNewTypesQueryWithoutReshuffle() throws Exception { @Test public void testNewTypesQueryWithReshuffle() throws Exception { final String outputTable = - project + ":" + BIG_QUERY_DATASET_ID + "." + "testNewTypesQueryWithReshuffle"; + project + ":" + BIG_QUERY_DATASET_ID + "." + "testNewTypesQueryWithReshuffle"; BigQueryToTableOptions options = setupNewTypesQueryTest(outputTable); options.setReshuffle(true); @@ -297,7 +293,7 @@ public void testNewTypesQueryWithReshuffle() throws Exception { @Test public void testStandardQueryWithoutCustom() throws Exception { final String outputTable = - project + ":" + BIG_QUERY_DATASET_ID + "." + "testStandardQueryWithoutCustom"; + project + ":" + BIG_QUERY_DATASET_ID + "." + "testStandardQueryWithoutCustom"; this.runBigQueryToTablePipeline(setupStandardQueryTest(outputTable)); @@ -308,7 +304,7 @@ public void testStandardQueryWithoutCustom() throws Exception { @Category(DataflowPortabilityApiUnsupported.class) public void testNewTypesQueryWithoutReshuffleWithCustom() throws Exception { final String outputTable = - project + ":" + BIG_QUERY_DATASET_ID + "." + "testNewTypesQueryWithoutReshuffleWithCustom"; + project + ":" + BIG_QUERY_DATASET_ID + "." + "testNewTypesQueryWithoutReshuffleWithCustom"; BigQueryToTableOptions options = this.setupNewTypesQueryTest(outputTable); options.setExperiments( ImmutableList.of("enable_custom_bigquery_sink", "enable_custom_bigquery_source")); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java index 8924733795fc..e995e7af969e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java @@ -581,8 +581,7 @@ public void testReadingWithFilter() throws Exception { String regex = ".*17.*"; final KeyMatchesRegex keyPredicate = new KeyMatchesRegex(regex); Iterable filteredRows = - testRows - .stream() + testRows.stream() .filter( input -> { verifyNotNull(input, "input"); @@ -710,7 +709,7 @@ public void testReduceSplitsWithSomeNonAdjacentRanges() throws Exception { makeTableData(table, numRows); service.setupSampleRowKeys(table, numSamples, bytesPerRow); - //Construct few non contiguous key ranges [..1][1..2][3..4][4..5][6..7][8..9] + // Construct few non contiguous key ranges [..1][1..2][3..4][4..5][6..7][8..9] List keyRanges = Arrays.asList( ByteKeyRange.of(ByteKey.EMPTY, createByteKey(1)), @@ -720,7 +719,7 @@ public void testReduceSplitsWithSomeNonAdjacentRanges() throws Exception { ByteKeyRange.of(createByteKey(6), createByteKey(7)), ByteKeyRange.of(createByteKey(8), createByteKey(9))); - //Expected ranges after split and reduction by maxSplitCount is [..2][3..5][6..7][8..9] + // Expected ranges after split and reduction by maxSplitCount is [..2][3..5][6..7][8..9] List expectedKeyRangesAfterReducedSplits = Arrays.asList( ByteKeyRange.of(ByteKey.EMPTY, createByteKey(2)), @@ -770,7 +769,7 @@ public void testReduceSplitsWithAllNonAdjacentRange() throws Exception { makeTableData(table, numRows); service.setupSampleRowKeys(table, numSamples, bytesPerRow); - //Construct non contiguous key ranges [..1][2..3][4..5][6..7][8..9] + // Construct non contiguous key ranges [..1][2..3][4..5][6..7][8..9] List keyRanges = Arrays.asList( ByteKeyRange.of(ByteKey.EMPTY, createByteKey(1)), @@ -846,8 +845,8 @@ public void tesReduceSplitsWithAdjacentRanges() throws Exception { splits.add(source.withSingleRange(range)); } - //Splits Source have ranges [..1][1..2][2..3][3..4][4..5][5..6][6..7][7..8][8..9][9..] - //expected reduced Split source ranges are [..4][4..8][8..] + // Splits Source have ranges [..1][1..2][2..3][3..4][4..5][5..6][6..7][7..8][8..9][9..] + // expected reduced Split source ranges are [..4][4..8][8..] List expectedKeyRangesAfterReducedSplits = Arrays.asList( ByteKeyRange.of(ByteKey.EMPTY, createByteKey(4)), diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java index ed885555bd1f..a3ed49deeb77 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java @@ -233,7 +233,7 @@ static long countEntities(V1TestOptions options, String project, String ancestor private static final int DATASTORE_BATCH_UPDATE_LIMIT = 500; // Number of times to retry on update failure private static final int MAX_RETRIES = 5; - //Initial backoff time for exponential backoff for retry attempts. + // Initial backoff time for exponential backoff for retry attempts. private static final Duration INITIAL_BACKOFF = Duration.standardSeconds(5); // Returns true if a Datastore key is complete. A key is complete if its last element diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java index 13de3b51355c..6fe51d6a95d7 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java @@ -354,8 +354,7 @@ public void evaluate() throws Throwable { private void setupTestClient(List inputs, Coder coder) { List messages = - inputs - .stream() + inputs.stream() .map( t -> { try { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java index 3b92f140cf0a..5b4a7262a8b3 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java @@ -530,9 +530,7 @@ public void testGatherBundleAndSortFn() throws Exception { // Verify sorted output... first decode it... List sorted = - byteArrayKvListCaptor - .getValue() - .stream() + byteArrayKvListCaptor.getValue().stream() .map(kv -> WriteGrouped.decode(kv.getValue())) .collect(Collectors.toList()); assertThat( @@ -586,8 +584,7 @@ public void testGatherBundleAndSortFn_flushOversizedBundle() throws Exception { // decode list of lists of KV to a list of lists of MutationGroup. List> mgListGroups = - kvGroups - .stream() + kvGroups.stream() .map( l -> l.stream() @@ -625,8 +622,7 @@ public void testBatchFn_cells() throws Exception { g(m(2L))); List> encodedInput = - mutationGroups - .stream() + mutationGroups.stream() .map(mg -> KV.of((byte[]) null, WriteGrouped.encode(mg))) .collect(Collectors.toList()); @@ -671,8 +667,7 @@ public void testBatchFn_size() throws Exception { g(m(2L))); List> encodedInput = - mutationGroups - .stream() + mutationGroups.stream() .map(mg -> KV.of((byte[]) null, WriteGrouped.encode(mg))) .collect(Collectors.toList()); diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java index a85cfd08b730..0bd556f092f8 100644 --- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java +++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java @@ -190,7 +190,8 @@ protected void rename( boolean success = fileSystem.rename(src, dest); // If the failure was due to the file already existing, delete and retry (BEAM-5036). - // This should be the exceptional case, so handle here rather than incur the overhead of testing first + // This should be the exceptional case, so handle here rather than incur the overhead of + // testing first if (!success && fileSystem.exists(src) && fileSystem.exists(dest)) { LOG.debug( String.format(LOG_DELETING_EXISTING_FILE, Path.getPathWithoutSchemeAndAuthority(dest))); diff --git a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java index e5a7bf37751a..5a73ea12416c 100644 --- a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java +++ b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java @@ -152,7 +152,7 @@ private boolean tryCreateFile(Configuration conf, Path path) { } catch (FileAlreadyExistsException | org.apache.hadoop.fs.FileAlreadyExistsException e) { return false; } catch (RemoteException e) { - //remote hdfs exception + // remote hdfs exception if (e.getClassName().equals(AlreadyBeingCreatedException.class.getName())) { return false; } diff --git a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java index 5cce85abd535..1d7fc32c203f 100644 --- a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java +++ b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java @@ -641,8 +641,7 @@ public void populateDisplayData(DisplayData.Builder builder) { "Generated {} splits. Size of first split is {} ", inputSplits.size(), inputSplits.get(0).getSplit().getLength()); - return inputSplits - .stream() + return inputSplits.stream() .map( serializableInputSplit -> { return new HadoopInputFormatBoundedSource<>( diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java index 04eadeaaf468..4b88e32ece6b 100644 --- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java +++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java @@ -177,7 +177,7 @@ private static void createCassandraData() { @BeforeClass public static void startCassandra() throws Exception { - //Start the Embedded Cassandra Service + // Start the Embedded Cassandra Service cassandra.start(); final SocketOptions socketOptions = new SocketOptions(); // Setting this to 0 disables read timeouts. diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOSequenceFileTest.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOSequenceFileTest.java index 1a19dd937a5d..509cc46c8980 100644 --- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOSequenceFileTest.java +++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOSequenceFileTest.java @@ -93,8 +93,7 @@ KV.of(new Text(element.getKey()), new LongWritable(element.getValue())); private static Map computeWordCounts(List sentences) { - return sentences - .stream() + return sentences.stream() .flatMap(s -> Stream.of(s.split("\\W+"))) .map(String::toLowerCase) .collect(Collectors.toMap(Function.identity(), s -> 1L, Long::sum)); @@ -298,8 +297,7 @@ public void streamTest() { } private Map loadWrittenDataAsMap(String outputDirPath) { - return loadWrittenData(outputDirPath) - .stream() + return loadWrittenData(outputDirPath).stream() .collect( Collectors.toMap( kv -> kv.getKey().toString(), diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOTestOptions.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOTestOptions.java index a40ab46e8871..c938b7df7343 100644 --- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOTestOptions.java +++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOTestOptions.java @@ -24,7 +24,7 @@ /** Properties needed when using HadoopFormatIO with the Beam SDK. */ public interface HadoopFormatIOTestOptions extends TestPipelineOptions { - //Cassandra test options + // Cassandra test options @Description("Cassandra Server IP") @Default.String("cassandraServerIp") String getCassandraServerIp(); @@ -49,7 +49,7 @@ void setCassandraPassword(String cassandraPassword); - //Elasticsearch test options + // Elasticsearch test options @Description("Elasticsearch Server IP") @Default.String("elasticServerIp") String getElasticServerIp(); diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestEmployeeDataSet.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestEmployeeDataSet.java index 9d62f481add8..98db521b5324 100644 --- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestEmployeeDataSet.java +++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestEmployeeDataSet.java @@ -68,11 +68,12 @@ public static List> getEmployeeData() { return (data.isEmpty() ? populateEmployeeData() : data) .stream() - .map( - input -> { - List empData = Splitter.on('_').splitToList(input.getValue()); - return KV.of(new Text(input.getKey()), new Employee(empData.get(0), empData.get(1))); - }) - .collect(Collectors.toList()); + .map( + input -> { + List empData = Splitter.on('_').splitToList(input.getValue()); + return KV.of( + new Text(input.getKey()), new Employee(empData.get(0), empData.get(1))); + }) + .collect(Collectors.toList()); } } diff --git a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java index f2a1eae936d4..02732611b7d7 100644 --- a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java +++ b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java @@ -175,7 +175,7 @@ private static void createCassandraData() { @BeforeClass public static void startCassandra() throws Exception { - //Start the Embedded Cassandra Service + // Start the Embedded Cassandra Service cassandra.start(); final SocketOptions socketOptions = new SocketOptions(); // Setting this to 0 disables read timeouts. diff --git a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFITestOptions.java b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFITestOptions.java index 7c571b10df67..ade544fc22bd 100644 --- a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFITestOptions.java +++ b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFITestOptions.java @@ -24,7 +24,7 @@ /** Properties needed when using HadoopInputFormatIO with the Beam SDK. */ public interface HIFITestOptions extends TestPipelineOptions { - //Cassandra test options + // Cassandra test options @Description("Cassandra Server IP") @Default.String("cassandraServerIp") String getCassandraServerIp(); @@ -49,7 +49,7 @@ void setCassandraPassword(String cassandraPassword); - //Elasticsearch test options + // Elasticsearch test options @Description("Elasticsearch Server IP") @Default.String("elasticServerIp") String getElasticServerIp(); diff --git a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java index 65aa5ee12ceb..6f91328f5d13 100644 --- a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java +++ b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java @@ -68,11 +68,12 @@ public static List> getEmployeeData() { return (data.isEmpty() ? populateEmployeeData() : data) .stream() - .map( - input -> { - List empData = Splitter.on('_').splitToList(input.getValue()); - return KV.of(new Text(input.getKey()), new Employee(empData.get(0), empData.get(1))); - }) - .collect(Collectors.toList()); + .map( + input -> { + List empData = Splitter.on('_').splitToList(input.getValue()); + return KV.of( + new Text(input.getKey()), new Employee(empData.get(0), empData.get(1))); + }) + .collect(Collectors.toList()); } } diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java index d2ff9b9a3c43..c742185ca5ab 100644 --- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java +++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java @@ -276,8 +276,8 @@ public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Except desiredSplitCount = (int) Math.ceil((double) estimatedSizeBytes / desiredBundleSizeBytes); } ReaderContext readerContext = getReaderContext(desiredSplitCount); - //process the splits returned by native API - //this could be different from 'desiredSplitCount' calculated above + // process the splits returned by native API + // this could be different from 'desiredSplitCount' calculated above LOG.info( "Splitting into bundles of {} bytes: " + "estimated size {}, desired split count {}, actual split count {}", @@ -486,7 +486,7 @@ private void flush() throws HCatException { masterWriter.commit(writerContext); } catch (HCatException e) { LOG.error("Exception in flush - write/commit data to Hive", e); - //abort on exception + // abort on exception masterWriter.abort(writerContext); throw e; } finally { diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HiveDatabaseTestHelper.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HiveDatabaseTestHelper.java index 5121e39b3f07..fb83c0060f49 100644 --- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HiveDatabaseTestHelper.java +++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HiveDatabaseTestHelper.java @@ -22,6 +22,7 @@ import java.sql.SQLException; import java.sql.Statement; import org.apache.beam.sdk.io.common.DatabaseTestHelper; + /** Helper for creating connection and test tables on hive database via JDBC driver. */ class HiveDatabaseTestHelper { private static Connection con; With regards, Apache Git Services