airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [airflow] TobKed commented on a change in pull request #6096: [AIRFLOW-5477] Rewrite Google PubSub Hook to Google Cloud Python
Date Mon, 16 Sep 2019 12:34:03 GMT
TobKed commented on a change in pull request #6096: [AIRFLOW-5477] Rewrite Google PubSub Hook
to Google Cloud Python
URL: https://github.com/apache/airflow/pull/6096#discussion_r324647369
 
 

 ##########
 File path: airflow/gcp/hooks/pubsub.py
 ##########
 @@ -177,40 +293,88 @@ def create_subscription(
         :param fail_if_exists: if set, raise an exception if the topic
             already exists
         :type fail_if_exists: bool
+        :param push_config: If push delivery is used with this subscription,
+            this field is used to configure it. An empty ``pushConfig`` signifies
+            that the subscriber will pull and ack messages using API methods.
+        :type push_config: Union[Dict, google.cloud.pubsub_v1.types.PushConfig]
+        :param retain_acked_messages: Indicates whether to retain acknowledged
+            messages. If true, then messages are not expunged from the subscription's
+            backlog, even if they are acknowledged, until they fall out of the
+            ``message_retention_duration`` window. This must be true if you would
+            like to Seek to a timestamp.
+        :type retain_acked_messages: bool
+        :param message_retention_duration: How long to retain unacknowledged messages
+            in the subscription's backlog, from the moment a message is published. If
+            ``retain_acked_messages`` is true, then this also configures the
+            retention of acknowledged messages, and thus configures how far back in
+            time a ``Seek`` can be done. Defaults to 7 days. Cannot be more than 7
+            days or less than 10 minutes.
+        :type message_retention_duration: Union[Dict, google.cloud.pubsub_v1.types.Duration]
+        :param labels: Client-assigned labels; see
+            https://cloud.google.com/pubsub/docs/labels
+        :type labels: Dict[str, str]
+        :param retry: (Optional) A retry object used to retry requests.
+            If None is specified, requests will not be retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: (Optional) The amount of time, in seconds, to wait for the request
+            to complete. Note that if retry is specified, the timeout applies to each
+            individual attempt.
+        :type timeout: float
+        :param metadata: (Optional) Additional metadata that is provided to the method.
+        :type metadata: Sequence[Tuple[str, str]]]
         :return: subscription name which will be the system-generated value if
             the ``subscription`` parameter is not supplied
         :rtype: str
         """
-        service = self.get_conn()
-        full_topic = _format_topic(topic_project, topic)
+        subscriber = self.subscriber_client
+
         if not subscription:
             subscription = 'sub-{}'.format(uuid4())
         if not subscription_project:
             subscription_project = topic_project
-        full_subscription = _format_subscription(subscription_project,
-                                                 subscription)
-        body = {
-            'topic': full_topic,
-            'ackDeadlineSeconds': ack_deadline_secs
-        }
+
+        # Add airflow-version label to the subscription
+        labels = labels or {}
+        labels['airflow-version'] = 'v' + version.replace('.', '-').replace('+', '-')
+
+        # pylint: disable=no-member
+        subscription_path = SubscriberClient.subscription_path(subscription_project, subscription)
+        topic_path = SubscriberClient.topic_path(topic_project, topic)
+
+        self.log.info("Creating subscription (path) %s for topic (path) %a", subscription_path,
topic_path)
         try:
-            service.projects().subscriptions().create(  # pylint: disable=no-member
-                name=full_subscription, body=body).execute(num_retries=self.num_retries)
-        except HttpError as e:
-            # Status code 409 indicates that the subscription already exists.
-            if str(e.resp['status']) == '409':
-                message = 'Subscription already exists: {}'.format(
-                    full_subscription)
-                self.log.warning(message)
-                if fail_if_exists:
-                    raise PubSubException(message)
-            else:
-                raise PubSubException(
-                    'Error creating subscription {}'.format(full_subscription),
-                    e)
+            subscriber.create_subscription(
+                name=subscription_path,
+                topic=topic_path,
+                push_config=push_config,
+                ack_deadline_seconds=ack_deadline_secs,
+                retain_acked_messages=retain_acked_messages,
+                message_retention_duration=message_retention_duration,
+                labels=labels,
+                retry=retry,
+                timeout=timeout,
+                metadata=metadata,
+            )
+        except AlreadyExists:
+            message = 'Subscription already exists: {}'.format(subscription_path)
+            self.log.warning(message)
 
 Review comment:
   Fixed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message