airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-1789][AIRFLOW-1712] Log SSHOperator stderr to log.warning
Date Wed, 08 Nov 2017 18:38:54 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 313f5bac4 -> 1943a96e7


[AIRFLOW-1789][AIRFLOW-1712] Log SSHOperator stderr to log.warning

Logging functionality for SSHOperator was added in
[AIRFLOW-1712] but it
only logged stdout.
This commit also logs stderr to log.warning

Closes #2761 from OpringaoDoTurno/stderr_in_ssh


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1943a96e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1943a96e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1943a96e

Branch: refs/heads/master
Commit: 1943a96e708dd68a6990b022ffbbe3729a8c27b8
Parents: 313f5ba
Author: Ignasi Peiró <ignasi.peiro@gmail.com>
Authored: Wed Nov 8 19:38:39 2017 +0100
Committer: Bolke de Bruin <bolke@xs4all.nl>
Committed: Wed Nov 8 19:38:39 2017 +0100

----------------------------------------------------------------------
 airflow/contrib/operators/ssh_operator.py | 48 +++++++++++++++++++++-----
 1 file changed, 39 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1943a96e/airflow/contrib/operators/ssh_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/ssh_operator.py b/airflow/contrib/operators/ssh_operator.py
index bb72330..fbbf86c 100644
--- a/airflow/contrib/operators/ssh_operator.py
+++ b/airflow/contrib/operators/ssh_operator.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 from base64 import b64encode
+from select import select
 
 from airflow import configuration
 from airflow.contrib.hooks.ssh_hook import SSHHook
@@ -86,26 +87,55 @@ class SSHOperator(BaseOperator):
                                                             get_pty=get_pty,
                                                             timeout=self.timeout
                                                             )
+            # get channels
+            channel = stdout.channel
+
+            # closing stdin
             stdin.close()
-            output=b''
-            for line in stdout:
-                output+=line.encode('utf-8')
-                self.log.info(line.strip('\n'))
+            channel.shutdown_write()
+
+            agg_stdout=b''
+            agg_stderr=b''
+
+            agg_stdout+=stdout.channel.recv(len(stdout.channel.in_buffer))
+            # read from both stdout and stderr
+            while not channel.closed or channel.recv_ready() or channel.recv_stderr_ready():
+                readq, _, _ = select([channel], [], [], self.timeout)
+                for c in readq:
+                    if c.recv_ready():
+                        line = stdout.channel.recv(len(c.in_buffer))
+                        line = line
+                        agg_stdout+=line
+                        self.log.info(line.decode('utf-8').strip('\n'))
+                    if c.recv_stderr_ready():
+                        line = stderr.channel.recv_stderr(len(c.in_stderr_buffer))
+                        line = line
+                        agg_stderr+=line
+                        self.log.warning(line.decode('utf-8').strip('\n'))
+                if stdout.channel.exit_status_ready() \
+                    and not stderr.channel.recv_stderr_ready() \
+                    and not stdout.channel.recv_ready():
+
+                    stdout.channel.shutdown_read()
+                    stdout.channel.close()
+                    break
+
+            stdout.close()
+            stderr.close()
 
             exit_status = stdout.channel.recv_exit_status()
             if exit_status is 0:
-                # only returning on output if do_xcom_push is set
-                # otherwise its not suppose to be disclosed
+                # returning output if do_xcom_push is set
                 if self.do_xcom_push:
                     enable_pickling = configuration.getboolean('core',
                                                                'enable_xcom_pickling')
                     if enable_pickling:
-                        return output
+                        return agg_stdout
                     else:
-                        return b64encode(output).decode('utf-8')
+                        return b64encode(agg_stdout).decode('utf-8')
 
             else:
-                error_msg = stderr.read()
+                error_msg = agg_stderr.decode('utf-8')
                 raise AirflowException("error running cmd: {0}, error: {1}"
                                         .format(self.command, error_msg))
 


Mime
View raw message