Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id BECDA200C3A for ; Fri, 31 Mar 2017 19:53:41 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id BD8A7160B80; Fri, 31 Mar 2017 17:53:41 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 847D0160B7C for ; Fri, 31 Mar 2017 19:53:40 +0200 (CEST) Received: (qmail 28406 invoked by uid 500); 31 Mar 2017 17:53:39 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 28394 invoked by uid 99); 31 Mar 2017 17:53:39 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 31 Mar 2017 17:53:39 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9964DDFE5C; Fri, 31 Mar 2017 17:53:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: altay@apache.org To: commits@beam.apache.org Date: Fri, 31 Mar 2017 17:53:39 -0000 Message-Id: <6ebf89e0cf7745c49d9daff3c75137b4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] beam git commit: Add TestStream to Python SDK archived-at: Fri, 31 Mar 2017 17:53:41 -0000 Repository: beam Updated Branches: refs/heads/master 62473ae4b -> 023e6ab94 Add TestStream to Python SDK The TestStream will be used for verifying streaming runner semantics. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/55db47d5 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/55db47d5 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/55db47d5 Branch: refs/heads/master Commit: 55db47d50bb97b238926c2a1b0b80c36b5345d44 Parents: 62473ae Author: Charles Chen Authored: Thu Mar 30 18:20:04 2017 -0700 Committer: Ahmet Altay Committed: Fri Mar 31 10:53:16 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/transforms/window.py | 5 + .../apache_beam/transforms/window_test.py | 6 + sdks/python/apache_beam/utils/test_stream.py | 163 +++++++++++++++++++ .../apache_beam/utils/test_stream_test.py | 82 ++++++++++ 4 files changed, 256 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/55db47d5/sdks/python/apache_beam/transforms/window.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 3878dff..dcc58b7 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -246,6 +246,11 @@ class TimestampedValue(object): self.value = value self.timestamp = Timestamp.of(timestamp) + def __cmp__(self, other): + if type(self) is not type(other): + return cmp(type(self), type(other)) + return cmp((self.value, self.timestamp), (other.value, other.timestamp)) + class GlobalWindow(BoundedWindow): """The default window into which all data is placed (via GlobalWindows).""" http://git-wip-us.apache.org/repos/asf/beam/blob/55db47d5/sdks/python/apache_beam/transforms/window_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py index 11c8a68..1ac95e4 100644 --- a/sdks/python/apache_beam/transforms/window_test.py +++ b/sdks/python/apache_beam/transforms/window_test.py @@ -62,6 +62,12 @@ reify_windows = core.ParDo(ReifyWindowsFn()) class WindowTest(unittest.TestCase): + def test_timestamped_value_cmp(self): + self.assertEqual(TimestampedValue('a', 2), TimestampedValue('a', 2)) + self.assertEqual(TimestampedValue('a', 2), TimestampedValue('a', 2.0)) + self.assertNotEqual(TimestampedValue('a', 2), TimestampedValue('a', 2.1)) + self.assertNotEqual(TimestampedValue('a', 2), TimestampedValue('b', 2)) + def test_global_window(self): self.assertEqual(GlobalWindow(), GlobalWindow()) self.assertNotEqual(GlobalWindow(), http://git-wip-us.apache.org/repos/asf/beam/blob/55db47d5/sdks/python/apache_beam/utils/test_stream.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/test_stream.py b/sdks/python/apache_beam/utils/test_stream.py new file mode 100644 index 0000000..7ae27b7 --- /dev/null +++ b/sdks/python/apache_beam/utils/test_stream.py @@ -0,0 +1,163 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Provides TestStream for verifying streaming runner semantics.""" + +from abc import ABCMeta +from abc import abstractmethod + +from apache_beam import coders +from apache_beam import pvalue +from apache_beam.transforms import PTransform +from apache_beam.transforms.window import TimestampedValue +from apache_beam.utils import timestamp +from apache_beam.utils.windowed_value import WindowedValue + + +class Event(object): + """Test stream event to be emitted during execution of a TestStream.""" + + __metaclass__ = ABCMeta + + def __cmp__(self, other): + if type(self) is not type(other): + return cmp(type(self), type(other)) + return self._typed_cmp(other) + + @abstractmethod + def _typed_cmp(self, other): + raise NotImplementedError + + +class ElementEvent(Event): + """Element-producing test stream event.""" + + def __init__(self, timestamped_values): + self.timestamped_values = timestamped_values + + def _typed_cmp(self, other): + return cmp(self.timestamped_values, other.timestamped_values) + + +class WatermarkEvent(Event): + """Watermark-advancing test stream event.""" + + def __init__(self, new_watermark): + self.new_watermark = timestamp.Timestamp.of(new_watermark) + + def _typed_cmp(self, other): + return cmp(self.new_watermark, other.new_watermark) + + +class ProcessingTimeEvent(Event): + """Processing time-advancing test stream event.""" + + def __init__(self, advance_by): + self.advance_by = timestamp.Duration.of(advance_by) + + def _typed_cmp(self, other): + return cmp(self.advance_by, other.advance_by) + + +class TestStream(PTransform): + """Test stream that generates events on an unbounded PCollection of elements. + + Each event emits elements, advances the watermark or advances the processing + time. After all of the specified elements are emitted, ceases to produce + output. + """ + + def __init__(self, coder=coders.FastPrimitivesCoder): + assert coder is not None + self.coder = coder + self.current_watermark = timestamp.MIN_TIMESTAMP + self.events = [] + + def expand(self, pbegin): + assert isinstance(pbegin, pvalue.PBegin) + self.pipeline = pbegin.pipeline + return pvalue.PCollection(self.pipeline) + + def _infer_output_coder(self, input_type=None, input_coder=None): + return self.coder + + def _add(self, event): + if isinstance(event, ElementEvent): + for tv in event.timestamped_values: + assert tv.timestamp < timestamp.MAX_TIMESTAMP, ( + 'Element timestamp must be before timestamp.MAX_TIMESTAMP.') + elif isinstance(event, WatermarkEvent): + assert event.new_watermark > self.current_watermark, ( + 'Watermark must strictly-monotonically advance.') + self.current_watermark = event.new_watermark + elif isinstance(event, ProcessingTimeEvent): + assert event.advance_by > 0, ( + 'Must advance processing time by positive amount.') + else: + raise ValueError('Unknown event: %s' % event) + self.events.append(event) + + def add_elements(self, elements): + """Add elements to the TestStream. + + Elements added to the TestStream will be produced during pipeline execution. + These elements can be TimestampedValue, WindowedValue or raw unwrapped + elements that are serializable using the TestStream's specified Coder. When + a TimestampedValue or a WindowedValue element is used, the timestamp of the + TimestampedValue or WindowedValue will be the timestamp of the produced + element; otherwise, the current watermark timestamp will be used for that + element. The windows of a given WindowedValue are ignored by the + TestStream. + """ + timestamped_values = [] + for element in elements: + if isinstance(element, TimestampedValue): + timestamped_values.append(element) + elif isinstance(element, WindowedValue): + # Drop windows for elements in test stream. + timestamped_values.append( + TimestampedValue(element.value, element.timestamp)) + else: + # Add elements with timestamp equal to current watermark. + timestamped_values.append( + TimestampedValue(element, self.current_watermark)) + self._add(ElementEvent(timestamped_values)) + return self + + def advance_watermark_to(self, new_watermark): + """Advance the watermark to a given Unix timestamp. + + The Unix timestamp value used must be later than the previous watermark + value and should be given as an int, float or utils.timestamp.Timestamp + object. + """ + self._add(WatermarkEvent(new_watermark)) + return self + + def advance_watermark_to_infinity(self): + """Advance the watermark to the end of time.""" + self.advance_watermark_to(timestamp.MAX_TIMESTAMP) + return self + + def advance_processing_time(self, advance_by): + """Advance the current processing time by a given duration in seconds. + + The duration must be a positive second duration and should be given as an + int, float or utils.timestamp.Duration object. + """ + self._add(ProcessingTimeEvent(advance_by)) + return self http://git-wip-us.apache.org/repos/asf/beam/blob/55db47d5/sdks/python/apache_beam/utils/test_stream_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/test_stream_test.py b/sdks/python/apache_beam/utils/test_stream_test.py new file mode 100644 index 0000000..cc207ee --- /dev/null +++ b/sdks/python/apache_beam/utils/test_stream_test.py @@ -0,0 +1,82 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Unit tests for the test_stream module.""" + +import unittest + +from apache_beam.transforms.window import TimestampedValue +from apache_beam.utils import timestamp +from apache_beam.utils.test_stream import ElementEvent +from apache_beam.utils.test_stream import ProcessingTimeEvent +from apache_beam.utils.test_stream import TestStream +from apache_beam.utils.test_stream import WatermarkEvent +from apache_beam.utils.windowed_value import WindowedValue + + +class TestStreamTest(unittest.TestCase): + + def test_basic_test_stream(self): + test_stream = (TestStream() + .advance_watermark_to(0) + .add_elements([ + 'a', + WindowedValue('b', 3, []), + TimestampedValue('c', 6)]) + .advance_processing_time(10) + .advance_watermark_to(8) + .add_elements(['d']) + .advance_watermark_to_infinity()) + self.assertEqual( + test_stream.events, + [ + WatermarkEvent(0), + ElementEvent([ + TimestampedValue('a', 0), + TimestampedValue('b', 3), + TimestampedValue('c', 6), + ]), + ProcessingTimeEvent(10), + WatermarkEvent(8), + ElementEvent([ + TimestampedValue('d', 8), + ]), + WatermarkEvent(timestamp.MAX_TIMESTAMP), + ] + ) + + def test_test_stream_errors(self): + with self.assertRaises(AssertionError, msg=( + 'Watermark must strictly-monotonically advance.')): + _ = (TestStream() + .advance_watermark_to(5) + .advance_watermark_to(4)) + + with self.assertRaises(AssertionError, msg=( + 'Must advance processing time by positive amount.')): + _ = (TestStream() + .advance_processing_time(-1)) + + with self.assertRaises(AssertionError, msg=( + 'Element timestamp must be before timestamp.MAX_TIMESTAMP.')): + _ = (TestStream() + .add_elements([ + TimestampedValue('a', timestamp.MAX_TIMESTAMP) + ])) + +if __name__ == '__main__': + unittest.main()