beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [1/2] beam git commit: Add TestStream to Python SDK
Date Fri, 31 Mar 2017 17:53:39 GMT
Repository: beam
Updated Branches:
  refs/heads/master 62473ae4b -> 023e6ab94


Add TestStream to Python SDK

The TestStream will be used for verifying streaming runner semantics.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/55db47d5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/55db47d5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/55db47d5

Branch: refs/heads/master
Commit: 55db47d50bb97b238926c2a1b0b80c36b5345d44
Parents: 62473ae
Author: Charles Chen <ccy@google.com>
Authored: Thu Mar 30 18:20:04 2017 -0700
Committer: Ahmet Altay <altay@google.com>
Committed: Fri Mar 31 10:53:16 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/transforms/window.py    |   5 +
 .../apache_beam/transforms/window_test.py       |   6 +
 sdks/python/apache_beam/utils/test_stream.py    | 163 +++++++++++++++++++
 .../apache_beam/utils/test_stream_test.py       |  82 ++++++++++
 4 files changed, 256 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/55db47d5/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 3878dff..dcc58b7 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -246,6 +246,11 @@ class TimestampedValue(object):
     self.value = value
     self.timestamp = Timestamp.of(timestamp)
 
+  def __cmp__(self, other):
+    if type(self) is not type(other):
+      return cmp(type(self), type(other))
+    return cmp((self.value, self.timestamp), (other.value, other.timestamp))
+
 
 class GlobalWindow(BoundedWindow):
   """The default window into which all data is placed (via GlobalWindows)."""

http://git-wip-us.apache.org/repos/asf/beam/blob/55db47d5/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 11c8a68..1ac95e4 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -62,6 +62,12 @@ reify_windows = core.ParDo(ReifyWindowsFn())
 
 class WindowTest(unittest.TestCase):
 
+  def test_timestamped_value_cmp(self):
+    self.assertEqual(TimestampedValue('a', 2), TimestampedValue('a', 2))
+    self.assertEqual(TimestampedValue('a', 2), TimestampedValue('a', 2.0))
+    self.assertNotEqual(TimestampedValue('a', 2), TimestampedValue('a', 2.1))
+    self.assertNotEqual(TimestampedValue('a', 2), TimestampedValue('b', 2))
+
   def test_global_window(self):
     self.assertEqual(GlobalWindow(), GlobalWindow())
     self.assertNotEqual(GlobalWindow(),

http://git-wip-us.apache.org/repos/asf/beam/blob/55db47d5/sdks/python/apache_beam/utils/test_stream.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/test_stream.py b/sdks/python/apache_beam/utils/test_stream.py
new file mode 100644
index 0000000..7ae27b7
--- /dev/null
+++ b/sdks/python/apache_beam/utils/test_stream.py
@@ -0,0 +1,163 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Provides TestStream for verifying streaming runner semantics."""
+
+from abc import ABCMeta
+from abc import abstractmethod
+
+from apache_beam import coders
+from apache_beam import pvalue
+from apache_beam.transforms import PTransform
+from apache_beam.transforms.window import TimestampedValue
+from apache_beam.utils import timestamp
+from apache_beam.utils.windowed_value import WindowedValue
+
+
+class Event(object):
+  """Test stream event to be emitted during execution of a TestStream."""
+
+  __metaclass__ = ABCMeta
+
+  def __cmp__(self, other):
+    if type(self) is not type(other):
+      return cmp(type(self), type(other))
+    return self._typed_cmp(other)
+
+  @abstractmethod
+  def _typed_cmp(self, other):
+    raise NotImplementedError
+
+
+class ElementEvent(Event):
+  """Element-producing test stream event."""
+
+  def __init__(self, timestamped_values):
+    self.timestamped_values = timestamped_values
+
+  def _typed_cmp(self, other):
+    return cmp(self.timestamped_values, other.timestamped_values)
+
+
+class WatermarkEvent(Event):
+  """Watermark-advancing test stream event."""
+
+  def __init__(self, new_watermark):
+    self.new_watermark = timestamp.Timestamp.of(new_watermark)
+
+  def _typed_cmp(self, other):
+    return cmp(self.new_watermark, other.new_watermark)
+
+
+class ProcessingTimeEvent(Event):
+  """Processing time-advancing test stream event."""
+
+  def __init__(self, advance_by):
+    self.advance_by = timestamp.Duration.of(advance_by)
+
+  def _typed_cmp(self, other):
+    return cmp(self.advance_by, other.advance_by)
+
+
+class TestStream(PTransform):
+  """Test stream that generates events on an unbounded PCollection of elements.
+
+  Each event emits elements, advances the watermark or advances the processing
+  time.  After all of the specified elements are emitted, the stream ceases to
+  produce output.
+  """
+
+  def __init__(self, coder=coders.FastPrimitivesCoder):
+    assert coder is not None
+    self.coder = coder
+    self.current_watermark = timestamp.MIN_TIMESTAMP
+    self.events = []
+
+  def expand(self, pbegin):
+    assert isinstance(pbegin, pvalue.PBegin)
+    self.pipeline = pbegin.pipeline
+    return pvalue.PCollection(self.pipeline)
+
+  def _infer_output_coder(self, input_type=None, input_coder=None):
+    return self.coder
+
+  def _add(self, event):
+    if isinstance(event, ElementEvent):
+      for tv in event.timestamped_values:
+        assert tv.timestamp < timestamp.MAX_TIMESTAMP, (
+            'Element timestamp must be before timestamp.MAX_TIMESTAMP.')
+    elif isinstance(event, WatermarkEvent):
+      assert event.new_watermark > self.current_watermark, (
+          'Watermark must strictly-monotonically advance.')
+      self.current_watermark = event.new_watermark
+    elif isinstance(event, ProcessingTimeEvent):
+      assert event.advance_by > 0, (
+          'Must advance processing time by positive amount.')
+    else:
+      raise ValueError('Unknown event: %s' % event)
+    self.events.append(event)
+
+  def add_elements(self, elements):
+    """Add elements to the TestStream.
+
+    Elements added to the TestStream will be produced during pipeline execution.
+    These elements can be TimestampedValue, WindowedValue or raw unwrapped
+    elements that are serializable using the TestStream's specified Coder.  When
+    a TimestampedValue or a WindowedValue element is used, the timestamp of the
+    TimestampedValue or WindowedValue will be the timestamp of the produced
+    element; otherwise, the current watermark timestamp will be used for that
+    element.  The windows of a given WindowedValue are ignored by the
+    TestStream.
+    """
+    timestamped_values = []
+    for element in elements:
+      if isinstance(element, TimestampedValue):
+        timestamped_values.append(element)
+      elif isinstance(element, WindowedValue):
+        # Drop windows for elements in test stream.
+        timestamped_values.append(
+            TimestampedValue(element.value, element.timestamp))
+      else:
+        # Add elements with timestamp equal to current watermark.
+        timestamped_values.append(
+            TimestampedValue(element, self.current_watermark))
+    self._add(ElementEvent(timestamped_values))
+    return self
+
+  def advance_watermark_to(self, new_watermark):
+    """Advance the watermark to a given Unix timestamp.
+
+    The Unix timestamp value used must be later than the previous watermark
+    value and should be given as an int, float or utils.timestamp.Timestamp
+    object.
+    """
+    self._add(WatermarkEvent(new_watermark))
+    return self
+
+  def advance_watermark_to_infinity(self):
+    """Advance the watermark to the end of time."""
+    self.advance_watermark_to(timestamp.MAX_TIMESTAMP)
+    return self
+
+  def advance_processing_time(self, advance_by):
+    """Advance the current processing time by a given duration in seconds.
+
+    The duration must be a positive number of seconds and should be given as an
+    int, float or utils.timestamp.Duration object.
+    """
+    self._add(ProcessingTimeEvent(advance_by))
+    return self

http://git-wip-us.apache.org/repos/asf/beam/blob/55db47d5/sdks/python/apache_beam/utils/test_stream_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/test_stream_test.py b/sdks/python/apache_beam/utils/test_stream_test.py
new file mode 100644
index 0000000..cc207ee
--- /dev/null
+++ b/sdks/python/apache_beam/utils/test_stream_test.py
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the test_stream module."""
+
+import unittest
+
+from apache_beam.transforms.window import TimestampedValue
+from apache_beam.utils import timestamp
+from apache_beam.utils.test_stream import ElementEvent
+from apache_beam.utils.test_stream import ProcessingTimeEvent
+from apache_beam.utils.test_stream import TestStream
+from apache_beam.utils.test_stream import WatermarkEvent
+from apache_beam.utils.windowed_value import WindowedValue
+
+
+class TestStreamTest(unittest.TestCase):
+
+  def test_basic_test_stream(self):
+    test_stream = (TestStream()
+                   .advance_watermark_to(0)
+                   .add_elements([
+                       'a',
+                       WindowedValue('b', 3, []),
+                       TimestampedValue('c', 6)])
+                   .advance_processing_time(10)
+                   .advance_watermark_to(8)
+                   .add_elements(['d'])
+                   .advance_watermark_to_infinity())
+    self.assertEqual(
+        test_stream.events,
+        [
+            WatermarkEvent(0),
+            ElementEvent([
+                TimestampedValue('a', 0),
+                TimestampedValue('b', 3),
+                TimestampedValue('c', 6),
+            ]),
+            ProcessingTimeEvent(10),
+            WatermarkEvent(8),
+            ElementEvent([
+                TimestampedValue('d', 8),
+            ]),
+            WatermarkEvent(timestamp.MAX_TIMESTAMP),
+        ]
+    )
+
+  def test_test_stream_errors(self):
+    with self.assertRaises(AssertionError, msg=(
+        'Watermark must strictly-monotonically advance.')):
+      _ = (TestStream()
+           .advance_watermark_to(5)
+           .advance_watermark_to(4))
+
+    with self.assertRaises(AssertionError, msg=(
+        'Must advance processing time by positive amount.')):
+      _ = (TestStream()
+           .advance_processing_time(-1))
+
+    with self.assertRaises(AssertionError, msg=(
+        'Element timestamp must be before timestamp.MAX_TIMESTAMP.')):
+      _ = (TestStream()
+           .add_elements([
+               TimestampedValue('a', timestamp.MAX_TIMESTAMP)
+           ]))
+
+if __name__ == '__main__':
+  unittest.main()


Mime
View raw message