airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] ashb commented on a change in pull request #3744: [AIRFLOW-2893] fix stuck dataflow job due to name mismatch
Date Tue, 14 Aug 2018 15:54:26 GMT
ashb commented on a change in pull request #3744: [AIRFLOW-2893] fix stuck dataflow job due
to name mismatch
URL: https://github.com/apache/incubator-airflow/pull/3744#discussion_r210006800
 
 

 ##########
 File path: airflow/contrib/hooks/gcp_dataflow_hook.py
 ##########
 @@ -124,36 +127,38 @@ def __init__(self, cmd):
 
     def _line(self, fd):
         if fd == self._proc.stderr.fileno():
-            lines = self._proc.stderr.readlines()
-            for line in lines:
-                self.log.warning(line[:-1])
-            if lines:
-                return lines[-1]
+            return self._proc.stderr.readline()
         if fd == self._proc.stdout.fileno():
-            line = self._proc.stdout.readline()
-            return line
+            return self._proc.stdout.readline()
 
     @staticmethod
     def _extract_job(line):
-        if line is not None:
-            if line.startswith("Submitted job: "):
-                return line[15:-1]
+        job_id_pattern = re.compile(
+            '.*https://console.cloud.google.com/dataflow.*/jobs/([a-z|0-9|A-Z|\-|\_]+).*')
+        matched_job = job_id_pattern.match(line or '')
+        if matched_job:
+            return matched_job.group(1)
 
     def wait_for_done(self):
         reads = [self._proc.stderr.fileno(), self._proc.stdout.fileno()]
         self.log.info("Start waiting for DataFlow process to complete.")
-        while self._proc.poll() is None:
+        job_id = None
+        while True:
             ret = select.select(reads, [], [], 5)
             if ret is not None:
                 for fd in ret[0]:
                     line = self._line(fd)
                     if line:
-                        self.log.debug(line[:-1])
+                        self.log.info(line[:-1])
+                        job_id = job_id or self._extract_job(line)
             else:
                 self.log.info("Waiting for DataFlow process to complete.")
+            if self._proc.poll() is not None:
 
 Review comment:
  Reading each of stdout and stderr independently is hard to do without deadlocking one or
the other, too — you can't just read one to the end and then read the other. https://stackoverflow.com/questions/33886406/how-to-avoid-the-deadlock-in-a-subprocess-without-using-communicate
   
  (Sorry if this is out of context — I haven't looked at the PR itself, only the comments.)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message