beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] incubator-beam git commit: Change PAssert's dummy inputs from (Void) null to integer 0
Date Fri, 13 May 2016 01:51:43 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 6721bd584 -> 93a5d390b


Change PAssert's dummy inputs from (Void) null to integer 0


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9a5503db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9a5503db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9a5503db

Branch: refs/heads/master
Commit: 9a5503db954eccfe0215ee473417bfafb495b61e
Parents: 6721bd5
Author: Kenneth Knowles <klk@google.com>
Authored: Fri May 6 11:19:33 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Thu May 12 17:45:43 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/testing/PAssert.java | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a5503db/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index 1265acd..c2cd598 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.MapCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Aggregator;
@@ -593,7 +593,7 @@ public class PAssert {
       final PCollectionView<ActualT> actual = input.apply("CreateActual", createActual);
 
       input
-          .apply(Create.<Void>of((Void) null).withCoder(VoidCoder.of()))
+          .apply(Create.of(0).withCoder(VarIntCoder.of()))
           .apply(ParDo.named("RunChecks").withSideInputs(actual)
               .of(new CheckerDoFn<>(checkerFn, actual)));
 
@@ -604,8 +604,11 @@ public class PAssert {
   /**
    * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
    * a {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing.
+   *
+   * <p>The input is ignored, but is {@link Integer} to be usable on runners that do
not support
+   * null values.
    */
-  private static class CheckerDoFn<ActualT> extends DoFn<Void, Void> {
+  private static class CheckerDoFn<ActualT> extends DoFn<Integer, Void> {
     private final SerializableFunction<ActualT, Void> checkerFn;
     private final Aggregator<Integer, Integer> success =
         createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
@@ -669,14 +672,17 @@ public class PAssert {
       final PCollectionView<ExpectedT> expected = input.apply("CreateExpected", createExpected);
 
       input
-          .apply(Create.<Void>of((Void) null).withCoder(VoidCoder.of()))
-          .apply(ParDo.named("RunChecks").withSideInputs(actual, expected)
+          .apply(Create.of(0).withCoder(VarIntCoder.of()))
+          .apply("RunChecks", ParDo.withSideInputs(actual, expected)
               .of(new CheckerDoFn<>(relation, actual, expected)));
 
       return PDone.in(input.getPipeline());
     }
 
-    private static class CheckerDoFn<ActualT, ExpectedT> extends DoFn<Void, Void>
{
+    /**
+     * Input is ignored, but is {@link Integer} for runners that do not support null values.
+     */
+    private static class CheckerDoFn<ActualT, ExpectedT> extends DoFn<Integer, Void>
{
       private final Aggregator<Integer, Integer> success =
           createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
       private final Aggregator<Integer, Integer> failure =


Mime
View raw message