airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [airflow] TobKed commented on a change in pull request #6096: [AIRFLOW-5477] Rewrite Google PubSub Hook to Google Cloud Python
Date Mon, 30 Sep 2019 12:14:30 GMT
TobKed commented on a change in pull request #6096: [AIRFLOW-5477] Rewrite Google PubSub Hook
to Google Cloud Python
URL: https://github.com/apache/airflow/pull/6096#discussion_r329543683
 
 

 ##########
 File path: airflow/gcp/hooks/pubsub.py
 ##########
 @@ -52,254 +51,483 @@ class PubSubHook(GoogleCloudBaseHook):
 
     def __init__(self, gcp_conn_id: str = 'google_cloud_default', delegate_to: Optional[str]
= None) -> None:
         super().__init__(gcp_conn_id, delegate_to=delegate_to)
+        self._client = None
+
+    def get_conn(self) -> PublisherClient:
+        """
+        Retrieves connection to Google Cloud Pub/Sub.
 
-    def get_conn(self) -> Any:
+        :return: Google Cloud Pub/Sub client object.
+        :rtype: google.cloud.pubsub_v1.PublisherClient
         """
-        Returns a Pub/Sub service object.
+        if not self._client:
+            self._client = PublisherClient(
+                credentials=self._get_credentials(),
+                client_info=self.client_info
+            )
+        return self._client
 
-        :rtype: googleapiclient.discovery.Resource
+    @cached_property
+    def subscriber_client(self) -> SubscriberClient:
         """
-        http_authorized = self._authorize()
-        return build(
-            'pubsub', 'v1', http=http_authorized, cache_discovery=False)
+        Creates SubscriberClient.
 
-    def publish(self, project: str, topic: str, messages: List[Dict]) -> None:
+        :return: Google Cloud Pub/Sub client object.
+        :rtype: google.cloud.pubsub_v1.SubscriberClient
+        """
+        return SubscriberClient(
+            credentials=self._get_credentials(),
+            client_info=self.client_info
+        )
+
+    @GoogleCloudBaseHook.fallback_to_default_project_id
+    def publish(
+        self,
+        topic: str,
+        messages: List[Dict],
+        project_id: Optional[str] = None,
+    ) -> None:
         """
         Publishes messages to a Pub/Sub topic.
 
-        :param project: the GCP project ID in which to publish
-        :type project: str
         :param topic: the Pub/Sub topic to which to publish; do not
             include the ``projects/{project}/topics/`` prefix.
         :type topic: str
         :param messages: messages to publish; if the data field in a
             message is set, it should already be base64 encoded.
         :type messages: list of PubSub messages; see
             http://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
+        :param project_id: Optional, the GCP project ID in which to publish.
+            If set to None or missing, the default project_id from the GCP connection is
used.
+        :type project_id: str
         """
-        body = {'messages': messages}
-        full_topic = _format_topic(project, topic)
-        request = self.get_conn().projects().topics().publish(  # pylint: disable=no-member
-            topic=full_topic, body=body)
+        assert project_id is not None
+        publisher = self.get_conn()
+        topic_path = PublisherClient.topic_path(project_id, topic)  # pylint: disable=no-member
+
+        # TODO validation of messages
 
 Review comment:
   In the last fixup I've included validation and deprecation warning.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message