airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] ashb closed pull request #4187: [AIRFLOW-3343] Update DockerOperator for Docker-py 3.0.0 API changes
Date Wed, 14 Nov 2018 22:06:05 GMT
ashb closed pull request #4187: [AIRFLOW-3343] Update DockerOperator for Docker-py 3.0.0 API
changes
URL: https://github.com/apache/incubator-airflow/pull/4187
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise show it once the pull request is merged):

diff --git a/airflow/operators/docker_operator.py b/airflow/operators/docker_operator.py
index da4d3b5769..cf9838593f 100644
--- a/airflow/operators/docker_operator.py
+++ b/airflow/operators/docker_operator.py
@@ -108,6 +108,9 @@ class DockerOperator(BaseOperator):
     :type xcom_all: bool
     :param docker_conn_id: ID of the Airflow connection to use
     :type docker_conn_id: str
+    :param shm_size: Size of ``/dev/shm`` in bytes. The size must be
+        greater than 0. If omitted uses system default.
+    :type shm_size: int
     """
     template_fields = ('command', 'environment',)
     template_ext = ('.sh', '.bash',)
@@ -139,6 +142,7 @@ def __init__(
             dns=None,
             dns_search=None,
             auto_remove=False,
+            shm_size=None,
             *args,
             **kwargs):
 
@@ -167,7 +171,7 @@ def __init__(
         self.xcom_push_flag = xcom_push
         self.xcom_all = xcom_all
         self.docker_conn_id = docker_conn_id
-        self.shm_size = kwargs.get('shm_size')
+        self.shm_size = shm_size
 
         self.cli = None
         self.container = None
@@ -197,7 +201,7 @@ def execute(self, context):
         if self.force_pull or len(self.cli.images(name=self.image)) == 0:
             self.log.info('Pulling docker image %s', self.image)
             for l in self.cli.pull(self.image, stream=True):
-                output = json.loads(l.decode('utf-8'))
+                output = json.loads(l.decode('utf-8').strip())
                 if 'status' in output:
                     self.log.info("%s", output['status'])
 
@@ -230,9 +234,9 @@ def execute(self, context):
                     line = line.decode('utf-8')
                 self.log.info(line)
 
-            exit_code = self.cli.wait(self.container['Id'])
-            if exit_code != 0:
-                raise AirflowException('docker container failed')
+            result = self.cli.wait(self.container['Id'])
+            if result['StatusCode'] != 0:
+                raise AirflowException('docker container failed: ' + repr(result))
 
             if self.xcom_push_flag:
                 return self.cli.logs(container=self.container['Id']) \
diff --git a/setup.py b/setup.py
index 7aeb5b59c9..1409959a50 100644
--- a/setup.py
+++ b/setup.py
@@ -174,7 +174,7 @@ def write_version(filename=os.path.join(*['airflow',
     'sphinx-rtd-theme>=0.1.6',
     'Sphinx-PyPI-upload>=0.2.1'
 ]
-docker = ['docker>=3.0.0']
+docker = ['docker~=3.0']
 druid = ['pydruid>=0.4.1']
 elasticsearch = [
     'elasticsearch>=5.0.0,<6.0.0',
diff --git a/tests/operators/test_docker_operator.py b/tests/operators/test_docker_operator.py
index 7ab27c1aeb..23577d7a28 100644
--- a/tests/operators/test_docker_operator.py
+++ b/tests/operators/test_docker_operator.py
@@ -51,7 +51,7 @@ def test_execute(self, client_class_mock, mkdtemp_mock):
         client_mock.images.return_value = []
         client_mock.logs.return_value = ['container log']
         client_mock.pull.return_value = [b'{"status":"pull log"}']
-        client_mock.wait.return_value = 0
+        client_mock.wait.return_value = {"StatusCode": 0}
 
         client_class_mock.return_value = client_mock
 
@@ -97,7 +97,7 @@ def test_execute_tls(self, client_class_mock, tls_class_mock):
         client_mock.images.return_value = []
         client_mock.logs.return_value = []
         client_mock.pull.return_value = []
-        client_mock.wait.return_value = 0
+        client_mock.wait.return_value = {"StatusCode": 0}
 
         client_class_mock.return_value = client_mock
         tls_mock = mock.Mock()
@@ -123,7 +123,7 @@ def test_execute_unicode_logs(self, client_class_mock):
         client_mock.images.return_value = []
         client_mock.logs.return_value = ['unicode container log 😁']
         client_mock.pull.return_value = []
-        client_mock.wait.return_value = 0
+        client_mock.wait.return_value = {"StatusCode": 0}
 
         client_class_mock.return_value = client_mock
 
@@ -145,7 +145,7 @@ def test_execute_container_fails(self, client_class_mock):
         client_mock.images.return_value = []
         client_mock.logs.return_value = []
         client_mock.pull.return_value = []
-        client_mock.wait.return_value = 1
+        client_mock.wait.return_value = {"StatusCode": 1}
 
         client_class_mock.return_value = client_mock
 
@@ -174,7 +174,7 @@ def test_execute_no_docker_conn_id_no_hook(self, operator_client_mock):
         client_mock.create_container.return_value = {'Id': 'some_id'}
         client_mock.logs.return_value = []
         client_mock.pull.return_value = []
-        client_mock.wait.return_value = 0
+        client_mock.wait.return_value = {"StatusCode": 0}
         operator_client_mock.return_value = client_mock
 
         # Create the DockerOperator
@@ -209,7 +209,7 @@ def test_execute_with_docker_conn_id_use_hook(self, operator_client_mock,
         client_mock.create_container.return_value = {'Id': 'some_id'}
         client_mock.logs.return_value = []
         client_mock.pull.return_value = []
-        client_mock.wait.return_value = 0
+        client_mock.wait.return_value = {"StatusCode": 0}
         operator_client_mock.return_value = client_mock
 
         # Create the DockerOperator


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message