Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A32BC200BAE for ; Fri, 28 Oct 2016 16:47:47 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A1C7A160ADD; Fri, 28 Oct 2016 14:47:47 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2655B160B03 for ; Fri, 28 Oct 2016 16:47:46 +0200 (CEST) Received: (qmail 47486 invoked by uid 500); 28 Oct 2016 14:47:45 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 47477 invoked by uid 99); 28 Oct 2016 14:47:45 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 28 Oct 2016 14:47:45 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id E3B9E1A9A5F for ; Fri, 28 Oct 2016 14:47:44 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id DkUOfrh5Rot3 for ; Fri, 28 Oct 2016 14:47:38 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by 
mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 967775FC63 for ; Fri, 28 Oct 2016 14:47:36 +0000 (UTC) Received: (qmail 42675 invoked by uid 99); 28 Oct 2016 14:47:35 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 28 Oct 2016 14:47:35 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7E1E6F1710; Fri, 28 Oct 2016 14:47:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: thw@apache.org To: commits@beam.incubator.apache.org Date: Fri, 28 Oct 2016 14:47:43 -0000 Message-Id: <70007dd3bccf4eea8fc9fa3c10b9f9d1@git.apache.org> In-Reply-To: <59305eddf73748209ca2ac502511c849@git.apache.org> References: <59305eddf73748209ca2ac502511c849@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [09/50] incubator-beam git commit: Remove use of OldDoFn from some DirectRunner tests archived-at: Fri, 28 Oct 2016 14:47:47 -0000 Remove use of OldDoFn from some DirectRunner tests Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3d086857 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3d086857 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3d086857 Branch: refs/heads/apex-runner Commit: 3d086857de87734b087076dad3eca92f625bb417 Parents: 4051357 Author: Kenneth Knowles Authored: Mon Oct 24 16:09:13 2016 -0700 Committer: Kenneth Knowles Committed: Tue Oct 25 13:12:17 2016 -0700 ---------------------------------------------------------------------- .../ConsumerTrackingPipelineVisitorTest.java | 32 +++---- .../beam/runners/direct/DirectRunnerTest.java | 40 +++++---- .../ImmutabilityCheckingBundleFactoryTest.java | 8 +- .../ImmutabilityEnforcementFactoryTest.java | 8 +- .../direct/KeyedPValueTrackingVisitorTest.java | 8 +- 
.../beam/runners/direct/ParDoEvaluatorTest.java | 8 +- .../direct/ParDoMultiEvaluatorFactoryTest.java | 87 +++++++++--------- .../direct/ParDoSingleEvaluatorFactoryTest.java | 94 +++++++++----------- .../runners/direct/WatermarkManagerTest.java | 8 +- 9 files changed, 139 insertions(+), 154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java index 1c9b5a6..e8f2a7e 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java @@ -26,8 +26,8 @@ import org.apache.beam.sdk.io.CountingInput; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; @@ -60,9 +60,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable { p.apply("listCreate", Create.of("foo", "bar")) .apply( ParDo.of( - new OldDoFn() { - @Override - public void processElement(OldDoFn.ProcessContext c) + new DoFn() { + @ProcessElement + public void processElement(DoFn.ProcessContext c) throws Exception { c.output(Integer.toString(c.element().length())); } @@ 
-107,9 +107,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable { PCollection transformed = created.apply( ParDo.of( - new OldDoFn() { - @Override - public void processElement(OldDoFn.ProcessContext c) + new DoFn() { + @ProcessElement + public void processElement(DoFn.ProcessContext c) throws Exception { c.output(Integer.toString(c.element().length())); } @@ -138,9 +138,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable { PCollection transformed = created.apply( ParDo.of( - new OldDoFn() { - @Override - public void processElement(OldDoFn.ProcessContext c) + new DoFn() { + @ProcessElement + public void processElement(DoFn.ProcessContext c) throws Exception { c.output(Integer.toString(c.element().length())); } @@ -155,9 +155,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable { p.apply(Create.of("1", "2", "3")) .apply( ParDo.of( - new OldDoFn() { - @Override - public void processElement(OldDoFn.ProcessContext c) + new DoFn() { + @ProcessElement + public void processElement(DoFn.ProcessContext c) throws Exception { c.output(Integer.toString(c.element().length())); } @@ -180,9 +180,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable { PCollection transformed = created.apply( ParDo.of( - new OldDoFn() { - @Override - public void processElement(OldDoFn.ProcessContext c) + new DoFn() { + @ProcessElement + public void processElement(DoFn.ProcessContext c) throws Exception { c.output(Integer.toString(c.element().length())); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java index 4027d25..34a5469 100644 --- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java @@ -59,7 +59,6 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SimpleFunction; @@ -221,8 +220,8 @@ public class DirectRunnerTest implements Serializable { @Test public void transformDisplayDataExceptionShouldFail() { - OldDoFn brokenDoFn = new OldDoFn() { - @Override + DoFn brokenDoFn = new DoFn() { + @ProcessElement public void processElement(ProcessContext c) throws Exception {} @Override @@ -242,7 +241,7 @@ public class DirectRunnerTest implements Serializable { } /** - * Tests that a {@link OldDoFn} that mutates an output with a good equals() fails in the + * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the * {@link DirectRunner}. */ @Test @@ -251,8 +250,9 @@ public class DirectRunnerTest implements Serializable { pipeline .apply(Create.of(42)) - .apply(ParDo.of(new OldDoFn>() { - @Override public void processElement(ProcessContext c) { + .apply(ParDo.of(new DoFn>() { + @ProcessElement + public void processElement(ProcessContext c) { List outputList = Arrays.asList(1, 2, 3, 4); c.output(outputList); outputList.set(0, 37); @@ -267,7 +267,7 @@ public class DirectRunnerTest implements Serializable { } /** - * Tests that a {@link OldDoFn} that mutates an output with a good equals() fails in the + * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the * {@link DirectRunner}. 
*/ @Test @@ -276,8 +276,9 @@ public class DirectRunnerTest implements Serializable { pipeline .apply(Create.of(42)) - .apply(ParDo.of(new OldDoFn>() { - @Override public void processElement(ProcessContext c) { + .apply(ParDo.of(new DoFn>() { + @ProcessElement + public void processElement(ProcessContext c) { List outputList = Arrays.asList(1, 2, 3, 4); c.output(outputList); outputList.set(0, 37); @@ -291,7 +292,7 @@ public class DirectRunnerTest implements Serializable { } /** - * Tests that a {@link OldDoFn} that mutates an output with a bad equals() still fails + * Tests that a {@link DoFn} that mutates an output with a bad equals() still fails * in the {@link DirectRunner}. */ @Test @@ -300,8 +301,9 @@ public class DirectRunnerTest implements Serializable { pipeline .apply(Create.of(42)) - .apply(ParDo.of(new OldDoFn() { - @Override public void processElement(ProcessContext c) { + .apply(ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { byte[] outputArray = new byte[]{0x1, 0x2, 0x3}; c.output(outputArray); outputArray[0] = 0xa; @@ -316,7 +318,7 @@ public class DirectRunnerTest implements Serializable { } /** - * Tests that a {@link OldDoFn} that mutates its input with a good equals() fails in the + * Tests that a {@link DoFn} that mutates its input with a good equals() fails in the * {@link DirectRunner}. 
*/ @Test @@ -326,8 +328,9 @@ public class DirectRunnerTest implements Serializable { pipeline .apply(Create.of(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6)) .withCoder(ListCoder.of(VarIntCoder.of()))) - .apply(ParDo.of(new OldDoFn, Integer>() { - @Override public void processElement(ProcessContext c) { + .apply(ParDo.of(new DoFn, Integer>() { + @ProcessElement + public void processElement(ProcessContext c) { List inputList = c.element(); inputList.set(0, 37); c.output(12); @@ -341,7 +344,7 @@ public class DirectRunnerTest implements Serializable { } /** - * Tests that a {@link OldDoFn} that mutates an input with a bad equals() still fails + * Tests that a {@link DoFn} that mutates an input with a bad equals() still fails * in the {@link DirectRunner}. */ @Test @@ -350,8 +353,9 @@ public class DirectRunnerTest implements Serializable { pipeline .apply(Create.of(new byte[]{0x1, 0x2, 0x3}, new byte[]{0x4, 0x5, 0x6})) - .apply(ParDo.of(new OldDoFn() { - @Override public void processElement(ProcessContext c) { + .apply(ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { byte[] inputArray = c.element(); inputArray[0] = 0xa; c.output(13); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java index d445944..ea44125 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java @@ -26,7 +26,7 @@ import org.apache.beam.sdk.coders.ByteArrayCoder; import 
org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -179,9 +179,9 @@ public class ImmutabilityCheckingBundleFactoryTest { intermediate.commit(Instant.now()); } - private static class IdentityDoFn extends OldDoFn { - @Override - public void processElement(OldDoFn.ProcessContext c) throws Exception { + private static class IdentityDoFn extends DoFn { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { c.output(c.element()); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java index 812d7d5..a7277fe 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java @@ -24,7 +24,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.IllegalMutationException; import org.apache.beam.sdk.util.WindowedValue; @@ -57,9 
+57,9 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable { p.apply(Create.of("foo".getBytes(), "spamhameggs".getBytes())) .apply( ParDo.of( - new OldDoFn() { - @Override - public void processElement(OldDoFn.ProcessContext c) + new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { c.element()[0] = 'b'; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java index ee6b2b4..cf65936 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java @@ -31,9 +31,9 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.Keys; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; @@ -177,9 +177,9 @@ public class KeyedPValueTrackingVisitorTest { } } - private static class IdentityFn extends OldDoFn { - @Override - public void processElement(OldDoFn.ProcessContext c) throws Exception { + private static class IdentityFn extends DoFn { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { c.output(c.element()); } } 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java index 1a742f0..6d00aa1 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java @@ -37,7 +37,7 @@ import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -168,7 +168,7 @@ public class ParDoEvaluatorTest { ImmutableMap., PCollection>of(mainOutputTag, output)); } - private static class RecorderFn extends OldDoFn { + private static class RecorderFn extends DoFn { private Collection processed; private final PCollectionView view; @@ -177,8 +177,8 @@ public class ParDoEvaluatorTest { this.view = view; } - @Override - public void processElement(OldDoFn.ProcessContext c) throws Exception { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { processed.add(c.element()); c.output(c.element() + c.sideInput(view)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java index 8b0070b..cc83323 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java @@ -32,7 +32,7 @@ import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.ParDo.BoundMulti; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -41,11 +41,16 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.TimerSpec; +import org.apache.beam.sdk.util.TimerSpecs; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; +import org.apache.beam.sdk.util.state.StateSpec; +import org.apache.beam.sdk.util.state.StateSpecs; import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.StateTags; import org.apache.beam.sdk.util.state.WatermarkHoldState; @@ -81,8 +86,8 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { BoundMulti> pardo = ParDo.of( - new OldDoFn>() { - @Override + new DoFn>() { 
+ @ProcessElement public void processElement(ProcessContext c) { c.output(KV.of(c.element(), c.element().length())); c.sideOutput(elementTag, c.element()); @@ -170,8 +175,8 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { BoundMulti> pardo = ParDo.of( - new OldDoFn>() { - @Override + new DoFn>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(KV.of(c.element(), c.element().length())); c.sideOutput(elementTag, c.element()); @@ -258,20 +263,17 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE); BoundMulti> pardo = ParDo.of( - new OldDoFn>() { - @Override - public void processElement(ProcessContext c) { - c.windowingInternals() - .stateInternals() - .state(StateNamespaces.global(), watermarkTag) - .add(new Instant(20202L + c.element().length())); - c.windowingInternals() - .stateInternals() - .state( - StateNamespaces.window( - GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE), - bagTag) - .add(c.element()); + new DoFn>() { + private static final String STATE_ID = "my-state-id"; + + @StateId(STATE_ID) + private final StateSpec> bagSpec = + StateSpecs.bag(StringUtf8Coder.of()); + + @ProcessElement + public void processElement( + ProcessContext c, @StateId(STATE_ID) BagState bagState) { + bagState.add(c.element()); } }) .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); @@ -362,34 +364,25 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { BoundMulti> pardo = ParDo.of( - new OldDoFn>() { - @Override - public void processElement(ProcessContext c) { - c.windowingInternals().stateInternals(); - c.windowingInternals() - .timerInternals() - .setTimer( - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow( - new Instant(0).plus(Duration.standardMinutes(5)), - new Instant(1) - .plus(Duration.standardMinutes(5)) - .plus(Duration.standardHours(1)))), - new 
Instant(54541L), - TimeDomain.EVENT_TIME)); - c.windowingInternals() - .timerInternals() - .deleteTimer( - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow( - new Instant(0), - new Instant(0).plus(Duration.standardHours(1)))), - new Instant(3400000), - TimeDomain.SYNCHRONIZED_PROCESSING_TIME)); + new DoFn>() { + private static final String EVENT_TIME_TIMER = "event-time-timer"; + private static final String SYNC_PROC_TIME_TIMER = "sync-proc-time-timer"; + + @TimerId(EVENT_TIME_TIMER) + TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @TimerId(SYNC_PROC_TIME_TIMER) + TimerSpec syncProcTimerSpec = + TimerSpecs.timer(TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + + @ProcessElement + public void processElement( + ProcessContext c, + @TimerId(EVENT_TIME_TIMER) Timer eventTimeTimer, + @TimerId(SYNC_PROC_TIME_TIMER) Timer syncProcTimeTimer) { + + eventTimeTimer.setForNowPlus(Duration.standardMinutes(5)); + syncProcTimeTimer.cancel(); } }) .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java index e562b28..d22643a 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java @@ -32,22 +32,25 @@ import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.TestPipeline; import 
org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.TimerSpec; +import org.apache.beam.sdk.util.TimerSpecs; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; +import org.apache.beam.sdk.util.state.StateSpec; +import org.apache.beam.sdk.util.state.StateSpecs; import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.StateTags; -import org.apache.beam.sdk.util.state.WatermarkHoldState; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TupleTag; @@ -74,8 +77,8 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { PCollection collection = input.apply( ParDo.of( - new OldDoFn() { - @Override + new DoFn() { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.element().length()); } @@ -128,8 +131,8 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { PCollection collection = input.apply( ParDo.of( - new OldDoFn() { - @Override + new DoFn() { + @ProcessElement public void processElement(ProcessContext c) { c.sideOutput(sideOutputTag, c.element().length()); } @@ -178,26 +181,22 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { PCollection 
input = p.apply(Create.of("foo", "bara", "bazam")); - final StateTag> watermarkTag = - StateTags.watermarkStateInternal("myId", OutputTimeFns.outputAtEarliestInputTimestamp()); final StateTag> bagTag = StateTags.bag("myBag", StringUtf8Coder.of()); final StateNamespace windowNs = StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE); ParDo.Bound> pardo = ParDo.of( - new OldDoFn>() { - @Override - public void processElement(ProcessContext c) { - c.windowingInternals() - .stateInternals() - .state(StateNamespaces.global(), watermarkTag) - .add(new Instant(124443L - c.element().length())); - c.windowingInternals() - .stateInternals() - .state( - StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE), - bagTag) - .add(c.element()); + new DoFn>() { + private static final String STATE_ID = "my-state-id"; + + @StateId(STATE_ID) + private final StateSpec> bagSpec = + StateSpecs.bag(StringUtf8Coder.of()); + + @ProcessElement + public void processElement( + ProcessContext c, @StateId(STATE_ID) BagState bagState) { + bagState.add(c.element()); } }); PCollection> mainOutput = input.apply(pardo); @@ -237,9 +236,6 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { assertThat(result.getWatermarkHold(), equalTo(new Instant(124438L))); assertThat(result.getState(), not(nullValue())); assertThat( - result.getState().state(StateNamespaces.global(), watermarkTag).read(), - equalTo(new Instant(124438L))); - assertThat( result.getState().state(windowNs, bagTag).read(), containsInAnyOrder("foo", "bara", "bazam")); } @@ -255,6 +251,8 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { PCollection input = p.apply(Create.of("foo", "bara", "bazam")); + // TODO: this timer data is absolute, but the new API only supports relative settings. 
+ // It will require adjustments when @Ignore is removed final TimerData addedTimer = TimerData.of( StateNamespaces.window( @@ -276,34 +274,24 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { ParDo.Bound> pardo = ParDo.of( - new OldDoFn>() { - @Override - public void processElement(ProcessContext c) { - c.windowingInternals().stateInternals(); - c.windowingInternals() - .timerInternals() - .setTimer( - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow( - new Instant(0).plus(Duration.standardMinutes(5)), - new Instant(1) - .plus(Duration.standardMinutes(5)) - .plus(Duration.standardHours(1)))), - new Instant(54541L), - TimeDomain.EVENT_TIME)); - c.windowingInternals() - .timerInternals() - .deleteTimer( - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow( - new Instant(0), - new Instant(0).plus(Duration.standardHours(1)))), - new Instant(3400000), - TimeDomain.SYNCHRONIZED_PROCESSING_TIME)); + new DoFn>() { + private static final String EVENT_TIME_TIMER = "event-time-timer"; + private static final String SYNC_PROC_TIME_TIMER = "sync-proc-time-timer"; + + @TimerId(EVENT_TIME_TIMER) + TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @TimerId(SYNC_PROC_TIME_TIMER) + TimerSpec syncProcTimerSpec = + TimerSpecs.timer(TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + + @ProcessElement + public void processElement( + ProcessContext c, + @TimerId(EVENT_TIME_TIMER) Timer eventTimeTimer, + @TimerId(SYNC_PROC_TIME_TIMER) Timer syncProcTimeTimer) { + eventTimeTimer.setForNowPlus(Duration.standardMinutes(5)); + syncProcTimeTimer.cancel(); } }); PCollection> mainOutput = input.apply(pardo); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d086857/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java ---------------------------------------------------------------------- diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java index 042abab..1954005 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java @@ -47,9 +47,9 @@ import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Filter; import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -101,9 +101,9 @@ public class WatermarkManagerTest implements Serializable { createdInts = p.apply("createdInts", Create.of(1, 2, 3)); filtered = createdInts.apply("filtered", Filter.greaterThan(1)); - filteredTimesTwo = filtered.apply("timesTwo", ParDo.of(new OldDoFn() { - @Override - public void processElement(OldDoFn.ProcessContext c) throws Exception { + filteredTimesTwo = filtered.apply("timesTwo", ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { c.output(c.element() * 2); } }));