airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] fenglu-g commented on a change in pull request #3744: [AIRFLOW-2893] fix stuck dataflow job due to name mismatch
Date Tue, 14 Aug 2018 22:45:00 GMT
fenglu-g commented on a change in pull request #3744: [AIRFLOW-2893] fix stuck dataflow job due to name mismatch
URL: https://github.com/apache/incubator-airflow/pull/3744#discussion_r210127353
 
 

 ##########
 File path: airflow/contrib/hooks/gcp_dataflow_hook.py
 ##########
 @@ -124,36 +127,38 @@ def __init__(self, cmd):
 
     def _line(self, fd):
         if fd == self._proc.stderr.fileno():
-            lines = self._proc.stderr.readlines()
-            for line in lines:
-                self.log.warning(line[:-1])
-            if lines:
-                return lines[-1]
+            return self._proc.stderr.readline()
         if fd == self._proc.stdout.fileno():
-            line = self._proc.stdout.readline()
-            return line
+            return self._proc.stdout.readline()
 
     @staticmethod
     def _extract_job(line):
-        if line is not None:
-            if line.startswith("Submitted job: "):
-                return line[15:-1]
+        job_id_pattern = re.compile(
+            '.*https://console.cloud.google.com/dataflow.*/jobs/([a-z|0-9|A-Z|\-|\_]+).*')
+        matched_job = job_id_pattern.match(line or '')
+        if matched_job:
+            return matched_job.group(1)
 
     def wait_for_done(self):
         reads = [self._proc.stderr.fileno(), self._proc.stdout.fileno()]
         self.log.info("Start waiting for DataFlow process to complete.")
-        while self._proc.poll() is None:
+        job_id = None
+        while True:
             ret = select.select(reads, [], [], 5)
             if ret is not None:
                 for fd in ret[0]:
                     line = self._line(fd)
                     if line:
-                        self.log.debug(line[:-1])
+                        self.log.info(line[:-1])
 
 Review comment:
   Good point, done. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message