beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [1/2] beam git commit: [BEAM-2354] Add a ReadStringsFromPubSub/WriteStringsToPubSub PTransform
Date Fri, 26 May 2017 02:24:11 GMT
Repository: beam
Updated Branches:
  refs/heads/master 7bb10a694 -> 50200f23a


[BEAM-2354] Add a ReadStringsFromPubSub/WriteStringsToPubSub PTransform


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/14329b86
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/14329b86
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/14329b86

Branch: refs/heads/master
Commit: 14329b86e0732acf0190e10d9b04c574baaed2e7
Parents: 7bb10a6
Author: Luke Cwik <lcwik@google.com>
Authored: Wed May 24 18:14:59 2017 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Thu May 25 19:23:24 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/examples/streaming_wordcap.py   |  6 +-
 .../apache_beam/examples/streaming_wordcount.py |  6 +-
 sdks/python/apache_beam/io/gcp/pubsub.py        | 87 ++++++++++++++++----
 sdks/python/apache_beam/io/gcp/pubsub_test.py   | 57 +++++++++++--
 .../runners/dataflow/dataflow_runner.py         |  8 +-
 5 files changed, 132 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/14329b86/sdks/python/apache_beam/examples/streaming_wordcap.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcap.py b/sdks/python/apache_beam/examples/streaming_wordcap.py
index ce43e1f..19f9e5f 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcap.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcap.py
@@ -44,8 +44,7 @@ def run(argv=None):
   with beam.Pipeline(argv=pipeline_args) as p:
 
     # Read the text file[pattern] into a PCollection.
-    lines = p | beam.io.Read(
-        beam.io.PubSubSource(known_args.input_topic))
+    lines = p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
 
     # Capitalize the characters in each line.
     transformed = (lines
@@ -53,8 +52,7 @@ def run(argv=None):
 
     # Write to PubSub.
     # pylint: disable=expression-not-assigned
-    transformed | beam.io.Write(
-        beam.io.PubSubSink(known_args.output_topic))
+    transformed | beam.io.WriteStringsToPubSub(known_args.output_topic)
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/14329b86/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index e9d5dbe..ed8b5d0 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -47,8 +47,7 @@ def run(argv=None):
   with beam.Pipeline(argv=pipeline_args) as p:
 
     # Read the text file[pattern] into a PCollection.
-    lines = p | 'read' >> beam.io.Read(
-        beam.io.PubSubSource(known_args.input_topic))
+    lines = p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
 
     # Capitalize the characters in each line.
     transformed = (lines
@@ -63,8 +62,7 @@ def run(argv=None):
 
     # Write to PubSub.
     # pylint: disable=expression-not-assigned
-    transformed | 'pubsub_write' >> beam.io.Write(
-        beam.io.PubSubSink(known_args.output_topic))
+    transformed | beam.io.WriteStringsToPubSub(known_args.output_topic)
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/14329b86/sdks/python/apache_beam/io/gcp/pubsub.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index 103fce0..7b838d2 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -22,15 +22,68 @@ pipelines, during remote execution.
 
 from __future__ import absolute_import
 
-from apache_beam import coders
+from apache_beam.io.iobase import Read
+from apache_beam.io.iobase import Write
 from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
+from apache_beam.transforms import PTransform
+from apache_beam.transforms import ParDo
 from apache_beam.transforms.display import DisplayDataItem
 
-__all__ = ['PubSubSink', 'PubSubSource']
 
+__all__ = ['ReadStringsFromPubSub', 'WriteStringsToPubSub']
 
-class PubSubSource(dataflow_io.NativeSource):
-  """Source for reading from a given Cloud Pub/Sub topic.
+
+class ReadStringsFromPubSub(PTransform):
+  """A ``PTransform`` for reading utf-8 string payloads from Cloud Pub/Sub."""
+
+  def __init__(self, topic, subscription=None, id_label=None):
+    """Initializes ``ReadStringsFromPubSub``.
+
+    Args:
+      topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>".
+      subscription: Optional existing Cloud Pub/Sub subscription to use in the
+        form "projects/<project>/subscriptions/<subscription>".
+      id_label: The attribute on incoming Pub/Sub messages to use as a unique
+        record identifier.  When specified, the value of this attribute (which
+        can be any string that uniquely identifies the record) will be used for
+        deduplication of messages.  If not provided, we cannot guarantee
+        that no duplicate data will be delivered on the Pub/Sub stream. In this
+        case, deduplication of the stream will be strictly best effort.
+    """
+    super(ReadStringsFromPubSub, self).__init__()
+    self._source = _PubSubPayloadSource(
+        topic,
+        subscription=subscription,
+        id_label=id_label)
+
+  def expand(self, pvalue):
+    pcoll = pvalue.pipeline | Read(self._source)
+    pcoll.element_type = bytes
+    pcoll = pcoll | 'decode string' >> ParDo(_decodeUtf8String)
+    pcoll.element_type = unicode
+    return pcoll
+
+
+class WriteStringsToPubSub(PTransform):
+  """A ``PTransform`` for writing utf-8 string payloads to Cloud Pub/Sub."""
+
+  def __init__(self, topic):
+    """Initializes ``WriteStringsToPubSub``.
+
+    Args:
+      topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>".
+    """
+    super(WriteStringsToPubSub, self).__init__()
+    self._sink = _PubSubPayloadSink(topic)
+
+  def expand(self, pcoll):
+    pcoll = pcoll | 'encode string' >> ParDo(_encodeUtf8String)
+    pcoll.element_type = bytes
+    return pcoll | Write(self._sink)
+
+
+class _PubSubPayloadSource(dataflow_io.NativeSource):
+  """Source for the payload of a message as bytes from a Cloud Pub/Sub topic.
 
   Attributes:
     topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>".
@@ -42,15 +95,12 @@ class PubSubSource(dataflow_io.NativeSource):
       deduplication of messages.  If not provided, Dataflow cannot guarantee
       that no duplicate data will be delivered on the Pub/Sub stream. In this
       case, deduplication of the stream will be strictly best effort.
-    coder: The Coder to use for decoding incoming Pub/Sub messages.
   """
 
-  def __init__(self, topic, subscription=None, id_label=None,
-               coder=coders.StrUtf8Coder()):
+  def __init__(self, topic, subscription=None, id_label=None):
     self.topic = topic
     self.subscription = subscription
     self.id_label = id_label
-    self.coder = coder
 
   @property
   def format(self):
@@ -70,15 +120,14 @@ class PubSubSource(dataflow_io.NativeSource):
 
   def reader(self):
     raise NotImplementedError(
-        'PubSubSource is not supported in local execution.')
+        'PubSubPayloadSource is not supported in local execution.')
 
 
-class PubSubSink(dataflow_io.NativeSink):
-  """Sink for writing to a given Cloud Pub/Sub topic."""
+class _PubSubPayloadSink(dataflow_io.NativeSink):
+  """Sink for the payload of a message as bytes to a Cloud Pub/Sub topic."""
 
-  def __init__(self, topic, coder=coders.StrUtf8Coder()):
+  def __init__(self, topic):
     self.topic = topic
-    self.coder = coder
 
   @property
   def format(self):
@@ -90,4 +139,14 @@ class PubSubSink(dataflow_io.NativeSink):
 
   def writer(self):
     raise NotImplementedError(
-        'PubSubSink is not supported in local execution.')
+        'PubSubPayloadSink is not supported in local execution.')
+
+
+def _decodeUtf8String(encoded_value):
+  """Decodes utf-8 encoded bytes into a unicode string."""
+  return encoded_value.decode('utf-8')
+
+
+def _encodeUtf8String(value):
+  """Encodes a unicode string into utf-8 encoded bytes."""
+  return value.encode('utf-8')

http://git-wip-us.apache.org/repos/asf/beam/blob/14329b86/sdks/python/apache_beam/io/gcp/pubsub_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index 1642a95..322d08a 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -22,16 +22,53 @@ import unittest
 
 import hamcrest as hc
 
-from apache_beam.io.gcp.pubsub import PubSubSink
-from apache_beam.io.gcp.pubsub import PubSubSource
+from apache_beam.io.gcp.pubsub import _decodeUtf8String
+from apache_beam.io.gcp.pubsub import _encodeUtf8String
+from apache_beam.io.gcp.pubsub import _PubSubPayloadSink
+from apache_beam.io.gcp.pubsub import _PubSubPayloadSource
+from apache_beam.io.gcp.pubsub import ReadStringsFromPubSub
+from apache_beam.io.gcp.pubsub import WriteStringsToPubSub
+from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
 
 
-class TestPubSubSource(unittest.TestCase):
+class TestReadStringsFromPubSub(unittest.TestCase):
+  def test_expand(self):
+    p = TestPipeline()
+    pcoll = p | ReadStringsFromPubSub('a_topic', 'a_subscription', 'a_label')
+    # Ensure that the output type is unicode
+    self.assertEqual(unicode, pcoll.element_type)
+
+    # Ensure that the type on the intermediate read output PCollection is bytes
+    read_pcoll = pcoll.producer.inputs[0]
+    self.assertEqual(bytes, read_pcoll.element_type)
+
+    # Ensure that the properties passed through correctly
+    source = read_pcoll.producer.transform.source
+    self.assertEqual('a_topic', source.topic)
+    self.assertEqual('a_subscription', source.subscription)
+    self.assertEqual('a_label', source.id_label)
+
+
+class TestWriteStringsToPubSub(unittest.TestCase):
+  def test_expand(self):
+    p = TestPipeline()
+    pdone = p | ReadStringsFromPubSub('baz') | WriteStringsToPubSub('a_topic')
+
+    # Ensure that the properties passed through correctly
+    sink = pdone.producer.transform.sink
+    self.assertEqual('a_topic', sink.topic)
 
+    # Ensure that the type on the intermediate payload transformer output
+    # PCollection is bytes
+    write_pcoll = pdone.producer.inputs[0]
+    self.assertEqual(bytes, write_pcoll.element_type)
+
+
+class TestPubSubSource(unittest.TestCase):
   def test_display_data(self):
-    source = PubSubSource('a_topic', 'a_subscription', 'a_label')
+    source = _PubSubPayloadSource('a_topic', 'a_subscription', 'a_label')
     dd = DisplayData.create_from(source)
     expected_items = [
         DisplayDataItemMatcher('topic', 'a_topic'),
@@ -41,7 +78,7 @@ class TestPubSubSource(unittest.TestCase):
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
   def test_display_data_no_subscription(self):
-    source = PubSubSource('a_topic')
+    source = _PubSubPayloadSource('a_topic')
     dd = DisplayData.create_from(source)
     expected_items = [
         DisplayDataItemMatcher('topic', 'a_topic')]
@@ -51,7 +88,7 @@ class TestPubSubSource(unittest.TestCase):
 
 class TestPubSubSink(unittest.TestCase):
   def test_display_data(self):
-    sink = PubSubSink('a_topic')
+    sink = _PubSubPayloadSink('a_topic')
     dd = DisplayData.create_from(sink)
     expected_items = [
         DisplayDataItemMatcher('topic', 'a_topic')]
@@ -59,6 +96,14 @@ class TestPubSubSink(unittest.TestCase):
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
 
+class TestEncodeDecodeUtf8String(unittest.TestCase):
+  def test_encode(self):
+    self.assertEqual(b'test_data', _encodeUtf8String('test_data'))
+
+  def test_decode(self):
+    self.assertEqual('test_data', _decodeUtf8String(b'test_data'))
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/14329b86/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index a05e582..046d3d5 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -595,8 +595,8 @@ class DataflowRunner(PipelineRunner):
       standard_options = (
           transform_node.inputs[0].pipeline.options.view_as(StandardOptions))
       if not standard_options.streaming:
-        raise ValueError('PubSubSource is currently available for use only in '
-                         'streaming pipelines.')
+        raise ValueError('PubSubPayloadSource is currently available for use '
+                         'only in streaming pipelines.')
       step.add_property(PropertyNames.PUBSUB_TOPIC, transform.source.topic)
       if transform.source.subscription:
         step.add_property(PropertyNames.PUBSUB_SUBSCRIPTION,
@@ -677,8 +677,8 @@ class DataflowRunner(PipelineRunner):
       standard_options = (
           transform_node.inputs[0].pipeline.options.view_as(StandardOptions))
       if not standard_options.streaming:
-        raise ValueError('PubSubSink is currently available for use only in '
-                         'streaming pipelines.')
+        raise ValueError('PubSubPayloadSink is currently available for use '
+                         'only in streaming pipelines.')
       step.add_property(PropertyNames.PUBSUB_TOPIC, transform.sink.topic)
     else:
       raise ValueError(


Mime
View raw message