Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id DCA8F200D3D for ; Mon, 13 Nov 2017 16:55:43 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id DAF0C160BF3; Mon, 13 Nov 2017 15:55:43 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 06611160BE4 for ; Mon, 13 Nov 2017 16:55:42 +0100 (CET) Received: (qmail 53902 invoked by uid 500); 13 Nov 2017 15:55:42 -0000 Mailing-List: contact commits-help@airflow.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airflow.incubator.apache.org Delivered-To: mailing list commits@airflow.incubator.apache.org Received: (qmail 53893 invoked by uid 99); 13 Nov 2017 15:55:42 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Nov 2017 15:55:42 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 626621807BB for ; Mon, 13 Nov 2017 15:55:41 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.222 X-Spam-Level: X-Spam-Status: No, score=-4.222 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id UwuWxP1L2F42 for ; Mon, 13 Nov 2017 15:55:39 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 648D660F64 for ; Mon, 13 Nov 2017 15:55:38 +0000 (UTC) Received: (qmail 53877 invoked by uid 99); 13 Nov 2017 15:55:37 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Nov 2017 15:55:37 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9631FDFC2E; Mon, 13 Nov 2017 15:55:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bolke@apache.org To: commits@airflow.incubator.apache.org Message-Id: <0828b5e1a9cf44a18888d70c76bb703d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-airflow git commit: [AIRFLOW-1813] Bug SSH Operator empty buffer Date: Mon, 13 Nov 2017 15:55:37 +0000 (UTC) archived-at: Mon, 13 Nov 2017 15:55:44 -0000 Repository: incubator-airflow Updated Branches: refs/heads/v1-9-test 3bc0b7d33 -> fab727d34 [AIRFLOW-1813] Bug SSH Operator empty buffer The SSH Operator will throw an empty "SSH operator error" when running commands that do not immediately log something to the terminal. This is due to a call to stdout.channel.recv when the channel currently has a 0-size buffer, either because the command has not yet logged anything, or never will (e.g. sleep 5) Make code PEP8 compliant Closes #2785 from RJKeevil/fix-ssh-operator-no- terminal-output (cherry picked from commit d4d8eb932657f4d1eccfaa8bb1d12933535fae94) Signed-off-by: Bolke de Bruin Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/fab727d3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/fab727d3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/fab727d3 Branch: refs/heads/v1-9-test Commit: fab727d34edfbd9b96c088ae5f8f538f4f74a114 Parents: 3bc0b7d Author: Rob Keevil Authored: Mon Nov 13 16:01:15 2017 +0100 Committer: Bolke de Bruin Committed: Mon Nov 13 16:55:28 2017 +0100 ---------------------------------------------------------------------- airflow/contrib/operators/ssh_operator.py | 25 +++++++++++++---------- tests/contrib/operators/test_ssh_operator.py | 18 ++++++++++++++++ 2 files changed, 32 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fab727d3/airflow/contrib/operators/ssh_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/ssh_operator.py b/airflow/contrib/operators/ssh_operator.py index fbbf86c..9f2ca81 100644 --- a/airflow/contrib/operators/ssh_operator.py +++ b/airflow/contrib/operators/ssh_operator.py @@ -23,7 +23,6 @@ from airflow.utils.decorators import apply_defaults class SSHOperator(BaseOperator): - """ SSHOperator to execute commands on given remote host using the ssh_hook. @@ -94,10 +93,15 @@ class SSHOperator(BaseOperator): stdin.close() channel.shutdown_write() - agg_stdout=b'' - agg_stderr=b'' + agg_stdout = b'' + agg_stderr = b'' + + # capture any initial output in case channel is closed already + stdout_buffer_length = len(stdout.channel.in_buffer) + + if stdout_buffer_length > 0: + agg_stdout += stdout.channel.recv(stdout_buffer_length) - agg_stdout+=stdout.channel.recv(len(stdout.channel.in_buffer)) # read from both stdout and stderr while not channel.closed or channel.recv_ready() or channel.recv_stderr_ready(): readq, _, _ = select([channel], [], [], self.timeout) @@ -105,17 +109,16 @@ class SSHOperator(BaseOperator): if c.recv_ready(): line = stdout.channel.recv(len(c.in_buffer)) line = line - agg_stdout+=line + agg_stdout += line self.log.info(line.decode('utf-8').strip('\n')) if c.recv_stderr_ready(): line = stderr.channel.recv_stderr(len(c.in_stderr_buffer)) line = line - agg_stderr+=line + agg_stderr += line self.log.warning(line.decode('utf-8').strip('\n')) - if stdout.channel.exit_status_ready() \ - and not stderr.channel.recv_stderr_ready() \ - and not stdout.channel.recv_ready(): - + if stdout.channel.exit_status_ready()\ + and not stderr.channel.recv_stderr_ready()\ + and not stdout.channel.recv_ready(): stdout.channel.shutdown_read() stdout.channel.close() break @@ -137,7 +140,7 @@ class SSHOperator(BaseOperator): else: error_msg = agg_stderr.decode('utf-8') raise AirflowException("error running cmd: {0}, error: {1}" - .format(self.command, error_msg)) + .format(self.command, error_msg)) except Exception as e: raise AirflowException("SSH operator error: {0}".format(str(e))) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fab727d3/tests/contrib/operators/test_ssh_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_ssh_operator.py b/tests/contrib/operators/test_ssh_operator.py index f205b97..019dfe4 100644 --- a/tests/contrib/operators/test_ssh_operator.py +++ b/tests/contrib/operators/test_ssh_operator.py @@ -107,5 +107,23 @@ class SSHOperatorTest(unittest.TestCase): self.assertIsNotNone(ti.duration) self.assertEqual(ti.xcom_pull(task_ids='test', key='return_value'), b'airflow') + def test_no_output_command(self): + configuration.set("core", "enable_xcom_pickling", "True") + task = SSHOperator( + task_id="test", + ssh_hook=self.hook, + command="sleep 1", + do_xcom_push=True, + dag=self.dag, + ) + + self.assertIsNotNone(task) + + ti = TaskInstance( + task=task, execution_date=datetime.now()) + ti.run() + self.assertIsNotNone(ti.duration) + self.assertEqual(ti.xcom_pull(task_ids='test', key='return_value'), b'') + if __name__ == '__main__': unittest.main()