From commits-return-80364-archive-asf-public=cust-asf.ponee.io@airflow.apache.org Tue Dec 10 20:51:10 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 0DD65180630 for ; Tue, 10 Dec 2019 21:51:09 +0100 (CET) Received: (qmail 83466 invoked by uid 500); 10 Dec 2019 20:51:09 -0000 Mailing-List: contact commits-help@airflow.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airflow.apache.org Delivered-To: mailing list commits@airflow.apache.org Received: (qmail 83456 invoked by uid 99); 10 Dec 2019 20:51:09 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 10 Dec 2019 20:51:09 +0000 From: GitBox To: commits@airflow.apache.org Subject: [GitHub] [airflow] mik-laj commented on a change in pull request #6764: [WIP][AIRFLOW-6206] Use "aws_" prefix for aws_batch_operator.py module [AIP-21] Message-ID: <157601106919.4601.15762306114886038283.gitbox@gitbox.apache.org> Date: Tue, 10 Dec 2019 20:51:09 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit mik-laj commented on a change in pull request #6764: [WIP][AIRFLOW-6206] Use "aws_" prefix for aws_batch_operator.py module [AIP-21] URL: https://github.com/apache/airflow/pull/6764#discussion_r356270296 ########## File path: airflow/contrib/operators/awsbatch_operator.py ########## @@ -17,199 +17,17 @@ # specific language governing permissions and limitations # under the License. 
# -import sys -from math import pow -from random import randint -from time import sleep -from typing import Optional -from airflow.contrib.hooks.aws_hook import AwsHook -from airflow.exceptions import AirflowException -from airflow.models import BaseOperator -from airflow.typing_compat import Protocol -from airflow.utils.decorators import apply_defaults +"""This module is deprecated. Please use `airflow.providers.amazon.aws.operators.batch`.""" +import warnings -class BatchProtocol(Protocol): - def submit_job(self, jobName, jobQueue, jobDefinition, containerOverrides): - ... +warnings.warn( + "This module is deprecated. Please use `airflow.providers.amazon.aws.operators.batch`.", + DeprecationWarning, + stacklevel=2, +) - def get_waiter(self, x: str): - ... - - def describe_jobs(self, jobs): - ... - - def terminate_job(self, jobId: str, reason: str): - ... - - -class AWSBatchOperator(BaseOperator): - """ - Execute a job on AWS Batch Service - - .. warning: the queue parameter was renamed to job_queue to segregate the - internal CeleryExecutor queue from the AWS Batch internal queue. 
- - :param job_name: the name for the job that will run on AWS Batch (templated) - :type job_name: str - :param job_definition: the job definition name on AWS Batch - :type job_definition: str - :param job_queue: the queue name on AWS Batch - :type job_queue: str - :param overrides: the same parameter that boto3 will receive on - containerOverrides (templated) - http://boto3.readthedocs.io/en/latest/reference/services/batch.html#Batch.Client.submit_job - :type overrides: dict - :param array_properties: the same parameter that boto3 will receive on - arrayProperties - http://boto3.readthedocs.io/en/latest/reference/services/batch.html#Batch.Client.submit_job - :type array_properties: dict - :param parameters: the same parameter that boto3 will receive on - parameters (templated) - http://boto3.readthedocs.io/en/latest/reference/services/batch.html#Batch.Client.submit_job - :type parameters: dict - :param max_retries: exponential backoff retries while waiter is not - merged, 4200 = 48 hours - :type max_retries: int - :param aws_conn_id: connection id of AWS credentials / region name. If None, - credential boto3 strategy will be used - (http://boto3.readthedocs.io/en/latest/guide/configuration.html). - :type aws_conn_id: str - :param region_name: region name to use in AWS Hook. 
- Override the region_name in connection (if provided) - :type region_name: str - """ - - ui_color = '#c3dae0' - client = None # type: Optional[BatchProtocol] - arn = None # type: Optional[str] - template_fields = ('job_name', 'overrides', 'parameters',) - - @apply_defaults - def __init__(self, job_name, job_definition, job_queue, overrides, array_properties=None, - parameters=None, max_retries=4200, aws_conn_id=None, region_name=None, **kwargs): - super().__init__(**kwargs) - - self.job_name = job_name - self.aws_conn_id = aws_conn_id - self.region_name = region_name - self.job_definition = job_definition - self.job_queue = job_queue - self.overrides = overrides - self.array_properties = array_properties or {} - self.parameters = parameters - self.max_retries = max_retries - - self.jobId = None # pylint: disable=invalid-name - self.jobName = None # pylint: disable=invalid-name - - self.hook = self.get_hook() - - def execute(self, context): - self.log.info( - 'Running AWS Batch Job - Job definition: %s - on queue %s', - self.job_definition, self.job_queue - ) - self.log.info('AWSBatchOperator overrides: %s', self.overrides) - - self.client = self.hook.get_client_type( - 'batch', - region_name=self.region_name - ) - - try: - response = self.client.submit_job( - jobName=self.job_name, - jobQueue=self.job_queue, - jobDefinition=self.job_definition, - arrayProperties=self.array_properties, - parameters=self.parameters, - containerOverrides=self.overrides) - - self.log.info('AWS Batch Job started: %s', response) - - self.jobId = response['jobId'] - self.jobName = response['jobName'] - - self._wait_for_task_ended() - - self._check_success_task() - - self.log.info('AWS Batch Job has been successfully executed: %s', response) - except Exception as e: - self.log.info('AWS Batch Job has failed executed') - raise AirflowException(e) - - def _wait_for_task_ended(self): - """ - Try to use a waiter from the below pull request - - * https://github.com/boto/botocore/pull/1307 - - 
If the waiter is not available apply a exponential backoff - - * docs.aws.amazon.com/general/latest/gr/api-retries.html - """ - try: - waiter = self.client.get_waiter('job_execution_complete') - waiter.config.max_attempts = sys.maxsize # timeout is managed by airflow - waiter.wait(jobs=[self.jobId]) - except ValueError: - # If waiter not available use expo - - # Allow a batch job some time to spin up. A random interval - # decreases the chances of exceeding an AWS API throttle - # limit when there are many concurrent tasks. - pause = randint(5, 30) - - retries = 1 - while retries <= self.max_retries: - self.log.info('AWS Batch job (%s) status check (%d of %d) in the next %.2f seconds', - self.jobId, retries, self.max_retries, pause) - sleep(pause) - - response = self.client.describe_jobs(jobs=[self.jobId]) - status = response['jobs'][-1]['status'] - self.log.info('AWS Batch job (%s) status: %s', self.jobId, status) - if status in ['SUCCEEDED', 'FAILED']: - break - - retries += 1 - pause = 1 + pow(retries * 0.3, 2) - - def _check_success_task(self): - response = self.client.describe_jobs( - jobs=[self.jobId], - ) - - self.log.info('AWS Batch stopped, check status: %s', response) - if len(response.get('jobs')) < 1: - raise AirflowException('No job found for {}'.format(response)) - - for job in response['jobs']: - job_status = job['status'] - if job_status == 'FAILED': - reason = job['statusReason'] - raise AirflowException('Job failed with status {}'.format(reason)) - elif job_status in [ - 'SUBMITTED', - 'PENDING', - 'RUNNABLE', - 'STARTING', - 'RUNNING' - ]: - raise AirflowException( - 'This task is still pending {}'.format(job_status)) - - def get_hook(self): - return AwsHook( - aws_conn_id=self.aws_conn_id - ) - - def on_kill(self): - response = self.client.terminate_job( - jobId=self.jobId, - reason='Task killed by the user') - - self.log.info(response) +# pylint: disable=unused-import +from airflow.providers.amazon.aws.operators.batch import BatchProtocol # 
noqa +from airflow.providers.amazon.aws.operators.batch import AwsBatchOperator as AWSBatchOperator # noqa Review comment: If you are changing the class name, you should add a deprecation message in the class's `__init__` method. Example: https://github.com/apache/airflow/pull/6771/files ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services