airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [airflow] potiuk commented on a change in pull request #6773: [AIRFLOW-6038] AWS DataSync example_dags added
Date Wed, 11 Dec 2019 12:24:04 GMT
potiuk commented on a change in pull request #6773: [AIRFLOW-6038] AWS DataSync example_dags added
URL: https://github.com/apache/airflow/pull/6773#discussion_r356563432
 
 

 ##########
 File path: airflow/providers/amazon/aws/operators/datasync.py
 ##########
 @@ -124,379 +184,192 @@ def get_hook(self):
         """
         if not self.hook:
             self.hook = AWSDataSyncHook(
-                aws_conn_id=self.aws_conn_id
+                aws_conn_id=self.aws_conn_id,
+                wait_interval_seconds=self.wait_interval_seconds,
             )
         return self.hook
 
-    def _create_or_get_location_arn(
-            self,
-            location_uri,
-            **create_location_kwargs):
-        location_arns = self.get_hook().get_location_arns(
-            location_uri,
-            self.case_sensitive_location_search)
-        if not location_arns:
-            self.log.info('Creating location for LocationUri %s',
-                          location_uri)
-            location_arn = self.get_hook().create_location(
-                location_uri,
-                **create_location_kwargs)
-            self.log.info(
-                'Created a Location with LocationArn %s', location_arn)
-            return location_arn
-        else:
-            self.log.info('Found LocationArns %s for LocationUri %s',
-                          location_arns, location_uri)
-            if len(location_arns) != 1:
-                raise AirflowException(
-                    'More than 1 LocationArn was found for LocationUri %s' % location_uri)
-            return location_arns[0]
-
     def execute(self, context):
-        """Create a new Task (and Locations) if necessary."""
-        hook = self.get_hook()
-
-        self.source_location_arn = self._create_or_get_location_arn(
-            self.source_location_uri,
-            **self.create_source_location_kwargs
-        )
-
-        self.destination_location_arn = self._create_or_get_location_arn(
-            self.destination_location_uri,
-            **self.create_destination_location_kwargs
-        )
-
-        self.log.info('Creating a Task.')
-        self.task_arn = hook.create_task(
-            self.source_location_arn,
-            self.destination_location_arn,
-            **self.create_task_kwargs
-        )
+        # If task_arn was not specified then try to
+        # find 0, 1 or many candidate DataSync Tasks to run
         if not self.task_arn:
-            raise AirflowException('Task could not be created')
-        self.log.info('Created a Task with TaskArn %s', self.task_arn)
-        return self.task_arn
+            self._get_tasks_and_locations()
 
+        # If some were found, identify which one to run
+        if self.candidate_task_arns:
+            self.task_arn = self._choose_task_from_list(
+                self.candidate_task_arns)
 
-class AWSDataSyncGetTasksOperator(BaseOperator):
-    r"""Get AWS DataSync Tasks.
+        # If we couldnt find one then try create one
+        if not self.task_arn and self.create_task_kwargs:
+            self._create_datasync_task()
 
-    Finds AWS DataSync Tasks which have a source and destination
-    location corresponding to the specified source and destination
-    URIs.
-
-    If ``do_xcom_push`` is True, the TaskArns which are found
-    will be pushed to an XCom.
-
-    note:: There may be 0, 1, or many matching Tasks. The calling
-        application will need to deal with these scenarios.
-
-    :param str aws_conn_id: AWS connection to use.
-    :param str source_location_uri: Source location URI.
-        Example: ``smb://server/subdir``
-    :param str destination_location_uri: Destination location URI.
-        Example: ``s3://airflow_bucket/stuff``
-    :param bool case_sensitive_location_search: Whether or not to do a
-        case-sensitive search for each Location URI.
-
-    :raises AirflowException: If neither ``source_location_uri`` nor
-        ``destination_location_uri`` were specified.
-    """
-    template_fields = ('source_location_uri',
-                       'destination_location_uri')
-    ui_color = '#44b5e2'
-
-    @apply_defaults
-    def __init__(
-        self,
-        aws_conn_id='aws_default',
-        source_location_uri=None,
-        destination_location_uri=None,
-        case_sensitive_location_search=True,
-        *args,
-        **kwargs
-    ):
-        super().__init__(*args, **kwargs)
+        if not self.task_arn:
+            raise AirflowException(
+                "DataSync TaskArn could be identified or created.")
 
-        # Assignments
-        self.aws_conn_id = aws_conn_id
-        self.source_location_uri = source_location_uri
-        self.destination_location_uri = destination_location_uri
-        self.case_sensitive_location_search = case_sensitive_location_search
+        self.log.info("Using DataSync TaskArn %s", self.task_arn)
 
-        # Validations
-        if not (self.source_location_uri and self.destination_location_uri):
-            raise AirflowException(
-                'Specify both source_location_uri and destination_location_uri')
+        # Update the DataSync Task
+        if self.update_task_kwargs:
+            self._update_datasync_task()
 
-        # Others
-        self.hook = None
-        self.source_location_arns = None
-        self.destination_location_arns = None
-        self.task_arns = None
+        # Execute the DataSync Task
+        self._execute_datasync_task()
 
-    def get_hook(self):
-        """Create and return AWSDataSyncHook.
+        if not self.task_execution_arn:
+            raise AirflowException("Nothing was executed")
 
-        :return AWSDataSyncHook: An AWSDataSyncHook instance.
-        """
-        if not self.hook:
-            self.hook = AWSDataSyncHook(
-                aws_conn_id=self.aws_conn_id
-            )
-        return self.hook
+        # Delete the DataSyncTask
+        if self.delete_task_after_execution:
+            self._delete_datasync_task()
 
-    def _get_location_arns(
-            self, location_uri
-    ):
-        location_arns = self.get_hook().get_location_arns(
-            location_uri,
-            self.case_sensitive_location_search)
-        self.log.info('Found LocationArns %s for LocationUri %s',
-                      location_arns, location_uri)
-        return location_arns
+        return {"TaskArn": self.task_arn, "TaskExecutionArn": self.task_execution_arn}
 
-    def execute(self, context):
-        """Create a new Task (and Locations) if necessary."""
+    def _get_tasks_and_locations(self):
+        """Find existing DataSync Task based on source and dest Locations."""
         hook = self.get_hook()
 
-        self.source_location_arns = self._get_location_arns(
+        self.candidate_source_location_arns = self._get_location_arns(
             self.source_location_uri
         )
 
-        self.destination_location_arns = self._get_location_arns(
+        self.candidate_destination_location_arns = self._get_location_arns(
             self.destination_location_uri
         )
 
-        if not (self.source_location_arns and self.destination_location_arns):
-            self.log.info('Insufficient Locations to search for Task')
-            return []
-
-        self.log.info('Searching for TaskArns')
-        self.task_arns = hook.get_task_arns_for_location_arns(
-            self.source_location_arns,
-            self.destination_location_arns)
-        self.log.info('Found %s matching TaskArns', len(self.task_arns))
-        return self.task_arns
+        if not self.candidate_source_location_arns:
+            self.log.info("No matching source Locations")
+            return
 
+        if not self.candidate_destination_location_arns:
+            self.log.info("No matching destination Locations")
+            return
 
-class AWSDataSyncUpdateTaskOperator(BaseOperator):
-    """
-    Update an AWS DataSyncTask
-
-    If ``do_xcom_push`` is True, the TaskArns which were updated
-    will be pushed to an XCom.
-
-    :param str aws_conn_id: AWS connection to use.
-    :param str task_arn: The TaskArn to update. If ``None``, the operator will
-        look in xcom_pull for a TaskArn.
-    :param dict update_task_kwargs: The TaskArn will be updated with ``update_task_kwargs``.
-        ``update_task_kwargs`` is used internally like this:
-        ``boto3.update_task(TaskArn=task_arn, **update_task_kwargs)``
-        Example:  ``{'Name': 'xyz', 'Options': ..., 'Excludes': ...}``
-    :raises AirflowException: If ``task_arn`` is None.
-    :raises AirflowException: If ``update_task_kwargs`` is None.
-    """
-    template_fields = ('task_arn',)
-    ui_color = '#44b5e2'
-
-    @apply_defaults
-    def __init__(
-        self,
-        aws_conn_id='aws_default',
-        task_arn=None,
-        update_task_kwargs=None,
-        *args,
-        **kwargs
-    ):
-        super().__init__(*args, **kwargs)
-
-        # Assignments
-        self.aws_conn_id = aws_conn_id
-        self.task_arn = task_arn
-        self.update_task_kwargs = update_task_kwargs
+        self.log.info("Finding DataSync TaskArns that have these LocationArns")
+        self.candidate_task_arns = hook.get_task_arns_for_location_arns(
+            self.candidate_source_location_arns,
+            self.candidate_destination_location_arns,
+        )
+        self.log.info("Found candidate DataSync TaskArns %s",
+                      self.candidate_task_arns)
+
+    def _choose_task_from_list(self, task_arn_list):
+        """Select 1 DataSync TaskArn from a list"""
+        if self.choose_task_callable:
+            return self.choose_task_callable(task_arn_list)
+        if not task_arn_list:
+            return None
+        if len(task_arn_list) == 1:
+            return task_arn_list[0]
+        raise AirflowException("Too many DataSync Tasks to choose from.")
+
+    def _choose_location_from_list(self, location_arn_list):
+        """Select 1 DataSync LocationArn from a list"""
+        if self.choose_location_callable:
+            return self.choose_location_callable(location_arn_list)
+        if not location_arn_list:
+            return None
+        if len(location_arn_list) == 1:
+            return location_arn_list[0]
 
 Review comment:
  Same here — I think the callable should only be called when there is ambiguity (i.e. when more than one candidate is found).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message