airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-2259] Dataflow Hook Index out of range
Date Sat, 31 Mar 2018 09:19:40 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master bf86b8943 -> 98b5c169c


[AIRFLOW-2259] Dataflow Hook Index out of range

Because the API may not return the job — either because it
isn't known yet
or because it cannot be found — it could be that
the log lines aren't
available yet. Also use the locations-based API
call, which returns
the jobs immediately. This call seems to work for
us-central, but
doesn't return any jobs for other regions.

Closes #3165 from Fokko/AIRFLOW-2259


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/98b5c169
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/98b5c169
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/98b5c169

Branch: refs/heads/master
Commit: 98b5c169c678fccce2a5a68e2c5eee42cdb8b393
Parents: bf86b89
Author: Fokko Driesprong <fokkodriesprong@godatadriven.com>
Authored: Sat Mar 31 11:19:31 2018 +0200
Committer: Fokko Driesprong <fokkodriesprong@godatadriven.com>
Committed: Sat Mar 31 11:19:31 2018 +0200

----------------------------------------------------------------------
 airflow/contrib/hooks/gcp_dataflow_hook.py | 66 ++++++++++++++++---------
 1 file changed, 44 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/98b5c169/airflow/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py
index dc03f05..ab9fa3a 100644
--- a/airflow/contrib/hooks/gcp_dataflow_hook.py
+++ b/airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -22,19 +22,25 @@ from apiclient.discovery import build
 from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
 from airflow.utils.log.logging_mixin import LoggingMixin
 
+# This is the default location
+# https://cloud.google.com/dataflow/pipelines/specifying-exec-params
+DEFAULT_DATAFLOW_LOCATION = 'us-central1'
+
 
 class _DataflowJob(LoggingMixin):
-    def __init__(self, dataflow, project_number, name, poll_sleep=10):
+    def __init__(self, dataflow, project_number, name, location, poll_sleep=10):
         self._dataflow = dataflow
         self._project_number = project_number
         self._job_name = name
+        self._job_location = location
         self._job_id = None
         self._job = self._get_job()
         self._poll_sleep = poll_sleep
 
     def _get_job_id_from_name(self):
-        jobs = self._dataflow.projects().jobs().list(
-            projectId=self._project_number
+        jobs = self._dataflow.projects().locations().jobs().list(
+            projectId=self._project_number,
+            location=self._job_location
         ).execute()
         for job in jobs['jobs']:
             if job['name'] == self._job_name:
@@ -43,26 +49,34 @@ class _DataflowJob(LoggingMixin):
         return None
 
     def _get_job(self):
-        if self._job_id is None:
+        if self._job_name:
             job = self._get_job_id_from_name()
         else:
-            job = self._dataflow.projects().jobs().get(projectId=self._project_number,
-                                                       jobId=self._job_id).execute()
-        if 'currentState' in job:
+            job = self._dataflow.projects().jobs().get(
+                projectId=self._project_number,
+                jobId=self._job_id
+            ).execute()
+
+        if job and 'currentState' in job:
             self.log.info(
                 'Google Cloud DataFlow job %s is %s',
                 job['name'], job['currentState']
             )
-        else:
+        elif job:
             self.log.info(
                 'Google Cloud DataFlow with job_id %s has name %s',
                 self._job_id, job['name']
             )
+        else:
+            self.log.info(
+                'Google Cloud DataFlow job not available yet..'
+            )
+
         return job
 
     def wait_for_done(self):
         while True:
-            if 'currentState' in self._job:
+            if self._job and 'currentState' in self._job:
                 if 'JOB_STATE_DONE' == self._job['currentState']:
                     return True
                 elif 'JOB_STATE_RUNNING' == self._job['currentState'] and \
@@ -106,9 +120,9 @@ class _Dataflow(LoggingMixin):
         if fd == self._proc.stderr.fileno():
             lines = self._proc.stderr.readlines()
             for line in lines:
-              self.log.warning(line[:-1])
-            line = lines[-1][:-1]
-            return line
+                self.log.warning(line[:-1])
+            if lines:
+                return lines[-1]
         if fd == self._proc.stdout.fileno():
             line = self._proc.stdout.readline()
             return line
@@ -127,7 +141,8 @@ class _Dataflow(LoggingMixin):
             if ret is not None:
                 for fd in ret[0]:
                     line = self._line(fd)
-                    self.log.debug(line[:-1])
+                    if line:
+                        self.log.debug(line[:-1])
             else:
                 self.log.info("Waiting for DataFlow process to complete.")
         if self._proc.returncode is not 0:
@@ -153,11 +168,20 @@ class DataFlowHook(GoogleCloudBaseHook):
 
     def _start_dataflow(self, task_id, variables, name,
                         command_prefix, label_formatter):
+        variables = self._set_variables(variables)
         cmd = command_prefix + self._build_cmd(task_id, variables,
                                                label_formatter)
         _Dataflow(cmd).wait_for_done()
-        _DataflowJob(self.get_conn(), variables['project'],
-                     name, self.poll_sleep).wait_for_done()
+        _DataflowJob(self.get_conn(), variables['project'], name,
+                     variables['region'], self.poll_sleep).wait_for_done()
+
+    @staticmethod
+    def _set_variables(variables):
+        if variables['project'] is None:
+            raise Exception('Project not specified')
+        if 'region' not in variables.keys:
+            variables['region'] = DEFAULT_DATAFLOW_LOCATION
+        return variables
 
     def start_java_dataflow(self, task_id, variables, dataflow, job_class=None,
                             append_job_name=True):
@@ -169,7 +193,7 @@ class DataFlowHook(GoogleCloudBaseHook):
 
         def label_formatter(labels_dict):
             return ['--labels={}'.format(
-                    json.dumps(labels_dict).replace(' ', ''))]
+                json.dumps(labels_dict).replace(' ', ''))]
         command_prefix = (["java", "-cp", dataflow, job_class] if job_class
                           else ["java", "-jar", dataflow])
         self._start_dataflow(task_id, variables, name,
@@ -190,7 +214,7 @@ class DataFlowHook(GoogleCloudBaseHook):
             name = task_id + "-" + str(uuid.uuid1())[:8]
         else:
             name = task_id
-        variables["job_name"] = name
+        variables['job_name'] = name
 
         def label_formatter(labels_dict):
             return ['--labels={}={}'.format(key, value)
@@ -223,13 +247,11 @@ class DataFlowHook(GoogleCloudBaseHook):
                 "parameters": parameters,
                 "environment": environment}
         service = self.get_conn()
-        if variables['project'] is None:
-            raise Exception(
-                'Project not specified')
         request = service.projects().templates().launch(projectId=variables['project'],
                                                         gcsPath=dataflow_template,
                                                         body=body)
         response = request.execute()
-        _DataflowJob(
-            self.get_conn(), variables['project'], name, self.poll_sleep).wait_for_done()
+        variables = self._set_variables(variables)
+        _DataflowJob(self.get_conn(), variables['project'], name, variables['region'],
+                     self.poll_sleep).wait_for_done()
         return response


Mime
View raw message