From: fokko@apache.org
To: commits@airflow.incubator.apache.org
Subject: incubator-airflow git commit: [AIRFLOW-2015] Add flag for interactive runs
Date: Sun, 28 Jan 2018 19:41:21 +0000 (UTC)

Repository: incubator-airflow
Updated Branches:
  refs/heads/master a1d555177 -> efd8338dc


[AIRFLOW-2015] Add flag for interactive runs

We capture the standard output and error streams so that they are
handled by the configured logger. However, when developing DAGs or
Airflow code itself, it is sometimes useful to put pdb breakpoints in
code triggered by `airflow run`. Such a flow requires that the output
and error streams are not redirected to the logger. This patch enables
that by adding an `--interactive` flag to the `airflow run` subcommand.
Note that this does not require `--local`.
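For illustration only (this example is not part of the commit): a minimal
sketch of the workflow the new flag enables. The DAG file, dag_id, task_id,
and execution date below are hypothetical; only the `--interactive` flag
itself comes from this patch.

# debug_example.py -- hypothetical DAG, used only to illustrate the flag.
import pdb
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def my_callable():
    # With --interactive, stdout/stderr stay attached to the terminal,
    # so this breakpoint yields a usable (Pdb) prompt instead of having
    # its output swallowed by the task logger.
    pdb.set_trace()
    print('resumed past the breakpoint')


dag = DAG('debug_example', start_date=datetime(2018, 1, 1),
          schedule_interval=None)

task = PythonOperator(task_id='my_task', python_callable=my_callable,
                      dag=dag)

# Invoked as (ids are hypothetical):
#   airflow run debug_example my_task 2018-01-01 --interactive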
Closes #2957 from yati-sagade/ysagade/airflow-2015

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/efd8338d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/efd8338d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/efd8338d

Branch: refs/heads/master
Commit: efd8338dc8bcf50a9d0ace7659650e85d6b49305
Parents: a1d5551
Author: Yati Sagade
Authored: Sun Jan 28 20:41:14 2018 +0100
Committer: Fokko Driesprong
Committed: Sun Jan 28 20:41:14 2018 +0100

----------------------------------------------------------------------
 airflow/bin/cli.py | 120 ++++++++++++++++++++++++++----------------------
 1 file changed, 65 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/efd8338d/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index b032729..d0c11d3 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -324,6 +324,58 @@ def set_is_paused(is_paused, args, dag=None):
     print(msg)


+def _run(args, dag, ti):
+    if args.local:
+        run_job = jobs.LocalTaskJob(
+            task_instance=ti,
+            mark_success=args.mark_success,
+            pickle_id=args.pickle,
+            ignore_all_deps=args.ignore_all_dependencies,
+            ignore_depends_on_past=args.ignore_depends_on_past,
+            ignore_task_deps=args.ignore_dependencies,
+            ignore_ti_state=args.force,
+            pool=args.pool)
+        run_job.run()
+    elif args.raw:
+        ti._run_raw_task(
+            mark_success=args.mark_success,
+            job_id=args.job_id,
+            pool=args.pool,
+        )
+    else:
+        pickle_id = None
+        if args.ship_dag:
+            try:
+                # Running remotely, so pickling the DAG
+                session = settings.Session()
+                pickle = DagPickle(dag)
+                session.add(pickle)
+                session.commit()
+                pickle_id = pickle.id
+                # TODO: This should be written to a log
+                print('Pickled dag {dag} as pickle_id:{pickle_id}'
+                      .format(**locals()))
+            except Exception as e:
+                print('Could not pickle the DAG')
+                print(e)
+                raise e
+
+        executor = GetDefaultExecutor()
+        executor.start()
+        print("Sending to executor.")
+        executor.queue_task_instance(
+            ti,
+            mark_success=args.mark_success,
+            pickle_id=pickle_id,
+            ignore_all_deps=args.ignore_all_dependencies,
+            ignore_depends_on_past=args.ignore_depends_on_past,
+            ignore_task_deps=args.ignore_dependencies,
+            ignore_ti_state=args.force,
+            pool=args.pool)
+        executor.heartbeat()
+        executor.end()
+
+
 def run(args, dag=None):
     # Disable connection pooling to reduce the # of connections on the DB
     # while it's waiting for the task to finish.
@@ -368,60 +420,13 @@ def run(args, dag=None):
     hostname = socket.getfqdn()
     log.info("Running %s on host %s", ti, hostname)

-    with redirect_stdout(ti.log, logging.INFO), redirect_stderr(ti.log, logging.WARN):
-        if args.local:
-            run_job = jobs.LocalTaskJob(
-                task_instance=ti,
-                mark_success=args.mark_success,
-                pickle_id=args.pickle,
-                ignore_all_deps=args.ignore_all_dependencies,
-                ignore_depends_on_past=args.ignore_depends_on_past,
-                ignore_task_deps=args.ignore_dependencies,
-                ignore_ti_state=args.force,
-                pool=args.pool)
-            run_job.run()
-        elif args.raw:
-            ti._run_raw_task(
-                mark_success=args.mark_success,
-                job_id=args.job_id,
-                pool=args.pool,
-            )
-        else:
-            pickle_id = None
-            if args.ship_dag:
-                try:
-                    # Running remotely, so pickling the DAG
-                    session = settings.Session()
-                    pickle = DagPickle(dag)
-                    session.add(pickle)
-                    session.commit()
-                    pickle_id = pickle.id
-                    # TODO: This should be written to a log
-                    print((
-                        'Pickled dag {dag} '
-                        'as pickle_id:{pickle_id}').format(**locals()))
-                except Exception as e:
-                    print('Could not pickle the DAG')
-                    print(e)
-                    raise e
-
-            executor = GetDefaultExecutor()
-            executor.start()
-            print("Sending to executor.")
-            executor.queue_task_instance(
-                ti,
-                mark_success=args.mark_success,
-                pickle_id=pickle_id,
-                ignore_all_deps=args.ignore_all_dependencies,
-                ignore_depends_on_past=args.ignore_depends_on_past,
-                ignore_task_deps=args.ignore_dependencies,
-                ignore_ti_state=args.force,
-                pool=args.pool)
-            executor.heartbeat()
-            executor.end()
-
-    logging.shutdown()
-
+    if args.interactive:
+        _run(args, dag, ti)
+    else:
+        with redirect_stdout(ti.log, logging.INFO),\
+                redirect_stderr(ti.log, logging.WARN):
+            _run(args, dag, ti)
+    logging.shutdown()

 def task_failed_deps(args):
     """
@@ -1281,6 +1286,11 @@ class CLIFactory(object):
         # dependency. This flag should be deprecated and renamed to 'ignore_ti_state' and
         # the "ignore_all_dependencies" command should be called the"force" command
         # instead.
+        'interactive': Arg(
+            ('-int', '--interactive'),
+            help='Do not capture standard output and error streams '
+                 '(useful for interactive debugging)',
+            action='store_true'),
         'force': Arg(
             ("-f", "--force"),
             "Ignore previous task instance state, rerun regardless if task already "
@@ -1529,7 +1539,7 @@ class CLIFactory(object):
             'dag_id', 'task_id', 'execution_date', 'subdir',
             'mark_success', 'force', 'pool', 'cfg_path',
             'local', 'raw', 'ignore_all_dependencies', 'ignore_dependencies',
-            'ignore_depends_on_past', 'ship_dag', 'pickle', 'job_id'),
+            'ignore_depends_on_past', 'ship_dag', 'pickle', 'job_id', 'interactive',),
     }, {
         'func': initdb,
         'help': "Initialize the metadata database",
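To see why the flag is needed, here is a minimal sketch of a logger-backed
stream redirection, assuming behaviour similar to (but not identical with)
Airflow's own `redirect_stdout`/`redirect_stderr` helpers used above. While
such a context manager is active, everything pdb writes goes to the log
handlers rather than the terminal, so the (Pdb) prompt never appears; the
`args.interactive` branch simply calls `_run` outside the context manager,
leaving both streams on the controlling terminal.

import logging
import sys
from contextlib import contextmanager


class StreamToLogger(object):
    """File-like shim that forwards writes to a logger (illustrative only)."""

    def __init__(self, logger, level):
        self.logger = logger
        self.level = level

    def write(self, message):
        # Log each non-empty line at the configured level.
        for line in message.rstrip().splitlines():
            self.logger.log(self.level, line.rstrip())

    def flush(self):
        # Nothing is buffered here; the logging handlers flush themselves.
        pass


@contextmanager
def redirect_stdout(logger, level):
    # Swap sys.stdout for the logger-backed shim and restore it on exit.
    original = sys.stdout
    sys.stdout = StreamToLogger(logger, level)
    try:
        yield
    finally:
        sys.stdout = original


# Illustrative use:
#   log = logging.getLogger('airflow.task')
#   with redirect_stdout(log, logging.INFO):
#       print('this line is routed to the task log, not the terminal')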