beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [1/2] beam git commit: Add PubSub I/O support to Python DirectRunner
Date Thu, 29 Jun 2017 16:46:19 GMT
Repository: beam
Updated Branches:
  refs/heads/master 4d41e25d8 -> 2dd1907c6


Add PubSub I/O support to Python DirectRunner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fb7ec28c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fb7ec28c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fb7ec28c

Branch: refs/heads/master
Commit: fb7ec28cfb1291b04e0eac738054eefe0bb9a103
Parents: 4d41e25
Author: Charles Chen <ccy@google.com>
Authored: Mon Jun 26 18:03:53 2017 -0700
Committer: Ahmet Altay <altay@google.com>
Committed: Thu Jun 29 09:46:03 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/examples/streaming_wordcount.py | 12 ++-
 sdks/python/apache_beam/io/gcp/pubsub.py        | 91 +++++++++++++++-----
 sdks/python/apache_beam/io/gcp/pubsub_test.py   | 89 +++++++++++--------
 .../runners/dataflow/dataflow_runner.py         | 11 +--
 .../apache_beam/runners/direct/direct_runner.py | 54 ++++++++++++
 .../runners/direct/transform_evaluator.py       | 89 +++++++++++++++++++
 6 files changed, 281 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fb7ec28c/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index 4c29f2b..7696d77 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -28,6 +28,8 @@ import logging
 
 
 import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import StandardOptions
 import apache_beam.transforms.window as window
 
 
@@ -41,13 +43,17 @@ def run(argv=None):
   parser = argparse.ArgumentParser()
   parser.add_argument(
       '--input_topic', required=True,
-      help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
+      help=('Input PubSub topic of the form '
+            '"projects/<PROJECT>/topics/<TOPIC>".'))
   parser.add_argument(
       '--output_topic', required=True,
-      help='Output PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
+      help=('Output PubSub topic of the form '
+            '"projects/<PROJECT>/topic/<TOPIC>".'))
   known_args, pipeline_args = parser.parse_known_args(argv)
+  options = PipelineOptions(pipeline_args)
+  options.view_as(StandardOptions).streaming = True
 
-  with beam.Pipeline(argv=pipeline_args) as p:
+  with beam.Pipeline(options=options) as p:
 
     # Read from PubSub into a PCollection.
     lines = p | beam.io.ReadStringsFromPubSub(known_args.input_topic)

http://git-wip-us.apache.org/repos/asf/beam/blob/fb7ec28c/sdks/python/apache_beam/io/gcp/pubsub.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index fabe296..32d388a 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -24,12 +24,16 @@ This API is currently under development and is subject to change.
 
 from __future__ import absolute_import
 
+import re
+
 from apache_beam import coders
 from apache_beam.io.iobase import Read
 from apache_beam.io.iobase import Write
 from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
+from apache_beam.transforms import core
 from apache_beam.transforms import PTransform
 from apache_beam.transforms import Map
+from apache_beam.transforms import window
 from apache_beam.transforms.display import DisplayDataItem
 
 
@@ -43,11 +47,12 @@ class ReadStringsFromPubSub(PTransform):
     """Initializes ``ReadStringsFromPubSub``.
 
     Attributes:
-      topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>". If
-        provided then subscription must be None.
+      topic: Cloud Pub/Sub topic in the form "projects/<project>/topics/
+        <topic>". If provided, subscription must be None.
       subscription: Existing Cloud Pub/Sub subscription to use in the
-        form "projects/<project>/subscriptions/<subscription>". If provided then
-        topic must be None.
+        form "projects/<project>/subscriptions/<subscription>". If not
+        specified, a temporary subscription will be created from the specified
+        topic. If provided, topic must be None.
       id_label: The attribute on incoming Pub/Sub messages to use as a unique
         record identifier.  When specified, the value of this attribute (which
         can be any string that uniquely identifies the record) will be used for
@@ -56,17 +61,14 @@ class ReadStringsFromPubSub(PTransform):
         case, deduplication of the stream will be strictly best effort.
     """
     super(ReadStringsFromPubSub, self).__init__()
-    if topic and subscription:
-      raise ValueError("Only one of topic or subscription should be provided.")
-
-    if not (topic or subscription):
-      raise ValueError("Either a topic or subscription must be provided.")
-
     self._source = _PubSubPayloadSource(
         topic,
         subscription=subscription,
         id_label=id_label)
 
+  def get_windowing(self, unused_inputs):
+    return core.Windowing(window.GlobalWindows())
+
   def expand(self, pvalue):
     pcoll = pvalue.pipeline | Read(self._source)
     pcoll.element_type = bytes
@@ -93,15 +95,45 @@ class WriteStringsToPubSub(PTransform):
     return pcoll | Write(self._sink)
 
 
+PROJECT_ID_REGEXP = '[a-z][-a-z0-9:.]{4,61}[a-z0-9]'
+SUBSCRIPTION_REGEXP = 'projects/([^/]+)/subscriptions/(.+)'
+TOPIC_REGEXP = 'projects/([^/]+)/topics/(.+)'
+
+
+def parse_topic(full_topic):
+  match = re.match(TOPIC_REGEXP, full_topic)
+  if not match:
+    raise ValueError(
+        'PubSub topic must be in the form "projects/<project>/topics'
+        '/<topic>" (got %r).' % full_topic)
+  project, topic_name = match.group(1), match.group(2)
+  if not re.match(PROJECT_ID_REGEXP, project):
+    raise ValueError('Invalid PubSub project name: %r.' % project)
+  return project, topic_name
+
+
+def parse_subscription(full_subscription):
+  match = re.match(SUBSCRIPTION_REGEXP, full_subscription)
+  if not match:
+    raise ValueError(
+        'PubSub subscription must be in the form "projects/<project>'
+        '/subscriptions/<subscription>" (got %r).' % full_subscription)
+  project, subscription_name = match.group(1), match.group(2)
+  if not re.match(PROJECT_ID_REGEXP, project):
+    raise ValueError('Invalid PubSub project name: %r.' % project)
+  return project, subscription_name
+
+
 class _PubSubPayloadSource(dataflow_io.NativeSource):
   """Source for the payload of a message as bytes from a Cloud Pub/Sub topic.
 
   Attributes:
-    topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>". If
-      provided then topic must be None.
+    topic: Cloud Pub/Sub topic in the form "projects/<project>/topics/<topic>".
+      If provided, subscription must be None.
     subscription: Existing Cloud Pub/Sub subscription to use in the
-      form "projects/<project>/subscriptions/<subscription>". If provided then
-      subscription must be None.
+      form "projects/<project>/subscriptions/<subscription>". If not specified,
+      a temporary subscription will be created from the specified topic. If
+      provided, topic must be None.
     id_label: The attribute on incoming Pub/Sub messages to use as a unique
       record identifier.  When specified, the value of this attribute (which can
       be any string that uniquely identifies the record) will be used for
@@ -111,13 +143,26 @@ class _PubSubPayloadSource(dataflow_io.NativeSource):
   """
 
   def __init__(self, topic=None, subscription=None, id_label=None):
-    # we are using this coder explicitly for portability reasons of PubsubIO
+    # We are using this coder explicitly for portability reasons of PubsubIO
     # across implementations in languages.
     self.coder = coders.BytesCoder()
-    self.topic = topic
-    self.subscription = subscription
+    self.full_topic = topic
+    self.full_subscription = subscription
+    self.topic_name = None
+    self.subscription_name = None
     self.id_label = id_label
 
+    # Perform some validation on the topic and subscription.
+    if not (topic or subscription):
+      raise ValueError('Either a topic or subscription must be provided.')
+    if topic and subscription:
+      raise ValueError('Only one of topic or subscription should be provided.')
+
+    if topic:
+      self.project, self.topic_name = parse_topic(topic)
+    if subscription:
+      self.project, self.subscription_name = parse_subscription(subscription)
+
   @property
   def format(self):
     """Source format name required for remote execution."""
@@ -128,10 +173,10 @@ class _PubSubPayloadSource(dataflow_io.NativeSource):
             DisplayDataItem(self.id_label,
                             label='ID Label Attribute').drop_if_none(),
             'topic':
-            DisplayDataItem(self.topic,
-                            label='Pubsub Topic'),
+            DisplayDataItem(self.full_topic,
+                            label='Pubsub Topic').drop_if_none(),
             'subscription':
-            DisplayDataItem(self.subscription,
+            DisplayDataItem(self.full_subscription,
                             label='Pubsub Subscription').drop_if_none()}
 
   def reader(self):
@@ -146,7 +191,9 @@ class _PubSubPayloadSink(dataflow_io.NativeSink):
     # we are using this coder explicitly for portability reasons of PubsubIO
     # across implementations in languages.
     self.coder = coders.BytesCoder()
-    self.topic = topic
+    self.full_topic = topic
+
+    self.project, self.topic_name = parse_topic(topic)
 
   @property
   def format(self):
@@ -154,7 +201,7 @@ class _PubSubPayloadSink(dataflow_io.NativeSink):
     return 'pubsub'
 
   def display_data(self):
-    return {'topic': DisplayDataItem(self.topic, label='Pubsub Topic')}
+    return {'topic': DisplayDataItem(self.full_topic, label='Pubsub Topic')}
 
   def writer(self):
     raise NotImplementedError(

http://git-wip-us.apache.org/repos/asf/beam/blob/fb7ec28c/sdks/python/apache_beam/io/gcp/pubsub_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index 5d3e985..0dcc3c3 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -31,89 +31,108 @@ from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
 
 
+# Protect against environments where the PubSub library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud import pubsub
+except ImportError:
+  pubsub = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+
+@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
 class TestReadStringsFromPubSub(unittest.TestCase):
   def test_expand_with_topic(self):
     p = TestPipeline()
-    pcoll = p | ReadStringsFromPubSub('a_topic', None, 'a_label')
+    pcoll = p | ReadStringsFromPubSub('projects/fakeprj/topics/a_topic',
+                                      None, 'a_label')
     # Ensure that the output type is str
     self.assertEqual(unicode, pcoll.element_type)
 
-    # Ensure that the type on the intermediate read output PCollection is bytes
-    read_pcoll = pcoll.producer.inputs[0]
-    self.assertEqual(bytes, read_pcoll.element_type)
-
     # Ensure that the properties passed through correctly
-    source = read_pcoll.producer.transform.source
-    self.assertEqual('a_topic', source.topic)
+    source = pcoll.producer.transform._source
+    self.assertEqual('a_topic', source.topic_name)
     self.assertEqual('a_label', source.id_label)
 
   def test_expand_with_subscription(self):
     p = TestPipeline()
-    pcoll = p | ReadStringsFromPubSub(None, 'a_subscription', 'a_label')
+    pcoll = p | ReadStringsFromPubSub(
+        None, 'projects/fakeprj/subscriptions/a_subscription', 'a_label')
     # Ensure that the output type is str
     self.assertEqual(unicode, pcoll.element_type)
 
-    # Ensure that the type on the intermediate read output PCollection is bytes
-    read_pcoll = pcoll.producer.inputs[0]
-    self.assertEqual(bytes, read_pcoll.element_type)
-
     # Ensure that the properties passed through correctly
-    source = read_pcoll.producer.transform.source
-    self.assertEqual('a_subscription', source.subscription)
+    source = pcoll.producer.transform._source
+    self.assertEqual('a_subscription', source.subscription_name)
     self.assertEqual('a_label', source.id_label)
 
-  def test_expand_with_both_topic_and_subscription(self):
-    with self.assertRaisesRegexp(
-        ValueError, "Only one of topic or subscription should be provided."):
-      ReadStringsFromPubSub('a_topic', 'a_subscription', 'a_label')
-
   def test_expand_with_no_topic_or_subscription(self):
     with self.assertRaisesRegexp(
         ValueError, "Either a topic or subscription must be provided."):
       ReadStringsFromPubSub(None, None, 'a_label')
 
+  def test_expand_with_both_topic_and_subscription(self):
+    with self.assertRaisesRegexp(
+        ValueError, "Only one of topic or subscription should be provided."):
+      ReadStringsFromPubSub('a_topic', 'a_subscription', 'a_label')
+
 
+@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
 class TestWriteStringsToPubSub(unittest.TestCase):
   def test_expand(self):
     p = TestPipeline()
-    pdone = p | ReadStringsFromPubSub('baz') | WriteStringsToPubSub('a_topic')
+    pdone = (p
+             | ReadStringsFromPubSub('projects/fakeprj/topics/baz')
+             | WriteStringsToPubSub('projects/fakeprj/topics/a_topic'))
 
     # Ensure that the properties passed through correctly
-    sink = pdone.producer.transform.sink
-    self.assertEqual('a_topic', sink.topic)
-
-    # Ensure that the type on the intermediate payload transformer output
-    # PCollection is bytes
-    write_pcoll = pdone.producer.inputs[0]
-    self.assertEqual(bytes, write_pcoll.element_type)
+    self.assertEqual('a_topic', pdone.producer.transform.dofn.topic_name)
 
 
+@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
 class TestPubSubSource(unittest.TestCase):
-  def test_display_data(self):
-    source = _PubSubPayloadSource('a_topic', 'a_subscription', 'a_label')
+  def test_display_data_topic(self):
+    source = _PubSubPayloadSource(
+        'projects/fakeprj/topics/a_topic',
+        None,
+        'a_label')
+    dd = DisplayData.create_from(source)
+    expected_items = [
+        DisplayDataItemMatcher(
+            'topic', 'projects/fakeprj/topics/a_topic'),
+        DisplayDataItemMatcher('id_label', 'a_label')]
+
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
+  def test_display_data_subscription(self):
+    source = _PubSubPayloadSource(
+        None,
+        'projects/fakeprj/subscriptions/a_subscription',
+        'a_label')
     dd = DisplayData.create_from(source)
     expected_items = [
-        DisplayDataItemMatcher('topic', 'a_topic'),
-        DisplayDataItemMatcher('subscription', 'a_subscription'),
+        DisplayDataItemMatcher(
+            'subscription', 'projects/fakeprj/subscriptions/a_subscription'),
         DisplayDataItemMatcher('id_label', 'a_label')]
 
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
   def test_display_data_no_subscription(self):
-    source = _PubSubPayloadSource('a_topic')
+    source = _PubSubPayloadSource('projects/fakeprj/topics/a_topic')
     dd = DisplayData.create_from(source)
     expected_items = [
-        DisplayDataItemMatcher('topic', 'a_topic')]
+        DisplayDataItemMatcher('topic', 'projects/fakeprj/topics/a_topic')]
 
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
 
+@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
 class TestPubSubSink(unittest.TestCase):
   def test_display_data(self):
-    sink = _PubSubPayloadSink('a_topic')
+    sink = _PubSubPayloadSink('projects/fakeprj/topics/a_topic')
     dd = DisplayData.create_from(sink)
     expected_items = [
-        DisplayDataItemMatcher('topic', 'a_topic')]
+        DisplayDataItemMatcher('topic', 'projects/fakeprj/topics/a_topic')]
 
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 

http://git-wip-us.apache.org/repos/asf/beam/blob/fb7ec28c/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index f213b3b..57bcc5e 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -668,11 +668,12 @@ class DataflowRunner(PipelineRunner):
         raise ValueError('PubSubPayloadSource is currently available for use '
                          'only in streaming pipelines.')
       # Only one of topic or subscription should be set.
-      if transform.source.topic:
-        step.add_property(PropertyNames.PUBSUB_TOPIC, transform.source.topic)
-      elif transform.source.subscription:
+      if transform.source.full_subscription:
         step.add_property(PropertyNames.PUBSUB_SUBSCRIPTION,
-                          transform.source.subscription)
+                          transform.source.full_subscription)
+      elif transform.source.full_topic:
+        step.add_property(PropertyNames.PUBSUB_TOPIC,
+                          transform.source.full_topic)
       if transform.source.id_label:
         step.add_property(PropertyNames.PUBSUB_ID_LABEL,
                           transform.source.id_label)
@@ -756,7 +757,7 @@ class DataflowRunner(PipelineRunner):
       if not standard_options.streaming:
         raise ValueError('PubSubPayloadSink is currently available for use '
                          'only in streaming pipelines.')
-      step.add_property(PropertyNames.PUBSUB_TOPIC, transform.sink.topic)
+      step.add_property(PropertyNames.PUBSUB_TOPIC, transform.sink.full_topic)
     else:
       raise ValueError(
           'Sink %r has unexpected format %s.' % (

http://git-wip-us.apache.org/repos/asf/beam/blob/fb7ec28c/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 2a75977..1a94b3d 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -26,8 +26,10 @@ from __future__ import absolute_import
 import collections
 import logging
 
+import apache_beam as beam
 from apache_beam import typehints
 from apache_beam.metrics.execution import MetricsEnvironment
+from apache_beam.pvalue import PCollection
 from apache_beam.runners.direct.bundle_factory import BundleFactory
 from apache_beam.runners.runner import PipelineResult
 from apache_beam.runners.runner import PipelineRunner
@@ -107,6 +109,58 @@ class DirectRunner(PipelineRunner):
                       .with_output_types(*type_hints.output_types[0]))
     return transform.expand(pcoll)
 
+  def apply_ReadStringsFromPubSub(self, transform, pcoll):
+    try:
+      from google.cloud import pubsub as unused_pubsub
+    except ImportError:
+      raise ImportError('Google Cloud PubSub not available, please install '
+                        'apache_beam[gcp]')
+    # Execute this as a native transform.
+    output = PCollection(pcoll.pipeline)
+    output.element_type = unicode
+    return output
+
+  def apply_WriteStringsToPubSub(self, transform, pcoll):
+    try:
+      from google.cloud import pubsub
+    except ImportError:
+      raise ImportError('Google Cloud PubSub not available, please install '
+                        'apache_beam[gcp]')
+    project = transform._sink.project
+    topic_name = transform._sink.topic_name
+
+    class DirectWriteToPubSub(beam.DoFn):
+      _topic = None
+
+      def __init__(self, project, topic_name):
+        self.project = project
+        self.topic_name = topic_name
+
+      def start_bundle(self):
+        if self._topic is None:
+          self._topic = pubsub.Client(project=self.project).topic(
+              self.topic_name)
+        self._buffer = []
+
+      def process(self, elem):
+        self._buffer.append(elem.encode('utf-8'))
+        if len(self._buffer) >= 100:
+          self._flush()
+
+      def finish_bundle(self):
+        self._flush()
+
+      def _flush(self):
+        if self._buffer:
+          with self._topic.batch() as batch:
+            for datum in self._buffer:
+              batch.publish(datum)
+          self._buffer = []
+
+    output = pcoll | beam.ParDo(DirectWriteToPubSub(project, topic_name))
+    output.element_type = unicode
+    return output
+
   def run(self, pipeline):
     """Execute the entire pipeline and returns an DirectPipelineResult."""
 

http://git-wip-us.apache.org/repos/asf/beam/blob/fb7ec28c/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 67b2492..641291d 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -20,6 +20,8 @@
 from __future__ import absolute_import
 
 import collections
+import random
+import time
 
 from apache_beam import coders
 from apache_beam import pvalue
@@ -48,6 +50,7 @@ from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn
 from apache_beam.typehints.typecheck import TypeCheckError
 from apache_beam.typehints.typecheck import TypeCheckWrapperDoFn
 from apache_beam.utils import counters
+from apache_beam.utils.timestamp import Timestamp
 from apache_beam.utils.timestamp import MIN_TIMESTAMP
 from apache_beam.options.pipeline_options import TypeOptions
 
@@ -63,6 +66,7 @@ class TransformEvaluatorRegistry(object):
     self._evaluation_context = evaluation_context
     self._evaluators = {
         io.Read: _BoundedReadEvaluator,
+        io.ReadStringsFromPubSub: _PubSubReadEvaluator,
         core.Flatten: _FlattenEvaluator,
         core.ParDo: _ParDoEvaluator,
         core._GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
@@ -357,6 +361,91 @@ class _TestStreamEvaluator(_TransformEvaluator):
         {None: hold})
 
 
+class _PubSubSubscriptionWrapper(object):
+  """Wrapper for garbage-collecting temporary PubSub subscriptions."""
+
+  def __init__(self, subscription, should_cleanup):
+    self.subscription = subscription
+    self.should_cleanup = should_cleanup
+
+  def __del__(self):
+    if self.should_cleanup:
+      self.subscription.delete()
+
+
+class _PubSubReadEvaluator(_TransformEvaluator):
+  """TransformEvaluator for PubSub read."""
+
+  _subscription_cache = {}
+
+  def __init__(self, evaluation_context, applied_ptransform,
+               input_committed_bundle, side_inputs, scoped_metrics_container):
+    assert not side_inputs
+    super(_PubSubReadEvaluator, self).__init__(
+        evaluation_context, applied_ptransform, input_committed_bundle,
+        side_inputs, scoped_metrics_container)
+
+    source = self._applied_ptransform.transform._source
+    self._subscription = _PubSubReadEvaluator.get_subscription(
+        self._applied_ptransform, source.project, source.topic_name,
+        source.subscription_name)
+
+  @classmethod
+  def get_subscription(cls, transform, project, topic, subscription_name):
+    if transform not in cls._subscription_cache:
+      from google.cloud import pubsub
+      should_create = not subscription_name
+      if should_create:
+        subscription_name = 'beam_%d_%x' % (
+            int(time.time()), random.randrange(1 << 32))
+      cls._subscription_cache[transform] = _PubSubSubscriptionWrapper(
+          pubsub.Client(project=project).topic(topic).subscription(
+              subscription_name),
+          should_create)
+      if should_create:
+        cls._subscription_cache[transform].subscription.create()
+    return cls._subscription_cache[transform].subscription
+
+  def start_bundle(self):
+    pass
+
+  def process_element(self, element):
+    pass
+
+  def _read_from_pubsub(self):
+    from google.cloud import pubsub
+    # Because of the AutoAck, we are not able to reread messages if this
+    # evaluator fails with an exception before emitting a bundle. However,
+    # the DirectRunner currently doesn't retry work items anyway, so the
+    # pipeline would enter an inconsistent state on any error.
+    with pubsub.subscription.AutoAck(
+        self._subscription, return_immediately=True,
+        max_messages=10) as results:
+      return [message.data for unused_ack_id, message in results.items()]
+
+  def finish_bundle(self):
+    data = self._read_from_pubsub()
+    if data:
+      output_pcollection = list(self._outputs)[0]
+      bundle = self._evaluation_context.create_bundle(output_pcollection)
+      # TODO(ccy): we currently do not use the PubSub message timestamp or
+      # respect the PubSub source's id_label field.
+      now = Timestamp.of(time.time())
+      for message_data in data:
+        bundle.output(GlobalWindows.windowed_value(message_data, timestamp=now))
+      bundles = [bundle]
+    else:
+      bundles = []
+    input_pvalue = self._applied_ptransform.inputs
+    if not input_pvalue:
+      input_pvalue = pvalue.PBegin(self._applied_ptransform.transform.pipeline)
+    unprocessed_bundle = self._evaluation_context.create_bundle(
+        input_pvalue)
+    return TransformResult(
+        self._applied_ptransform, bundles,
+        [unprocessed_bundle], None, {None: Timestamp.of(time.time())})
+
+
 class _FlattenEvaluator(_TransformEvaluator):
   """TransformEvaluator for Flatten transform."""
 


Mime
View raw message