beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] incubator-beam git commit: Port ParDoTest from OldDoFn to new DoFn
Date Wed, 07 Dec 2016 17:00:34 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 1526184ae -> ae52ec1bc


Port ParDoTest from OldDoFn to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8e1e46e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8e1e46e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8e1e46e7

Branch: refs/heads/master
Commit: 8e1e46e73edf9cce376ed7bd194db00edc3e60b4
Parents: 1526184
Author: Kenneth Knowles <klk@google.com>
Authored: Tue Dec 6 21:01:37 2016 -0800
Committer: Kenneth Knowles <klk@google.com>
Committed: Wed Dec 7 09:00:17 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/ParDoTest.java   | 238 +++++++------------
 1 file changed, 91 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8e1e46e7/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 593f304..9755076 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -111,74 +111,9 @@ public class ParDoTest implements Serializable {
           + ":" + window.maxTimestamp().getMillis());
     }
   }
-
-  static class TestOldDoFn extends OldDoFn<Integer, String> {
-    enum State { UNSTARTED, STARTED, PROCESSING, FINISHED }
-    State state = State.UNSTARTED;
-
-    final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>();
-    final List<TupleTag<String>> sideOutputTupleTags = new ArrayList<>();
-
-    public TestOldDoFn() {
-    }
-
-    public TestOldDoFn(List<PCollectionView<Integer>> sideInputViews,
-                    List<TupleTag<String>> sideOutputTupleTags) {
-      this.sideInputViews.addAll(sideInputViews);
-      this.sideOutputTupleTags.addAll(sideOutputTupleTags);
-    }
-
-    @Override
-    public void startBundle(Context c) {
-      // The Fn can be reused, but only if FinishBundle has been called.
-      assertThat(state, anyOf(equalTo(State.UNSTARTED), equalTo(State.FINISHED)));
-      state = State.STARTED;
-      outputToAll(c, "started");
-    }
-
-    @Override
-    public void processElement(ProcessContext c) {
-      assertThat(state,
-                 anyOf(equalTo(State.STARTED), equalTo(State.PROCESSING)));
-      state = State.PROCESSING;
-      outputToAllWithSideInputs(c, "processing: " + c.element());
-    }
-
-    @Override
-    public void finishBundle(Context c) {
-      assertThat(state,
-                 anyOf(equalTo(State.STARTED), equalTo(State.PROCESSING)));
-      state = State.FINISHED;
-      outputToAll(c, "finished");
-    }
-
-    private void outputToAll(Context c, String value) {
-      c.output(value);
-      for (TupleTag<String> sideOutputTupleTag : sideOutputTupleTags) {
-        c.sideOutput(sideOutputTupleTag,
-                     sideOutputTupleTag.getId() + ": " + value);
-      }
-    }
-
-    private void outputToAllWithSideInputs(ProcessContext c, String value) {
-      if (!sideInputViews.isEmpty()) {
-        List<Integer> sideInputValues = new ArrayList<>();
-        for (PCollectionView<Integer> sideInputView : sideInputViews) {
-          sideInputValues.add(c.sideInput(sideInputView));
-        }
-        value += ": " + sideInputValues;
-      }
-      c.output(value);
-      for (TupleTag<String> sideOutputTupleTag : sideOutputTupleTags) {
-        c.sideOutput(sideOutputTupleTag,
-                     sideOutputTupleTag.getId() + ": " + value);
-      }
-    }
-  }
-
-  static class TestNoOutputDoFn extends OldDoFn<Integer, String> {
-    @Override
-    public void processElement(OldDoFn<Integer, String>.ProcessContext c) throws Exception {}
+  static class TestNoOutputDoFn extends DoFn<Integer, String> {
+    @ProcessElement
+    public void processElement(DoFn<Integer, String>.ProcessContext c) throws Exception {}
   }
 
   static class TestDoFn extends DoFn<Integer, String> {
@@ -254,52 +189,52 @@ public class ParDoTest implements Serializable {
     }
   }
 
-  static class TestStartBatchErrorDoFn extends OldDoFn<Integer, String> {
-    @Override
+  static class TestStartBatchErrorDoFn extends DoFn<Integer, String> {
+    @StartBundle
     public void startBundle(Context c) {
       throw new RuntimeException("test error in initialize");
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       // This has to be here.
     }
   }
 
-  static class TestProcessElementErrorDoFn extends OldDoFn<Integer, String> {
-    @Override
+  static class TestProcessElementErrorDoFn extends DoFn<Integer, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       throw new RuntimeException("test error in process");
     }
   }
 
-  static class TestFinishBatchErrorDoFn extends OldDoFn<Integer, String> {
-    @Override
+  static class TestFinishBatchErrorDoFn extends DoFn<Integer, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       // This has to be here.
     }
 
-    @Override
+    @FinishBundle
     public void finishBundle(Context c) {
       throw new RuntimeException("test error in finalize");
     }
   }
 
-  private static class StrangelyNamedDoer extends OldDoFn<Integer, String> {
-    @Override
+  private static class StrangelyNamedDoer extends DoFn<Integer, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
     }
   }
 
-  static class TestOutputTimestampDoFn extends OldDoFn<Integer, Integer> {
-    @Override
+  static class TestOutputTimestampDoFn extends DoFn<Integer, Integer> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       Integer value = c.element();
       c.outputWithTimestamp(value, new Instant(value.longValue()));
     }
   }
 
-  static class TestShiftTimestampDoFn extends OldDoFn<Integer, Integer> {
+  static class TestShiftTimestampDoFn extends DoFn<Integer, Integer> {
     private Duration allowedTimestampSkew;
     private Duration durationToShift;
 
@@ -313,7 +248,7 @@ public class ParDoTest implements Serializable {
     public Duration getAllowedTimestampSkew() {
       return allowedTimestampSkew;
     }
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       Instant timestamp = c.timestamp();
       checkNotNull(timestamp);
@@ -322,8 +257,8 @@ public class ParDoTest implements Serializable {
     }
   }
 
-  static class TestFormatTimestampDoFn extends OldDoFn<Integer, String> {
-    @Override
+  static class TestFormatTimestampDoFn extends DoFn<Integer, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       checkNotNull(c.timestamp());
       c.output("processing: " + c.element() + ", timestamp: " + c.timestamp().getMillis());
@@ -343,14 +278,14 @@ public class ParDoTest implements Serializable {
       return PCollectionTuple.of(BY2, by2).and(BY3, by3);
     }
 
-    static class FilterFn extends OldDoFn<Integer, Integer> {
+    static class FilterFn extends DoFn<Integer, Integer> {
       private final int divisor;
 
       FilterFn(int divisor) {
         this.divisor = divisor;
       }
 
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         if (c.element() % divisor == 0) {
           c.output(c.element());
@@ -368,7 +303,7 @@ public class ParDoTest implements Serializable {
 
     PCollection<String> output = pipeline
         .apply(Create.of(inputs))
-        .apply(ParDo.of(new TestOldDoFn()));
+        .apply(ParDo.of(new TestDoFn()));
 
     PAssert.that(output)
         .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs));
@@ -402,7 +337,7 @@ public class ParDoTest implements Serializable {
 
     PCollection<String> output = pipeline
         .apply(Create.of(inputs).withCoder(VarIntCoder.of()))
-        .apply("TestOldDoFn", ParDo.of(new TestOldDoFn()));
+        .apply("TestDoFn", ParDo.of(new TestDoFn()));
 
     PAssert.that(output)
         .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs));
@@ -420,7 +355,7 @@ public class ParDoTest implements Serializable {
 
     PCollection<String> output = pipeline
         .apply(Create.of(inputs).withCoder(VarIntCoder.of()))
-        .apply("TestOldDoFn", ParDo.of(new TestNoOutputDoFn()));
+        .apply("TestDoFn", ParDo.of(new TestNoOutputDoFn()));
 
     PAssert.that(output).empty();
 
@@ -443,7 +378,7 @@ public class ParDoTest implements Serializable {
     PCollectionTuple outputs = pipeline
         .apply(Create.of(inputs))
         .apply(ParDo
-               .of(new TestOldDoFn(
+               .of(new TestDoFn(
                    Arrays.<PCollectionView<Integer>>asList(),
                    Arrays.asList(sideOutputTag1, sideOutputTag2, sideOutputTag3)))
                .withOutputTags(
@@ -486,7 +421,7 @@ public class ParDoTest implements Serializable {
     PCollectionTuple outputs = pipeline
         .apply(Create.of(inputs))
         .apply(ParDo
-               .of(new TestOldDoFn(
+               .of(new TestDoFn(
                    Arrays.<PCollectionView<Integer>>asList(),
                    Arrays.asList(sideOutputTag1, sideOutputTag2, sideOutputTag3)))
                .withOutputTags(
@@ -552,8 +487,8 @@ public class ParDoTest implements Serializable {
     PCollectionTuple outputs = pipeline
         .apply(Create.of(inputs))
         .apply(ParDo.withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))
-            .of(new OldDoFn<Integer, Void>(){
-                @Override
+            .of(new DoFn<Integer, Void>(){
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   c.sideOutput(sideOutputTag, c.element());
                 }}));
@@ -575,7 +510,7 @@ public class ParDoTest implements Serializable {
 
     PCollection<String> output = pipeline
         .apply(Create.of(inputs))
-        .apply(ParDo.of(new TestOldDoFn(
+        .apply(ParDo.of(new TestDoFn(
             Arrays.<PCollectionView<Integer>>asList(),
             Arrays.asList(sideTag))));
 
@@ -594,8 +529,8 @@ public class ParDoTest implements Serializable {
 
     // Success for a total of 1000 outputs.
     input
-        .apply("Success1000", ParDo.of(new OldDoFn<Integer, String>() {
-            @Override
+        .apply("Success1000", ParDo.of(new DoFn<Integer, String>() {
+            @ProcessElement
             public void processElement(ProcessContext c) {
               TupleTag<String> specialSideTag = new TupleTag<String>(){};
               c.sideOutput(specialSideTag, "side");
@@ -610,8 +545,8 @@ public class ParDoTest implements Serializable {
 
     // Failure for a total of 1001 outputs.
     input
-        .apply("Failure1001", ParDo.of(new OldDoFn<Integer, String>() {
-            @Override
+        .apply("Failure1001", ParDo.of(new DoFn<Integer, String>() {
+            @ProcessElement
             public void processElement(ProcessContext c) {
               for (int i = 0; i < 1000; i++) {
                 c.sideOutput(new TupleTag<String>(){}, "side");
@@ -643,7 +578,7 @@ public class ParDoTest implements Serializable {
     PCollection<String> output = pipeline
         .apply(Create.of(inputs))
         .apply(ParDo.withSideInputs(sideInput1, sideInputUnread, sideInput2)
-            .of(new TestOldDoFn(
+            .of(new TestDoFn(
                 Arrays.asList(sideInput1, sideInput2),
                 Arrays.<TupleTag<String>>asList())));
 
@@ -677,7 +612,7 @@ public class ParDoTest implements Serializable {
         .apply(ParDo.withSideInputs(sideInput1)
             .withSideInputs(sideInputUnread)
             .withSideInputs(sideInput2)
-            .of(new TestOldDoFn(
+            .of(new TestDoFn(
                 Arrays.asList(sideInput1, sideInput2),
                 Arrays.<TupleTag<String>>asList())));
 
@@ -715,7 +650,7 @@ public class ParDoTest implements Serializable {
             .withSideInputs(sideInputUnread)
             .withSideInputs(sideInput2)
             .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))
-            .of(new TestOldDoFn(
+            .of(new TestDoFn(
                 Arrays.asList(sideInput1, sideInput2),
                 Arrays.<TupleTag<String>>asList())));
 
@@ -753,7 +688,7 @@ public class ParDoTest implements Serializable {
             .withSideInputs(sideInputUnread)
             .withSideInputs(sideInput2)
             .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))
-            .of(new TestOldDoFn(
+            .of(new TestDoFn(
                 Arrays.asList(sideInput1, sideInput2),
                 Arrays.<TupleTag<String>>asList())));
 
@@ -777,7 +712,7 @@ public class ParDoTest implements Serializable {
         .apply(View.<Integer>asSingleton());
 
     pipeline.apply("CreateMain", Create.of(inputs))
-        .apply(ParDo.of(new TestOldDoFn(
+        .apply(ParDo.of(new TestDoFn(
             Arrays.<PCollectionView<Integer>>asList(sideView),
             Arrays.<TupleTag<String>>asList())));
 
@@ -834,7 +769,7 @@ public class ParDoTest implements Serializable {
   @Test
   public void testParDoOutputNameBasedOnDoFnWithTrimmedSuffix() {
     Pipeline p = TestPipeline.create();
-    PCollection<String> output = p.apply(Create.of(1)).apply(ParDo.of(new TestOldDoFn()));
+    PCollection<String> output = p.apply(Create.of(1)).apply(ParDo.of(new TestDoFn()));
     assertThat(output.getName(), containsString("ParDo(Test)"));
   }
 
@@ -842,7 +777,7 @@ public class ParDoTest implements Serializable {
   public void testParDoOutputNameBasedOnLabel() {
     Pipeline p = TestPipeline.create();
     PCollection<String> output =
-        p.apply(Create.of(1)).apply("MyParDo", ParDo.of(new TestOldDoFn()));
+        p.apply(Create.of(1)).apply("MyParDo", ParDo.of(new TestDoFn()));
     assertThat(output.getName(), containsString("MyParDo"));
   }
 
@@ -878,7 +813,7 @@ public class ParDoTest implements Serializable {
     PCollectionTuple outputs = p
         .apply(Create.of(Arrays.asList(3, -42, 666))).setName("MyInput")
         .apply("MyParDo", ParDo
-               .of(new TestOldDoFn(
+               .of(new TestDoFn(
                    Arrays.<PCollectionView<Integer>>asList(),
                    Arrays.asList(sideOutputTag1, sideOutputTag2, sideOutputTag3)))
                .withOutputTags(
@@ -906,7 +841,7 @@ public class ParDoTest implements Serializable {
         .apply("CustomTransform", new PTransform<PCollection<Integer>, PCollection<String>>() {
             @Override
             public PCollection<String> apply(PCollection<Integer> input) {
-              return input.apply(ParDo.of(new TestOldDoFn()));
+              return input.apply(ParDo.of(new TestDoFn()));
             }
           });
 
@@ -943,8 +878,8 @@ public class ParDoTest implements Serializable {
   @Test
   public void testJsonEscaping() {
     // Declare an arbitrary function and make sure we can serialize it
-    OldDoFn<Integer, Integer> doFn = new OldDoFn<Integer, Integer>() {
-      @Override
+    DoFn<Integer, Integer> doFn = new DoFn<Integer, Integer>() {
+      @ProcessElement
       public void processElement(ProcessContext c) {
         c.output(c.element() + 1);
       }
@@ -996,24 +931,25 @@ public class ParDoTest implements Serializable {
     }
   }
 
-  private static class SideOutputDummyFn extends OldDoFn<Integer, Integer> {
+  private static class SideOutputDummyFn extends DoFn<Integer, Integer> {
     private TupleTag<TestDummy> sideTag;
     public SideOutputDummyFn(TupleTag<TestDummy> sideTag) {
       this.sideTag = sideTag;
     }
-    @Override
+
+    @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(1);
       c.sideOutput(sideTag, new TestDummy());
      }
   }
 
-  private static class MainOutputDummyFn extends OldDoFn<Integer, TestDummy> {
+  private static class MainOutputDummyFn extends DoFn<Integer, TestDummy> {
     private TupleTag<Integer> sideTag;
     public MainOutputDummyFn(TupleTag<Integer> sideTag) {
       this.sideTag = sideTag;
     }
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(new TestDummy());
       c.sideOutput(sideTag, 1);
@@ -1190,8 +1126,9 @@ public class ParDoTest implements Serializable {
         .apply(ParDo
             .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))
             .of(
-                new OldDoFn<TestDummy, TestDummy>() {
-                  @Override public void processElement(ProcessContext context) {
+                new DoFn<TestDummy, TestDummy>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext context) {
                     TestDummy element = context.element();
                     context.output(element);
                     context.sideOutput(sideOutputTag, element);
@@ -1204,8 +1141,9 @@ public class ParDoTest implements Serializable {
     // on a missing coder.
     tuple.get(mainOutputTag)
         .setCoder(TestDummyCoder.of())
-        .apply("Output1", ParDo.of(new OldDoFn<TestDummy, Integer>() {
-          @Override public void processElement(ProcessContext context) {
+        .apply("Output1", ParDo.of(new DoFn<TestDummy, Integer>() {
+          @ProcessElement
+          public void processElement(ProcessContext context) {
             context.output(1);
           }
         }));
@@ -1251,8 +1189,8 @@ public class ParDoTest implements Serializable {
     PCollection<String> output =
         input
         .apply(ParDo.withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)).of(
-            new OldDoFn<Integer, Integer>() {
-              @Override
+            new DoFn<Integer, Integer>() {
+              @ProcessElement
               public void processElement(ProcessContext c) {
                 c.sideOutputWithTimestamp(
                     sideOutputTag, c.element(), new Instant(c.element().longValue()));
@@ -1369,29 +1307,33 @@ public class ParDoTest implements Serializable {
   public void testWindowingInStartAndFinishBundle() {
     Pipeline pipeline = TestPipeline.create();
 
-    PCollection<String> output = pipeline
-        .apply(Create.timestamped(TimestampedValue.of("elem", new Instant(1))))
-        .apply(Window.<String>into(FixedWindows.of(Duration.millis(1))))
-        .apply(ParDo.of(new OldDoFn<String, String>() {
-                  @Override
-                  public void startBundle(Context c) {
-                    c.outputWithTimestamp("start", new Instant(2));
-                    System.out.println("Start: 2");
-                  }
-
-                  @Override
-                  public void processElement(ProcessContext c) {
-                    c.output(c.element());
-                    System.out.println("Process: " + c.element() + ":" + c.timestamp().getMillis());
-                  }
-
-                  @Override
-                  public void finishBundle(Context c) {
-                    c.outputWithTimestamp("finish", new Instant(3));
-                    System.out.println("Finish: 3");
-                  }
-                }))
-        .apply(ParDo.of(new PrintingDoFn()));
+    PCollection<String> output =
+        pipeline
+            .apply(Create.timestamped(TimestampedValue.of("elem", new Instant(1))))
+            .apply(Window.<String>into(FixedWindows.of(Duration.millis(1))))
+            .apply(
+                ParDo.of(
+                    new DoFn<String, String>() {
+                      @StartBundle
+                      public void startBundle(Context c) {
+                        c.outputWithTimestamp("start", new Instant(2));
+                        System.out.println("Start: 2");
+                      }
+
+                      @ProcessElement
+                      public void processElement(ProcessContext c) {
+                        c.output(c.element());
+                        System.out.println(
+                            "Process: " + c.element() + ":" + c.timestamp().getMillis());
+                      }
+
+                      @FinishBundle
+                      public void finishBundle(Context c) {
+                        c.outputWithTimestamp("finish", new Instant(3));
+                        System.out.println("Finish: 3");
+                      }
+                    }))
+            .apply(ParDo.of(new PrintingDoFn()));
 
     PAssert.that(output).satisfies(new Checker());
 
@@ -1406,13 +1348,15 @@ public class ParDoTest implements Serializable {
     pipeline
         .apply(Create.timestamped(TimestampedValue.of("elem", new Instant(1))))
         .apply(Window.<String>into(FixedWindows.of(Duration.millis(1))))
-        .apply(ParDo.of(new OldDoFn<String, String>() {
-                  @Override
+        .apply(
+            ParDo.of(
+                new DoFn<String, String>() {
+                  @StartBundle
                   public void startBundle(Context c) {
                     c.output("start");
                   }
 
-                  @Override
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(c.element());
                   }
@@ -1423,8 +1367,8 @@ public class ParDoTest implements Serializable {
   }
   @Test
   public void testDoFnDisplayData() {
-    OldDoFn<String, String> fn = new OldDoFn<String, String>() {
-      @Override
+    DoFn<String, String> fn = new DoFn<String, String>() {
+      @ProcessElement
       public void processElement(ProcessContext c) {
       }
 


Mime
View raw message