beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From c..@apache.org
Subject [beam] branch master updated: Don't map transforms to pubsub subscriptions unless necessary
Date Thu, 01 Aug 2019 16:48:23 GMT
This is an automated email from the ASF dual-hosted git repository.

ccy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a4a4f0  Don't map transforms to pubsub subscriptions unless necessary
     new 0226799  Merge pull request #9146 from ostrokach/bugfix/pubsub_reader_global_scope
1a4a4f0 is described below

commit 1a4a4f0543273608e0469ca648945d772783a338
Author: Alexey Strokach <strokach@google.com>
AuthorDate: Tue Jul 23 16:55:03 2019 -0700

    Don't map transforms to pubsub subscriptions unless necessary
---
 .../runners/direct/transform_evaluator.py          | 62 ++++++++--------------
 1 file changed, 21 insertions(+), 41 deletions(-)

diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 29e3a48..8fa9b2d 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -19,6 +19,7 @@
 
 from __future__ import absolute_import
 
+import atexit
 import collections
 import logging
 import random
@@ -376,45 +377,11 @@ class _TestStreamEvaluator(_TransformEvaluator):
         self, self.bundles, unprocessed_bundles, None, {None: hold})
 
 
-class _PubSubSubscriptionWrapper(object):
-  """Wrapper for managing temporary PubSub subscriptions."""
-
-  def __init__(self, project, short_topic_name, short_sub_name):
-    """Initialize subscription wrapper.
-
-    If sub_name is None, will create a temporary subscription to topic_name.
-
-    Args:
-      project: GCP project name for topic and subscription. May be None.
-        Required if sub_name is None.
-      short_topic_name: Valid topic name without
-        'projects/{project}/topics/' prefix. May be None.
-        Required if sub_name is None.
-      short_sub_name: Valid subscription name without
-        'projects/{project}/subscriptions/' prefix. May be None.
-    """
-    from google.cloud import pubsub
-    self.sub_client = pubsub.SubscriberClient()
-
-    if short_sub_name is None:
-      self.sub_name = self.sub_client.subscription_path(
-          project, 'beam_%d_%x' % (int(time.time()), random.randrange(1 << 32)))
-      topic_name = self.sub_client.topic_path(project, short_topic_name)
-      self.sub_client.create_subscription(self.sub_name, topic_name)
-      self._should_cleanup = True
-    else:
-      self.sub_name = self.sub_client.subscription_path(project, short_sub_name)
-      self._should_cleanup = False
-
-  def __del__(self):
-    if self._should_cleanup:
-      self.sub_client.delete_subscription(self.sub_name)
-
-
 class _PubSubReadEvaluator(_TransformEvaluator):
   """TransformEvaluator for PubSub read."""
 
   # A mapping of transform to _PubSubSubscriptionWrapper.
+  # TODO(BEAM-7750): Prevents garbage collection of pipeline instances.
   _subscription_cache = {}
 
   def __init__(self, evaluation_context, applied_ptransform,
@@ -428,16 +395,29 @@ class _PubSubReadEvaluator(_TransformEvaluator):
     if self.source.id_label:
       raise NotImplementedError(
           'DirectRunner: id_label is not supported for PubSub reads')
-    self._sub_name = _PubSubReadEvaluator.get_subscription(
+    self._sub_name = self.get_subscription(
         self._applied_ptransform, self.source.project, self.source.topic_name,
         self.source.subscription_name)
 
   @classmethod
-  def get_subscription(cls, transform, project, topic, short_sub_name):
-    if transform not in cls._subscription_cache:
-      wrapper = _PubSubSubscriptionWrapper(project, topic, short_sub_name)
-      cls._subscription_cache[transform] = wrapper
-    return cls._subscription_cache[transform].sub_name
+  def get_subscription(cls, transform, project, short_topic_name,
+                       short_sub_name):
+    from google.cloud import pubsub
+
+    if short_sub_name:
+      return pubsub.SubscriberClient.subscription_path(project, short_sub_name)
+
+    if transform in cls._subscription_cache:
+      return cls._subscription_cache[transform]
+
+    sub_client = pubsub.SubscriberClient()
+    sub_name = sub_client.subscription_path(
+        project, 'beam_%d_%x' % (int(time.time()), random.randrange(1 << 32)))
+    topic_name = sub_client.topic_path(project, short_topic_name)
+    sub_client.create_subscription(sub_name, topic_name)
+    atexit.register(sub_client.delete_subscription, sub_name)
+    cls._subscription_cache[transform] = sub_name
+    return cls._subscription_cache[transform]
 
   def start_bundle(self):
     pass


Mime
View raw message