Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3DE44200CDE for ; Fri, 30 Jun 2017 23:31:09 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 3CA35160C01; Fri, 30 Jun 2017 21:31:09 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2B722160C07 for ; Fri, 30 Jun 2017 23:31:07 +0200 (CEST) Received: (qmail 34005 invoked by uid 500); 30 Jun 2017 21:31:05 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 31855 invoked by uid 99); 30 Jun 2017 21:31:04 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 30 Jun 2017 21:31:04 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 22BA3F3243; Fri, 30 Jun 2017 21:31:04 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.apache.org Date: Fri, 30 Jun 2017 21:31:35 -0000 Message-Id: In-Reply-To: <8e3ceb2abb59430881821528fc89b1d5@git.apache.org> References: <8e3ceb2abb59430881821528fc89b1d5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [33/50] beam git commit: Add PubSub I/O support to Python DirectRunner archived-at: Fri, 30 Jun 2017 21:31:09 -0000 Add PubSub I/O support to Python DirectRunner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fb7ec28c Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fb7ec28c 
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fb7ec28c Branch: refs/heads/gearpump-runner Commit: fb7ec28cfb1291b04e0eac738054eefe0bb9a103 Parents: 4d41e25 Author: Charles Chen Authored: Mon Jun 26 18:03:53 2017 -0700 Committer: Ahmet Altay Committed: Thu Jun 29 09:46:03 2017 -0700 ---------------------------------------------------------------------- .../apache_beam/examples/streaming_wordcount.py | 12 ++- sdks/python/apache_beam/io/gcp/pubsub.py | 91 +++++++++++++++----- sdks/python/apache_beam/io/gcp/pubsub_test.py | 89 +++++++++++-------- .../runners/dataflow/dataflow_runner.py | 11 +-- .../apache_beam/runners/direct/direct_runner.py | 54 ++++++++++++ .../runners/direct/transform_evaluator.py | 89 +++++++++++++++++++ 6 files changed, 281 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/fb7ec28c/sdks/python/apache_beam/examples/streaming_wordcount.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py index 4c29f2b..7696d77 100644 --- a/sdks/python/apache_beam/examples/streaming_wordcount.py +++ b/sdks/python/apache_beam/examples/streaming_wordcount.py @@ -28,6 +28,8 @@ import logging import apache_beam as beam +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import StandardOptions import apache_beam.transforms.window as window @@ -41,13 +43,17 @@ def run(argv=None): parser = argparse.ArgumentParser() parser.add_argument( '--input_topic', required=True, - help='Input PubSub topic of the form "/topics//".') + help=('Input PubSub topic of the form ' + '"projects//topics/".')) parser.add_argument( '--output_topic', required=True, - help='Output PubSub topic of the form "/topics//".') + help=('Output PubSub topic of the form ' + '"projects//topic/".')) 
known_args, pipeline_args = parser.parse_known_args(argv) + options = PipelineOptions(pipeline_args) + options.view_as(StandardOptions).streaming = True - with beam.Pipeline(argv=pipeline_args) as p: + with beam.Pipeline(options=options) as p: # Read from PubSub into a PCollection. lines = p | beam.io.ReadStringsFromPubSub(known_args.input_topic) http://git-wip-us.apache.org/repos/asf/beam/blob/fb7ec28c/sdks/python/apache_beam/io/gcp/pubsub.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index fabe296..32d388a 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -24,12 +24,16 @@ This API is currently under development and is subject to change. from __future__ import absolute_import +import re + from apache_beam import coders from apache_beam.io.iobase import Read from apache_beam.io.iobase import Write from apache_beam.runners.dataflow.native_io import iobase as dataflow_io +from apache_beam.transforms import core from apache_beam.transforms import PTransform from apache_beam.transforms import Map +from apache_beam.transforms import window from apache_beam.transforms.display import DisplayDataItem @@ -43,11 +47,12 @@ class ReadStringsFromPubSub(PTransform): """Initializes ``ReadStringsFromPubSub``. Attributes: - topic: Cloud Pub/Sub topic in the form "/topics//". If - provided then subscription must be None. + topic: Cloud Pub/Sub topic in the form "projects//topics/ + ". If provided, subscription must be None. subscription: Existing Cloud Pub/Sub subscription to use in the - form "projects//subscriptions/". If provided then - topic must be None. + form "projects//subscriptions/". If not + specified, a temporary subscription will be created from the specified + topic. If provided, topic must be None. id_label: The attribute on incoming Pub/Sub messages to use as a unique record identifier. 
When specified, the value of this attribute (which can be any string that uniquely identifies the record) will be used for @@ -56,17 +61,14 @@ class ReadStringsFromPubSub(PTransform): case, deduplication of the stream will be strictly best effort. """ super(ReadStringsFromPubSub, self).__init__() - if topic and subscription: - raise ValueError("Only one of topic or subscription should be provided.") - - if not (topic or subscription): - raise ValueError("Either a topic or subscription must be provided.") - self._source = _PubSubPayloadSource( topic, subscription=subscription, id_label=id_label) + def get_windowing(self, unused_inputs): + return core.Windowing(window.GlobalWindows()) + def expand(self, pvalue): pcoll = pvalue.pipeline | Read(self._source) pcoll.element_type = bytes @@ -93,15 +95,45 @@ class WriteStringsToPubSub(PTransform): return pcoll | Write(self._sink) +PROJECT_ID_REGEXP = '[a-z][-a-z0-9:.]{4,61}[a-z0-9]' +SUBSCRIPTION_REGEXP = 'projects/([^/]+)/subscriptions/(.+)' +TOPIC_REGEXP = 'projects/([^/]+)/topics/(.+)' + + +def parse_topic(full_topic): + match = re.match(TOPIC_REGEXP, full_topic) + if not match: + raise ValueError( + 'PubSub topic must be in the form "projects/<PROJECT>/topics' + '/<TOPIC>" (got %r).' % full_topic) + project, topic_name = match.group(1), match.group(2) + if not re.match(PROJECT_ID_REGEXP, project): + raise ValueError('Invalid PubSub project name: %r.' % project) + return project, topic_name + + +def parse_subscription(full_subscription): + match = re.match(SUBSCRIPTION_REGEXP, full_subscription) + if not match: + raise ValueError( + 'PubSub subscription must be in the form "projects/<PROJECT>' + '/subscriptions/<SUB>" (got %r).' % full_subscription) + project, subscription_name = match.group(1), match.group(2) + if not re.match(PROJECT_ID_REGEXP, project): + raise ValueError('Invalid PubSub project name: %r.' 
% project) + return project, subscription_name + + + class _PubSubPayloadSource(dataflow_io.NativeSource): """Source for the payload of a message as bytes from a Cloud Pub/Sub topic. Attributes: - topic: Cloud Pub/Sub topic in the form "/topics/<PROJECT>/<TOPIC>". If - provided then topic must be None. + topic: Cloud Pub/Sub topic in the form "projects/<PROJECT>/topics/<TOPIC>". + If provided, subscription must be None. subscription: Existing Cloud Pub/Sub subscription to use in the - form "projects/<PROJECT>/subscriptions/<SUBSCRIPTION>". If provided then - subscription must be None. + form "projects/<PROJECT>/subscriptions/<SUBSCRIPTION>". If not specified, + a temporary subscription will be created from the specified topic. If + provided, topic must be None. id_label: The attribute on incoming Pub/Sub messages to use as a unique record identifier. When specified, the value of this attribute (which can be any string that uniquely identifies the record) will be used for @@ -111,13 +143,26 @@ class _PubSubPayloadSource(dataflow_io.NativeSource): """ def __init__(self, topic=None, subscription=None, id_label=None): - # we are using this coder explicitly for portability reasons of PubsubIO + # We are using this coder explicitly for portability reasons of PubsubIO # across implementations in languages. self.coder = coders.BytesCoder() - self.topic = topic - self.subscription = subscription + self.full_topic = topic + self.full_subscription = subscription + self.topic_name = None + self.subscription_name = None self.id_label = id_label + # Perform some validation on the topic and subscription. 
+ if not (topic or subscription): + raise ValueError('Either a topic or subscription must be provided.') + if topic and subscription: + raise ValueError('Only one of topic or subscription should be provided.') + + if topic: + self.project, self.topic_name = parse_topic(topic) + if subscription: + self.project, self.subscription_name = parse_subscription(subscription) + @property def format(self): """Source format name required for remote execution.""" @@ -128,10 +173,10 @@ class _PubSubPayloadSource(dataflow_io.NativeSource): DisplayDataItem(self.id_label, label='ID Label Attribute').drop_if_none(), 'topic': - DisplayDataItem(self.topic, - label='Pubsub Topic'), + DisplayDataItem(self.full_topic, + label='Pubsub Topic').drop_if_none(), 'subscription': - DisplayDataItem(self.subscription, + DisplayDataItem(self.full_subscription, label='Pubsub Subscription').drop_if_none()} def reader(self): @@ -146,7 +191,9 @@ class _PubSubPayloadSink(dataflow_io.NativeSink): # we are using this coder explicitly for portability reasons of PubsubIO # across implementations in languages. 
self.coder = coders.BytesCoder() - self.topic = topic + self.full_topic = topic + + self.project, self.topic_name = parse_topic(topic) @property def format(self): @@ -154,7 +201,7 @@ class _PubSubPayloadSink(dataflow_io.NativeSink): return 'pubsub' def display_data(self): - return {'topic': DisplayDataItem(self.topic, label='Pubsub Topic')} + return {'topic': DisplayDataItem(self.full_topic, label='Pubsub Topic')} def writer(self): raise NotImplementedError( http://git-wip-us.apache.org/repos/asf/beam/blob/fb7ec28c/sdks/python/apache_beam/io/gcp/pubsub_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py index 5d3e985..0dcc3c3 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py @@ -31,89 +31,108 @@ from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher +# Protect against environments where the PubSub library is not available. 
+# pylint: disable=wrong-import-order, wrong-import-position +try: + from google.cloud import pubsub +except ImportError: + pubsub = None +# pylint: enable=wrong-import-order, wrong-import-position + + +@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed') class TestReadStringsFromPubSub(unittest.TestCase): def test_expand_with_topic(self): p = TestPipeline() - pcoll = p | ReadStringsFromPubSub('a_topic', None, 'a_label') + pcoll = p | ReadStringsFromPubSub('projects/fakeprj/topics/a_topic', + None, 'a_label') # Ensure that the output type is str self.assertEqual(unicode, pcoll.element_type) - # Ensure that the type on the intermediate read output PCollection is bytes - read_pcoll = pcoll.producer.inputs[0] - self.assertEqual(bytes, read_pcoll.element_type) - # Ensure that the properties passed through correctly - source = read_pcoll.producer.transform.source - self.assertEqual('a_topic', source.topic) + source = pcoll.producer.transform._source + self.assertEqual('a_topic', source.topic_name) self.assertEqual('a_label', source.id_label) def test_expand_with_subscription(self): p = TestPipeline() - pcoll = p | ReadStringsFromPubSub(None, 'a_subscription', 'a_label') + pcoll = p | ReadStringsFromPubSub( + None, 'projects/fakeprj/subscriptions/a_subscription', 'a_label') # Ensure that the output type is str self.assertEqual(unicode, pcoll.element_type) - # Ensure that the type on the intermediate read output PCollection is bytes - read_pcoll = pcoll.producer.inputs[0] - self.assertEqual(bytes, read_pcoll.element_type) - # Ensure that the properties passed through correctly - source = read_pcoll.producer.transform.source - self.assertEqual('a_subscription', source.subscription) + source = pcoll.producer.transform._source + self.assertEqual('a_subscription', source.subscription_name) self.assertEqual('a_label', source.id_label) - def test_expand_with_both_topic_and_subscription(self): - with self.assertRaisesRegexp( - ValueError, "Only one of topic or 
subscription should be provided."): - ReadStringsFromPubSub('a_topic', 'a_subscription', 'a_label') - def test_expand_with_no_topic_or_subscription(self): with self.assertRaisesRegexp( ValueError, "Either a topic or subscription must be provided."): ReadStringsFromPubSub(None, None, 'a_label') + def test_expand_with_both_topic_and_subscription(self): + with self.assertRaisesRegexp( + ValueError, "Only one of topic or subscription should be provided."): + ReadStringsFromPubSub('a_topic', 'a_subscription', 'a_label') + +@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed') class TestWriteStringsToPubSub(unittest.TestCase): def test_expand(self): p = TestPipeline() - pdone = p | ReadStringsFromPubSub('baz') | WriteStringsToPubSub('a_topic') + pdone = (p + | ReadStringsFromPubSub('projects/fakeprj/topics/baz') + | WriteStringsToPubSub('projects/fakeprj/topics/a_topic')) # Ensure that the properties passed through correctly - sink = pdone.producer.transform.sink - self.assertEqual('a_topic', sink.topic) - - # Ensure that the type on the intermediate payload transformer output - # PCollection is bytes - write_pcoll = pdone.producer.inputs[0] - self.assertEqual(bytes, write_pcoll.element_type) + self.assertEqual('a_topic', pdone.producer.transform.dofn.topic_name) +@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed') class TestPubSubSource(unittest.TestCase): - def test_display_data(self): - source = _PubSubPayloadSource('a_topic', 'a_subscription', 'a_label') + def test_display_data_topic(self): + source = _PubSubPayloadSource( + 'projects/fakeprj/topics/a_topic', + None, + 'a_label') + dd = DisplayData.create_from(source) + expected_items = [ + DisplayDataItemMatcher( + 'topic', 'projects/fakeprj/topics/a_topic'), + DisplayDataItemMatcher('id_label', 'a_label')] + + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + + def test_display_data_subscription(self): + source = _PubSubPayloadSource( + None, + 
'projects/fakeprj/subscriptions/a_subscription', + 'a_label') dd = DisplayData.create_from(source) expected_items = [ - DisplayDataItemMatcher('topic', 'a_topic'), - DisplayDataItemMatcher('subscription', 'a_subscription'), + DisplayDataItemMatcher( + 'subscription', 'projects/fakeprj/subscriptions/a_subscription'), DisplayDataItemMatcher('id_label', 'a_label')] hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) def test_display_data_no_subscription(self): - source = _PubSubPayloadSource('a_topic') + source = _PubSubPayloadSource('projects/fakeprj/topics/a_topic') dd = DisplayData.create_from(source) expected_items = [ - DisplayDataItemMatcher('topic', 'a_topic')] + DisplayDataItemMatcher('topic', 'projects/fakeprj/topics/a_topic')] hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) +@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed') class TestPubSubSink(unittest.TestCase): def test_display_data(self): - sink = _PubSubPayloadSink('a_topic') + sink = _PubSubPayloadSink('projects/fakeprj/topics/a_topic') dd = DisplayData.create_from(sink) expected_items = [ - DisplayDataItemMatcher('topic', 'a_topic')] + DisplayDataItemMatcher('topic', 'projects/fakeprj/topics/a_topic')] hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) http://git-wip-us.apache.org/repos/asf/beam/blob/fb7ec28c/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index f213b3b..57bcc5e 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -668,11 +668,12 @@ class DataflowRunner(PipelineRunner): raise ValueError('PubSubPayloadSource is currently available for use ' 'only in streaming pipelines.') # Only one of topic or subscription should be 
set. - if transform.source.topic: - step.add_property(PropertyNames.PUBSUB_TOPIC, transform.source.topic) - elif transform.source.subscription: + if transform.source.full_subscription: step.add_property(PropertyNames.PUBSUB_SUBSCRIPTION, - transform.source.subscription) + transform.source.full_subscription) + elif transform.source.full_topic: + step.add_property(PropertyNames.PUBSUB_TOPIC, + transform.source.full_topic) if transform.source.id_label: step.add_property(PropertyNames.PUBSUB_ID_LABEL, transform.source.id_label) @@ -756,7 +757,7 @@ class DataflowRunner(PipelineRunner): if not standard_options.streaming: raise ValueError('PubSubPayloadSink is currently available for use ' 'only in streaming pipelines.') - step.add_property(PropertyNames.PUBSUB_TOPIC, transform.sink.topic) + step.add_property(PropertyNames.PUBSUB_TOPIC, transform.sink.full_topic) else: raise ValueError( 'Sink %r has unexpected format %s.' % ( http://git-wip-us.apache.org/repos/asf/beam/blob/fb7ec28c/sdks/python/apache_beam/runners/direct/direct_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index 2a75977..1a94b3d 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -26,8 +26,10 @@ from __future__ import absolute_import import collections import logging +import apache_beam as beam from apache_beam import typehints from apache_beam.metrics.execution import MetricsEnvironment +from apache_beam.pvalue import PCollection from apache_beam.runners.direct.bundle_factory import BundleFactory from apache_beam.runners.runner import PipelineResult from apache_beam.runners.runner import PipelineRunner @@ -107,6 +109,58 @@ class DirectRunner(PipelineRunner): .with_output_types(*type_hints.output_types[0])) return transform.expand(pcoll) + def 
apply_ReadStringsFromPubSub(self, transform, pcoll): + try: + from google.cloud import pubsub as unused_pubsub + except ImportError: + raise ImportError('Google Cloud PubSub not available, please install ' + 'apache_beam[gcp]') + # Execute this as a native transform. + output = PCollection(pcoll.pipeline) + output.element_type = unicode + return output + + def apply_WriteStringsToPubSub(self, transform, pcoll): + try: + from google.cloud import pubsub + except ImportError: + raise ImportError('Google Cloud PubSub not available, please install ' + 'apache_beam[gcp]') + project = transform._sink.project + topic_name = transform._sink.topic_name + + class DirectWriteToPubSub(beam.DoFn): + _topic = None + + def __init__(self, project, topic_name): + self.project = project + self.topic_name = topic_name + + def start_bundle(self): + if self._topic is None: + self._topic = pubsub.Client(project=self.project).topic( + self.topic_name) + self._buffer = [] + + def process(self, elem): + self._buffer.append(elem.encode('utf-8')) + if len(self._buffer) >= 100: + self._flush() + + def finish_bundle(self): + self._flush() + + def _flush(self): + if self._buffer: + with self._topic.batch() as batch: + for datum in self._buffer: + batch.publish(datum) + self._buffer = [] + + output = pcoll | beam.ParDo(DirectWriteToPubSub(project, topic_name)) + output.element_type = unicode + return output + def run(self, pipeline): """Execute the entire pipeline and returns an DirectPipelineResult.""" http://git-wip-us.apache.org/repos/asf/beam/blob/fb7ec28c/sdks/python/apache_beam/runners/direct/transform_evaluator.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py index 67b2492..641291d 100644 --- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py @@ 
-20,6 +20,8 @@ from __future__ import absolute_import import collections +import random +import time from apache_beam import coders from apache_beam import pvalue @@ -48,6 +50,7 @@ from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn from apache_beam.typehints.typecheck import TypeCheckError from apache_beam.typehints.typecheck import TypeCheckWrapperDoFn from apache_beam.utils import counters +from apache_beam.utils.timestamp import Timestamp from apache_beam.utils.timestamp import MIN_TIMESTAMP from apache_beam.options.pipeline_options import TypeOptions @@ -63,6 +66,7 @@ class TransformEvaluatorRegistry(object): self._evaluation_context = evaluation_context self._evaluators = { io.Read: _BoundedReadEvaluator, + io.ReadStringsFromPubSub: _PubSubReadEvaluator, core.Flatten: _FlattenEvaluator, core.ParDo: _ParDoEvaluator, core._GroupByKeyOnly: _GroupByKeyOnlyEvaluator, @@ -357,6 +361,91 @@ class _TestStreamEvaluator(_TransformEvaluator): {None: hold}) +class _PubSubSubscriptionWrapper(object): + """Wrapper for garbage-collecting temporary PubSub subscriptions.""" + + def __init__(self, subscription, should_cleanup): + self.subscription = subscription + self.should_cleanup = should_cleanup + + def __del__(self): + if self.should_cleanup: + self.subscription.delete() + + +class _PubSubReadEvaluator(_TransformEvaluator): + """TransformEvaluator for PubSub read.""" + + _subscription_cache = {} + + def __init__(self, evaluation_context, applied_ptransform, + input_committed_bundle, side_inputs, scoped_metrics_container): + assert not side_inputs + super(_PubSubReadEvaluator, self).__init__( + evaluation_context, applied_ptransform, input_committed_bundle, + side_inputs, scoped_metrics_container) + + source = self._applied_ptransform.transform._source + self._subscription = _PubSubReadEvaluator.get_subscription( + self._applied_ptransform, source.project, source.topic_name, + source.subscription_name) + + @classmethod + def get_subscription(cls, transform, 
project, topic, subscription_name): + if transform not in cls._subscription_cache: + from google.cloud import pubsub + should_create = not subscription_name + if should_create: + subscription_name = 'beam_%d_%x' % ( + int(time.time()), random.randrange(1 << 32)) + cls._subscription_cache[transform] = _PubSubSubscriptionWrapper( + pubsub.Client(project=project).topic(topic).subscription( + subscription_name), + should_create) + if should_create: + cls._subscription_cache[transform].subscription.create() + return cls._subscription_cache[transform].subscription + + def start_bundle(self): + pass + + def process_element(self, element): + pass + + def _read_from_pubsub(self): + from google.cloud import pubsub + # Because of the AutoAck, we are not able to reread messages if this + # evaluator fails with an exception before emitting a bundle. However, + # the DirectRunner currently doesn't retry work items anyway, so the + # pipeline would enter an inconsistent state on any error. + with pubsub.subscription.AutoAck( + self._subscription, return_immediately=True, + max_messages=10) as results: + return [message.data for unused_ack_id, message in results.items()] + + def finish_bundle(self): + data = self._read_from_pubsub() + if data: + output_pcollection = list(self._outputs)[0] + bundle = self._evaluation_context.create_bundle(output_pcollection) + # TODO(ccy): we currently do not use the PubSub message timestamp or + # respect the PubSub source's id_label field. 
+ now = Timestamp.of(time.time()) + for message_data in data: + bundle.output(GlobalWindows.windowed_value(message_data, timestamp=now)) + bundles = [bundle] + else: + bundles = [] + input_pvalue = self._applied_ptransform.inputs + if not input_pvalue: + input_pvalue = pvalue.PBegin(self._applied_ptransform.transform.pipeline) + unprocessed_bundle = self._evaluation_context.create_bundle( + input_pvalue) + return TransformResult( + self._applied_ptransform, bundles, + [unprocessed_bundle], None, {None: Timestamp.of(time.time())}) + + class _FlattenEvaluator(_TransformEvaluator): """TransformEvaluator for Flatten transform."""