Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 5CA2D200C3F for ; Wed, 22 Mar 2017 22:04:32 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 5B315160B86; Wed, 22 Mar 2017 21:04:32 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 02054160B74 for ; Wed, 22 Mar 2017 22:04:30 +0100 (CET) Received: (qmail 91987 invoked by uid 500); 22 Mar 2017 21:04:30 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 91974 invoked by uid 99); 22 Mar 2017 21:04:30 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Mar 2017 21:04:30 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 10D61DFE1D; Wed, 22 Mar 2017 21:04:30 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tgroh@apache.org To: commits@beam.apache.org Date: Wed, 22 Mar 2017 21:04:30 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] beam git commit: Deprecate GetAllowedTimestampSkew archived-at: Wed, 22 Mar 2017 21:04:32 -0000 Repository: beam Updated Branches: refs/heads/master c31b63340 -> 7c7bb8209 Deprecate GetAllowedTimestampSkew AllowedTimestampSkew is unsafe, as it allows elements to be produced before the watermark, which causes them to be late. BEAM-644 tracks replacements for this method. 
Handle infinite skew in SimpleDoFnRunner Update tests to ensure that "unlimited skew" is respected Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9f970fa3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9f970fa3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9f970fa3 Branch: refs/heads/master Commit: 9f970fa3f1b447c292445ee1c230f120ce6e97b1 Parents: c31b633 Author: Thomas Groh Authored: Tue Mar 14 12:46:49 2017 -0700 Committer: Thomas Groh Committed: Wed Mar 22 14:03:26 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/core/SimpleDoFnRunner.java | 6 +- .../beam/runners/core/SimpleDoFnRunnerTest.java | 146 +++++++++++++++++++ .../org/apache/beam/sdk/transforms/DoFn.java | 19 ++- .../beam/sdk/transforms/WithTimestamps.java | 26 +++- .../apache/beam/sdk/transforms/ParDoTest.java | 112 ++++++++++---- .../beam/sdk/io/mongodb/MongoDbGridFSIO.java | 1 + 6 files changed, 272 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9f970fa3/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index f5a559c..dfa9645 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -576,8 +576,12 @@ public class SimpleDoFnRunner implements DoFnRunner runner = + new SimpleDoFnRunner<>( + null, + fn, + NullSideInputReader.empty(), + new ListOutputManager(), + new TupleTag(), + Collections.>emptyList(), + mockStepContext, + null, + WindowingStrategy.of(new 
GlobalWindows())); + + runner.startBundle(); + // An element output at the current timestamp is fine. + runner.processElement( + WindowedValue.timestampedValueInGlobalWindow(Duration.ZERO, new Instant(0))); + thrown.expect(UserCodeException.class); + thrown.expectCause(isA(IllegalArgumentException.class)); + thrown.expectMessage("must be no earlier"); + thrown.expectMessage( + String.format("timestamp of the current input (%s)", new Instant(0).toString())); + thrown.expectMessage( + String.format( + "the allowed skew (%s)", PeriodFormat.getDefault().print(Duration.ZERO.toPeriod()))); + // An element output before (current time - skew) is forbidden + runner.processElement( + WindowedValue.timestampedValueInGlobalWindow(Duration.millis(1L), new Instant(0))); + } + + /** + * Demonstrates that attempting to output an element before the timestamp of the current element + * minus the value of {@link DoFn#getAllowedTimestampSkew()} throws, but between that value and + * the current timestamp succeeds. + */ + @Test + public void testSkew() { + SkewingDoFn fn = new SkewingDoFn(Duration.standardMinutes(10L)); + DoFnRunner runner = + new SimpleDoFnRunner<>( + null, + fn, + NullSideInputReader.empty(), + new ListOutputManager(), + new TupleTag(), + Collections.>emptyList(), + mockStepContext, + null, + WindowingStrategy.of(new GlobalWindows())); + + runner.startBundle(); + // Outputting between "now" and "now - allowed skew" succeeds. 
+ runner.processElement( + WindowedValue.timestampedValueInGlobalWindow(Duration.standardMinutes(5L), new Instant(0))); + thrown.expect(UserCodeException.class); + thrown.expectCause(isA(IllegalArgumentException.class)); + thrown.expectMessage("must be no earlier"); + thrown.expectMessage( + String.format("timestamp of the current input (%s)", new Instant(0).toString())); + thrown.expectMessage( + String.format( + "the allowed skew (%s)", + PeriodFormat.getDefault().print(Duration.standardMinutes(10L).toPeriod()))); + // Outputting before "now - allowed skew" fails. + runner.processElement( + WindowedValue.timestampedValueInGlobalWindow(Duration.standardHours(1L), new Instant(0))); + } + + /** + * Demonstrates that attempting to output an element with a timestamp before the current one + * always succeeds when {@link DoFn#getAllowedTimestampSkew()} is equal to + * {@link Long#MAX_VALUE} milliseconds. + */ + @Test + public void testInfiniteSkew() { + SkewingDoFn fn = new SkewingDoFn(Duration.millis(Long.MAX_VALUE)); + DoFnRunner runner = + new SimpleDoFnRunner<>( + null, + fn, + NullSideInputReader.empty(), + new ListOutputManager(), + new TupleTag(), + Collections.>emptyList(), + mockStepContext, + null, + WindowingStrategy.of(new GlobalWindows())); + + runner.startBundle(); + runner.processElement( + WindowedValue.timestampedValueInGlobalWindow(Duration.millis(1L), new Instant(0))); + runner.processElement( + WindowedValue.timestampedValueInGlobalWindow( + Duration.millis(1L), BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)))); + runner.processElement( + WindowedValue.timestampedValueInGlobalWindow( + // This is the maximum amount a timestamp in beam can move (from the maximum timestamp + // to the minimum timestamp). 
+ Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) + .minus(Duration.millis(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis())), + BoundedWindow.TIMESTAMP_MAX_VALUE)); + } + static class ThrowingDoFn extends DoFn { final Exception exceptionToThrow = new UnsupportedOperationException("Expected exception"); @@ -296,4 +411,35 @@ public class SimpleDoFnRunnerTest { context.timeDomain())); } } + + + /** + * A {@link DoFn} that outputs elements with timestamp equal to the input timestamp minus the + * input element. + */ + private static class SkewingDoFn extends DoFn { + private final Duration allowedSkew; + + private SkewingDoFn(Duration allowedSkew) { + this.allowedSkew = allowedSkew; + } + + @ProcessElement + public void processElement(ProcessContext context) { + context.outputWithTimestamp(context.element(), context.timestamp().minus(context.element())); + } + + @Override + public Duration getAllowedTimestampSkew() { + return allowedSkew; + } + } + + private static class ListOutputManager implements OutputManager { + private ListMultimap, WindowedValue> outputs = ArrayListMultimap.create(); + @Override + public void output(TupleTag tag, WindowedValue output) { + outputs.put(tag, output); + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/9f970fa3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 6f88738..6c5abbc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -43,6 +43,7 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import 
org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerSpec; @@ -317,14 +318,20 @@ public abstract class DoFn implements Serializable, HasDisplayD } /** - * Returns the allowed timestamp skew duration, which is the maximum - * duration that timestamps can be shifted backward in - * {@link DoFn.Context#outputWithTimestamp}. + * Returns the allowed timestamp skew duration, which is the maximum duration that timestamps can + * be shifted backward in {@link DoFn.Context#outputWithTimestamp}. + * + *

The default value is {@code Duration.ZERO}, in which case timestamps can only be shifted + * forward into the future. For infinite skew, return {@code Duration.millis(Long.MAX_VALUE)}. + * + * @deprecated This method permits a {@link DoFn} to emit elements behind the watermark. These + * elements are considered late, and if behind the + * {@link Window#withAllowedLateness(Duration) allowed lateness} of a downstream + * {@link PCollection} may be silently dropped. See + * https://issues.apache.org/jira/browse/BEAM-644 for details on a replacement. * - *

The default value is {@code Duration.ZERO}, in which case - * timestamps can only be shifted forward to future. For infinite - * skew, return {@code Duration.millis(Long.MAX_VALUE)}. */ + @Deprecated public Duration getAllowedTimestampSkew() { return Duration.ZERO; } http://git-wip-us.apache.org/repos/asf/beam/blob/9f970fa3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java index 387707b..6f20226 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java @@ -45,10 +45,16 @@ public class WithTimestamps extends PTransform, PCollection * each element is output with a timestamp obtained as the result of {@code fn.apply(v)}. * *

If the input {@link PCollection} elements have timestamps, the output timestamp for each - * element must not be before the input element's timestamp minus the value of - * {@link #getAllowedTimestampSkew()}. If an output timestamp is before this time, the transform - * will throw an {@link IllegalArgumentException} when executed. Use - * {@link #withAllowedTimestampSkew(Duration)} to update the allowed skew. + * element must not be before the input element's timestamp minus the value of {@link + * #getAllowedTimestampSkew()}. If an output timestamp is before this time, the transform will + * throw an {@link IllegalArgumentException} when executed. Use {@link + * #withAllowedTimestampSkew(Duration)} to update the allowed skew. + * + *

CAUTION: Use of {@link #withAllowedTimestampSkew(Duration)} permits elements to be emitted + * behind the watermark. These elements are considered late, and if behind the {@link + * Window#withAllowedLateness(Duration) allowed lateness} of a downstream {@link PCollection} may + * be silently dropped. See https://issues.apache.org/jira/browse/BEAM-644 for details on a + * replacement. * *

Each output element will be in the same windows as the input element. If a new window based * on the new output timestamp is desired, apply a new instance of {@link Window#into(WindowFn)}. @@ -82,7 +88,13 @@ public class WithTimestamps extends PTransform, PCollection * *

The default value is {@code Duration.ZERO}, allowing timestamps to only be shifted into the * future. For infinite skew, use {@code new Duration(Long.MAX_VALUE)}. + * @deprecated This method permits elements to be emitted behind the watermark. These + * elements are considered late, and if behind the + * {@link Window#withAllowedLateness(Duration) allowed lateness} of a downstream + * {@link PCollection} may be silently dropped. See + * https://issues.apache.org/jira/browse/BEAM-644 for details on a replacement. */ + @Deprecated public WithTimestamps withAllowedTimestampSkew(Duration allowedTimestampSkew) { return new WithTimestamps<>(this.fn, allowedTimestampSkew); } @@ -92,7 +104,13 @@ public class WithTimestamps extends PTransform, PCollection * duration that timestamps can be shifted backwards from the timestamp of the input element. * * @see DoFn#getAllowedTimestampSkew() + * @deprecated This method permits elements to be emitted behind the watermark. These + * elements are considered late, and if behind the + * {@link Window#withAllowedLateness(Duration) allowed lateness} of a downstream + * {@link PCollection} may be silently dropped. See + * https://issues.apache.org/jira/browse/BEAM-644 for details on a replacement. 
*/ + @Deprecated public Duration getAllowedTimestampSkew() { return allowedTimestampSkew; } http://git-wip-us.apache.org/repos/asf/beam/blob/9f970fa3/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index d5786f1..f7bf17a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -29,6 +29,7 @@ import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.not; import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.hamcrest.collection.IsIterableContainingInOrder.contains; @@ -250,15 +251,15 @@ public class ParDoTest implements Serializable { } } - static class TestOutputTimestampDoFn extends DoFn { + static class TestOutputTimestampDoFn extends DoFn { @ProcessElement public void processElement(ProcessContext c) { - Integer value = c.element(); + T value = c.element(); c.outputWithTimestamp(value, new Instant(value.longValue())); } } - static class TestShiftTimestampDoFn extends DoFn { + static class TestShiftTimestampDoFn extends DoFn { private Duration allowedTimestampSkew; private Duration durationToShift; @@ -276,12 +277,12 @@ public class ParDoTest implements Serializable { public void processElement(ProcessContext c) { Instant timestamp = c.timestamp(); checkNotNull(timestamp); - Integer value = c.element(); + T value = c.element(); c.outputWithTimestamp(value, timestamp.plus(durationToShift)); } } - static class TestFormatTimestampDoFn 
extends DoFn { + static class TestFormatTimestampDoFn extends DoFn { @ProcessElement public void processElement(ProcessContext c) { checkNotNull(c.timestamp()); @@ -1238,9 +1239,9 @@ public class ParDoTest implements Serializable { PCollection output = input - .apply(ParDo.of(new TestOutputTimestampDoFn())) - .apply(ParDo.of(new TestShiftTimestampDoFn(Duration.ZERO, Duration.ZERO))) - .apply(ParDo.of(new TestFormatTimestampDoFn())); + .apply(ParDo.of(new TestOutputTimestampDoFn())) + .apply(ParDo.of(new TestShiftTimestampDoFn(Duration.ZERO, Duration.ZERO))) + .apply(ParDo.of(new TestFormatTimestampDoFn())); PAssert.that(output).containsInAnyOrder( "processing: 3, timestamp: 3", @@ -1270,8 +1271,8 @@ public class ParDoTest implements Serializable { sideOutputTag, c.element(), new Instant(c.element().longValue())); } })).get(sideOutputTag) - .apply(ParDo.of(new TestShiftTimestampDoFn(Duration.ZERO, Duration.ZERO))) - .apply(ParDo.of(new TestFormatTimestampDoFn())); + .apply(ParDo.of(new TestShiftTimestampDoFn(Duration.ZERO, Duration.ZERO))) + .apply(ParDo.of(new TestFormatTimestampDoFn())); PAssert.that(output).containsInAnyOrder( "processing: 3, timestamp: 3", @@ -1282,7 +1283,7 @@ public class ParDoTest implements Serializable { } @Test - @Category(NeedsRunner.class) + @Category(RunnableOnService.class) public void testParDoShiftTimestamp() { PCollection input = @@ -1290,10 +1291,12 @@ public class ParDoTest implements Serializable { PCollection output = input - .apply(ParDo.of(new TestOutputTimestampDoFn())) - .apply(ParDo.of(new TestShiftTimestampDoFn(Duration.millis(1000), - Duration.millis(-1000)))) - .apply(ParDo.of(new TestFormatTimestampDoFn())); + .apply(ParDo.of(new TestOutputTimestampDoFn())) + .apply( + ParDo.of( + new TestShiftTimestampDoFn( + Duration.millis(1000), Duration.millis(-1000)))) + .apply(ParDo.of(new TestFormatTimestampDoFn())); PAssert.that(output).containsInAnyOrder( "processing: 3, timestamp: -997", @@ -1304,14 +1307,18 @@ public class 
ParDoTest implements Serializable { } @Test - @Category(NeedsRunner.class) + @Category(RunnableOnService.class) public void testParDoShiftTimestampInvalid() { - pipeline.apply(Create.of(Arrays.asList(3, 42, 6))) - .apply(ParDo.of(new TestOutputTimestampDoFn())) - .apply(ParDo.of(new TestShiftTimestampDoFn(Duration.millis(1000), // allowed skew = 1 second - Duration.millis(-1001)))) - .apply(ParDo.of(new TestFormatTimestampDoFn())); + pipeline + .apply(Create.of(Arrays.asList(3, 42, 6))) + .apply(ParDo.of(new TestOutputTimestampDoFn())) + .apply( + ParDo.of( + new TestShiftTimestampDoFn( + Duration.millis(1000), // allowed skew = 1 second + Duration.millis(-1001)))) + .apply(ParDo.of(new TestFormatTimestampDoFn())); thrown.expect(RuntimeException.class); thrown.expectMessage("Cannot output with timestamp"); @@ -1324,12 +1331,11 @@ public class ParDoTest implements Serializable { @Test @Category(NeedsRunner.class) public void testParDoShiftTimestampInvalidZeroAllowed() { - - pipeline.apply(Create.of(Arrays.asList(3, 42, 6))) - .apply(ParDo.of(new TestOutputTimestampDoFn())) - .apply(ParDo.of(new TestShiftTimestampDoFn(Duration.ZERO, - Duration.millis(-1001)))) - .apply(ParDo.of(new TestFormatTimestampDoFn())); + pipeline + .apply(Create.of(Arrays.asList(3, 42, 6))) + .apply(ParDo.of(new TestOutputTimestampDoFn())) + .apply(ParDo.of(new TestShiftTimestampDoFn(Duration.ZERO, Duration.millis(-1001)))) + .apply(ParDo.of(new TestFormatTimestampDoFn())); thrown.expect(RuntimeException.class); thrown.expectMessage("Cannot output with timestamp"); @@ -1339,6 +1345,58 @@ public class ParDoTest implements Serializable { pipeline.run(); } + @Test + @Category(RunnableOnService.class) + public void testParDoShiftTimestampUnlimited() { + PCollection outputs = + pipeline + .apply( + Create.of( + Arrays.asList( + 0L, + BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(), + GlobalWindow.INSTANCE.maxTimestamp().getMillis()))) + .apply("AssignTimestampToValue", ParDo.of(new 
TestOutputTimestampDoFn())) + .apply("ReassignToMinimumTimestamp", + ParDo.of( + new DoFn() { + @ProcessElement + public void reassignTimestamps(ProcessContext context) { + // Shift the latest element as far backwards in time as the model permits + context.outputWithTimestamp( + context.element(), BoundedWindow.TIMESTAMP_MIN_VALUE); + } + + @Override + public Duration getAllowedTimestampSkew() { + return Duration.millis(Long.MAX_VALUE); + } + })); + + PAssert.that(outputs) + .satisfies( + new SerializableFunction, Void>() { + @Override + public Void apply(Iterable input) { + // This element is not shifted backwards in time. It must be present in the output. + assertThat(input, hasItem(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis())); + for (Long elem : input) { + // Sanity check the outputs. 0L and the end of the global window are shifted + // backwards in time and theoretically could be dropped. + assertThat( + elem, + anyOf( + equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()), + equalTo(GlobalWindow.INSTANCE.maxTimestamp().getMillis()), + equalTo(0L))); + } + return null; + } + }); + + pipeline.run(); + } + private static class Checker implements SerializableFunction, Void> { @Override public Void apply(Iterable input) { http://git-wip-us.apache.org/repos/asf/beam/blob/9f970fa3/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java index b356dad..d924c14 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java @@ -174,6 +174,7 @@ public class MongoDbGridFSIO { .setParser(TEXT_PARSER) .setCoder(StringUtf8Coder.of()) 
.setConnectionConfiguration(ConnectionConfiguration.create()) + .setSkew(Duration.ZERO) .build(); }