airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-1813] Bug SSH Operator empty buffer
Date Mon, 13 Nov 2017 15:55:37 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-9-test 3bc0b7d33 -> fab727d34


[AIRFLOW-1813] Bug SSH Operator empty buffer

The SSH Operator will throw an empty "SSH operator error" when running
commands that do not immediately log something to the terminal. This is
due to a call to stdout.channel.recv when the channel currently has a
0-size buffer, either because the command has not yet logged anything
or because it never will (e.g. sleep 5).

Make code PEP8 compliant

Closes #2785 from RJKeevil/fix-ssh-operator-no-terminal-output

(cherry picked from commit d4d8eb932657f4d1eccfaa8bb1d12933535fae94)
Signed-off-by: Bolke de Bruin <bolke@xs4all.nl>


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/fab727d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/fab727d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/fab727d3

Branch: refs/heads/v1-9-test
Commit: fab727d34edfbd9b96c088ae5f8f538f4f74a114
Parents: 3bc0b7d
Author: Rob Keevil <robkeevil@gmail.com>
Authored: Mon Nov 13 16:01:15 2017 +0100
Committer: Bolke de Bruin <bolke@xs4all.nl>
Committed: Mon Nov 13 16:55:28 2017 +0100

----------------------------------------------------------------------
 airflow/contrib/operators/ssh_operator.py    | 25 +++++++++++++----------
 tests/contrib/operators/test_ssh_operator.py | 18 ++++++++++++++++
 2 files changed, 32 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fab727d3/airflow/contrib/operators/ssh_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/ssh_operator.py b/airflow/contrib/operators/ssh_operator.py
index fbbf86c..9f2ca81 100644
--- a/airflow/contrib/operators/ssh_operator.py
+++ b/airflow/contrib/operators/ssh_operator.py
@@ -23,7 +23,6 @@ from airflow.utils.decorators import apply_defaults
 
 
 class SSHOperator(BaseOperator):
-
     """
     SSHOperator to execute commands on given remote host using the ssh_hook.
 
@@ -94,10 +93,15 @@ class SSHOperator(BaseOperator):
             stdin.close()
             channel.shutdown_write()
 
-            agg_stdout=b''
-            agg_stderr=b''
+            agg_stdout = b''
+            agg_stderr = b''
+
+            # capture any initial output in case channel is closed already
+            stdout_buffer_length = len(stdout.channel.in_buffer)
+
+            if stdout_buffer_length > 0:
+                agg_stdout += stdout.channel.recv(stdout_buffer_length)
 
-            agg_stdout+=stdout.channel.recv(len(stdout.channel.in_buffer))
             # read from both stdout and stderr
             while not channel.closed or channel.recv_ready() or channel.recv_stderr_ready():
                 readq, _, _ = select([channel], [], [], self.timeout)
@@ -105,17 +109,16 @@ class SSHOperator(BaseOperator):
                     if c.recv_ready():
                         line = stdout.channel.recv(len(c.in_buffer))
                         line = line
-                        agg_stdout+=line
+                        agg_stdout += line
                         self.log.info(line.decode('utf-8').strip('\n'))
                     if c.recv_stderr_ready():
                         line = stderr.channel.recv_stderr(len(c.in_stderr_buffer))
                         line = line
-                        agg_stderr+=line
+                        agg_stderr += line
                         self.log.warning(line.decode('utf-8').strip('\n'))
-                if stdout.channel.exit_status_ready() \
-                    and not stderr.channel.recv_stderr_ready() \
-                    and not stdout.channel.recv_ready():
-
+                if stdout.channel.exit_status_ready()\
+                        and not stderr.channel.recv_stderr_ready()\
+                        and not stdout.channel.recv_ready():
                     stdout.channel.shutdown_read()
                     stdout.channel.close()
                     break
@@ -137,7 +140,7 @@ class SSHOperator(BaseOperator):
             else:
                 error_msg = agg_stderr.decode('utf-8')
                 raise AirflowException("error running cmd: {0}, error: {1}"
-                                        .format(self.command, error_msg))
+                                       .format(self.command, error_msg))
 
         except Exception as e:
             raise AirflowException("SSH operator error: {0}".format(str(e)))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fab727d3/tests/contrib/operators/test_ssh_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_ssh_operator.py b/tests/contrib/operators/test_ssh_operator.py
index f205b97..019dfe4 100644
--- a/tests/contrib/operators/test_ssh_operator.py
+++ b/tests/contrib/operators/test_ssh_operator.py
@@ -107,5 +107,23 @@ class SSHOperatorTest(unittest.TestCase):
         self.assertIsNotNone(ti.duration)
         self.assertEqual(ti.xcom_pull(task_ids='test', key='return_value'), b'airflow')
 
+    def test_no_output_command(self):
+        configuration.set("core", "enable_xcom_pickling", "True")
+        task = SSHOperator(
+            task_id="test",
+            ssh_hook=self.hook,
+            command="sleep 1",
+            do_xcom_push=True,
+            dag=self.dag,
+        )
+
+        self.assertIsNotNone(task)
+
+        ti = TaskInstance(
+            task=task, execution_date=datetime.now())
+        ti.run()
+        self.assertIsNotNone(ti.duration)
+        self.assertEqual(ti.xcom_pull(task_ids='test', key='return_value'), b'')
+
 if __name__ == '__main__':
     unittest.main()


Mime
View raw message