airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [airflow] feluelle commented on a change in pull request #6811: [AIRFLOW-6245] Add custom waiters for AWS batch jobs
Date Mon, 06 Jan 2020 15:54:07 GMT
feluelle commented on a change in pull request #6811: [AIRFLOW-6245] Add custom waiters for AWS batch jobs
URL: https://github.com/apache/airflow/pull/6811#discussion_r363352545
 
 

 ##########
 File path: tests/providers/amazon/aws/hooks/test_batch_waiters.py
 ##########
 @@ -0,0 +1,486 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# pylint: disable=do-not-use-asserts, missing-docstring, redefined-outer-name
+
+
+"""
+Test AwsBatchWaiters
+
+This test suite uses a large suite of moto mocks for the
+AWS batch infrastructure.  These infrastructure mocks are
+derived from the moto test suite for testing the batch client.
+
+.. seealso::
+
+    - https://github.com/spulec/moto/pull/1197/files
+    - https://github.com/spulec/moto/blob/master/tests/test_batch/test_batch.py
+"""
+
+import inspect
+import unittest
+from typing import NamedTuple, Optional
+
+import boto3
+import botocore.client
+import botocore.exceptions
+import botocore.waiter
+import mock
+import pytest
+from moto import mock_batch, mock_ec2, mock_ecs, mock_iam, mock_logs
+
+from airflow import AirflowException
+from airflow.providers.amazon.aws.hooks.batch_waiters import AwsBatchWaiters
+
+# Use dummy AWS credentials
+AWS_REGION = "eu-west-1"
+AWS_ACCESS_KEY_ID = "airflow_dummy_key"
+AWS_SECRET_ACCESS_KEY = "airflow_dummy_secret"
+
+
+@pytest.fixture(scope="module")
+def aws_region():
+    return AWS_REGION
+
+
+@pytest.fixture(scope="module")
+def job_queue_name():
+    return "moto_test_job_queue"
+
+
+@pytest.fixture(scope="module")
+def job_definition_name():
+    return "moto_test_job_definition"
+
+
+#
+# AWS Clients
+#
+
+
+class AwsClients(NamedTuple):
+    batch: "botocore.client.Batch"
+    ec2: "botocore.client.EC2"
+    ecs: "botocore.client.ECS"
+    iam: "botocore.client.IAM"
+    log: "botocore.client.CloudWatchLogs"
+
+
+@pytest.yield_fixture(scope="module")
+def batch_client(aws_region):
+    with mock_batch():
+        yield boto3.client("batch", region_name=aws_region)
+
+
+@pytest.yield_fixture(scope="module")
+def ec2_client(aws_region):
+    with mock_ec2():
+        yield boto3.client("ec2", region_name=aws_region)
+
+
+@pytest.yield_fixture(scope="module")
+def ecs_client(aws_region):
+    with mock_ecs():
+        yield boto3.client("ecs", region_name=aws_region)
+
+
+@pytest.yield_fixture(scope="module")
+def iam_client(aws_region):
+    with mock_iam():
+        yield boto3.client("iam", region_name=aws_region)
+
+
+@pytest.yield_fixture(scope="module")
+def logs_client(aws_region):
+    with mock_logs():
+        yield boto3.client("logs", region_name=aws_region)
+
+
+@pytest.fixture(scope="module")
+def aws_clients(batch_client, ec2_client, ecs_client, iam_client, logs_client):
+    return AwsClients(
+        batch=batch_client, ec2=ec2_client, ecs=ecs_client, iam=iam_client, log=logs_client
+    )
+
+
+#
+# Batch Infrastructure
+#
+
+
+class Infrastructure:
+    aws_region: str
+    aws_clients: AwsClients
+    vpc_id: Optional[str] = None
+    subnet_id: Optional[str] = None
+    security_group_id: Optional[str] = None
+    iam_arn: Optional[str] = None
+    compute_env_name: Optional[str] = None
+    compute_env_arn: Optional[str] = None
+    job_queue_name: Optional[str] = None
+    job_queue_arn: Optional[str] = None
+    job_definition_name: Optional[str] = None
+    job_definition_arn: Optional[str] = None
+
+
+def batch_infrastructure(
+    aws_clients: AwsClients, aws_region: str, job_queue_name: str, job_definition_name: str
+) -> Infrastructure:
+    """
+    This function is not a fixture so that tests can pass the AWS clients to it and then
+    continue to use the infrastructure created by it while the client fixtures are in-tact
for
+    the duration of a test.
+
+    :param aws_clients:
+    :type aws_clients: AwsClients
+
+    :param aws_region:
+    :type aws_region: str
+
+    :param job_queue_name:
+    :type job_queue_name: str
+
+    :param job_definition_name:
+    :type job_definition_name: str
+
+    :return: Infrastructure for batch services
+    :rtype: Infrastructure
+    """
+
+    infrastructure = Infrastructure()
+    infrastructure.aws_region = aws_region
+    infrastructure.aws_clients = aws_clients
+
+    resp = aws_clients.ec2.create_vpc(CidrBlock="172.30.0.0/24")
+    vpc_id = resp["Vpc"]["VpcId"]
+
+    resp = aws_clients.ec2.create_subnet(
+        AvailabilityZone=f"{aws_region}a", CidrBlock="172.30.0.0/25", VpcId=vpc_id
+    )
+    subnet_id = resp["Subnet"]["SubnetId"]
+
+    resp = aws_clients.ec2.create_security_group(
+        Description="moto_test_sg_desc", GroupName="moto_test_sg", VpcId=vpc_id
+    )
+    sg_id = resp["GroupId"]
+
+    resp = aws_clients.iam.create_role(
+        RoleName="MotoTestRole", AssumeRolePolicyDocument="moto_test_policy"
+    )
+    iam_arn = resp["Role"]["Arn"]
+
+    compute_env_name = "moto_test_compute_env"
+    resp = aws_clients.batch.create_compute_environment(
+        computeEnvironmentName=compute_env_name,
+        type="UNMANAGED",
+        state="ENABLED",
+        serviceRole=iam_arn,
+    )
+    compute_env_arn = resp["computeEnvironmentArn"]
+
+    resp = aws_clients.batch.create_job_queue(
+        jobQueueName=job_queue_name,
+        state="ENABLED",
+        priority=123,
+        computeEnvironmentOrder=[{"order": 123, "computeEnvironment": compute_env_arn}],
+    )
+    assert resp["jobQueueName"] == job_queue_name
+    assert resp["jobQueueArn"]
+    job_queue_arn = resp["jobQueueArn"]
+
+    resp = aws_clients.batch.register_job_definition(
+        jobDefinitionName=job_definition_name,
+        type="container",
+        containerProperties={
+            "image": "busybox",
+            "vcpus": 1,
+            "memory": 64,
+            "command": ["sleep", "10"],
+        },
+    )
+    assert resp["jobDefinitionName"] == job_definition_name
+    assert resp["jobDefinitionArn"]
+    job_definition_arn = resp["jobDefinitionArn"]
+    assert resp["revision"]
+    assert resp["jobDefinitionArn"].endswith(
+        "{0}:{1}".format(resp["jobDefinitionName"], resp["revision"])
+    )
+
+    infrastructure.vpc_id = vpc_id
+    infrastructure.subnet_id = subnet_id
+    infrastructure.security_group_id = sg_id
+    infrastructure.iam_arn = iam_arn
+    infrastructure.compute_env_name = compute_env_name
+    infrastructure.compute_env_arn = compute_env_arn
+    infrastructure.job_queue_name = job_queue_name
+    infrastructure.job_queue_arn = job_queue_arn
+    infrastructure.job_definition_name = job_definition_name
+    infrastructure.job_definition_arn = job_definition_arn
+    return infrastructure
+
+
+#
+# pytest tests
+#
+
+
+def test_aws_batch_waiters(aws_region):
+    assert inspect.isclass(AwsBatchWaiters)
+    batch_waiters = AwsBatchWaiters(region_name=aws_region)
+    assert isinstance(batch_waiters, AwsBatchWaiters)
+
+
+@mock_batch
+@mock_ec2
+@mock_ecs
+@mock_iam
+@mock_logs
+def test_aws_batch_job_waiting(aws_clients, aws_region, job_queue_name, job_definition_name):
+    """
+    Submit batch jobs and wait for various job status indicators or errors.
+    These batch job waiter tests can be slow and might need to be marked
+    for conditional skips if they take too long, although it seems to
+    run in about 30 sec to a minute.
+
+    .. note::
+        These tests have no control over how moto transitions the batch job status.
+
+    .. seealso::
+        - https://github.com/boto/botocore/blob/develop/botocore/waiter.py
+        - https://github.com/spulec/moto/blob/master/moto/batch/models.py#L360
+        - https://github.com/spulec/moto/blob/master/tests/test_batch/test_batch.py
+    """
+
+    aws_resources = batch_infrastructure(
+        aws_clients, aws_region, job_queue_name, job_definition_name
+    )
+    batch_waiters = AwsBatchWaiters(region_name=aws_resources.aws_region)
+
+    job_exists_waiter = batch_waiters.get_waiter("JobExists")
+    assert job_exists_waiter
+    assert isinstance(job_exists_waiter, botocore.waiter.Waiter)
+    assert job_exists_waiter.__class__.__name__ == "Batch.Waiter.JobExists"
+
+    job_running_waiter = batch_waiters.get_waiter("JobRunning")
+    assert job_running_waiter
+    assert isinstance(job_running_waiter, botocore.waiter.Waiter)
+    assert job_running_waiter.__class__.__name__ == "Batch.Waiter.JobRunning"
+
+    job_complete_waiter = batch_waiters.get_waiter("JobComplete")
+    assert job_complete_waiter
+    assert isinstance(job_complete_waiter, botocore.waiter.Waiter)
+    assert job_complete_waiter.__class__.__name__ == "Batch.Waiter.JobComplete"
+
+    # test waiting on a jobId that does not exist (this throws immediately)
+    with pytest.raises(botocore.exceptions.WaiterError) as err:
+        job_exists_waiter.config.delay = 0.2
+        job_exists_waiter.config.max_attempts = 2
+        job_exists_waiter.wait(jobs=["missing-job"])
+    assert isinstance(err.value, botocore.exceptions.WaiterError)
+    assert "Waiter JobExists failed" in str(err.value)
+
+    # Submit a job and wait for various job status indicators;
+    # moto transitions the batch job status automatically.
+
+    job_name = "test-job"
+    job_cmd = ['/bin/sh -c "for a in `seq 1 2`; do echo Hello World; sleep 0.25; done"']
+
+    job_response = aws_clients.batch.submit_job(
+        jobName=job_name,
+        jobQueue=aws_resources.job_queue_arn,
+        jobDefinition=aws_resources.job_definition_arn,
+        containerOverrides={"command": job_cmd},
+    )
+    job_id = job_response["jobId"]
+
+    job_description = aws_clients.batch.describe_jobs(jobs=[job_id])
+    job_status = [job for job in job_description["jobs"] if job["jobId"] == job_id][0]["status"]
+    assert job_status == "PENDING"
+
+    # this should not raise a WaiterError and note there is no 'state' maintained in
+    # the waiter that can be checked after calling wait() and it has no return value;
+    # see https://github.com/boto/botocore/blob/develop/botocore/waiter.py#L287
+    job_exists_waiter.config.delay = 0.2
+    job_exists_waiter.config.max_attempts = 20
+    job_exists_waiter.wait(jobs=[job_id])
+
+    # test waiting for job completion with too few attempts (possibly before job is running)
+    job_complete_waiter.config.delay = 0.1
+    job_complete_waiter.config.max_attempts = 1
+    with pytest.raises(botocore.exceptions.WaiterError) as err:
 
 Review comment:
   Same here?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message