Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 4045F200D18 for ; Wed, 11 Oct 2017 23:06:08 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 3D5431609E5; Wed, 11 Oct 2017 21:06:08 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 590D21609BB for ; Wed, 11 Oct 2017 23:06:07 +0200 (CEST) Received: (qmail 26163 invoked by uid 500); 11 Oct 2017 21:06:06 -0000 Mailing-List: contact commits-help@airflow.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airflow.incubator.apache.org Delivered-To: mailing list commits@airflow.incubator.apache.org Received: (qmail 26153 invoked by uid 99); 11 Oct 2017 21:06:06 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Oct 2017 21:06:06 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id B37DD1A1184 for ; Wed, 11 Oct 2017 21:06:05 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.222 X-Spam-Level: X-Spam-Status: No, score=-4.222 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id zqrB2cLIB6Ir for ; Wed, 11 Oct 2017 21:06:03 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id BAA665F5A5 for ; Wed, 11 Oct 2017 21:06:02 +0000 (UTC) Received: (qmail 26146 invoked by uid 99); 11 Oct 2017 21:06:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Oct 2017 21:06:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DC2A6DF9AF; Wed, 11 Oct 2017 21:06:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davydov@apache.org To: commits@airflow.incubator.apache.org Message-Id: <7c077f54c08d4c7abc5583a5dfce9436@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-airflow git commit: [AIRFLOW-1681] Add batch clear in task instance view Date: Wed, 11 Oct 2017 21:06:01 +0000 (UTC) archived-at: Wed, 11 Oct 2017 21:06:08 -0000 Repository: incubator-airflow Updated Branches: refs/heads/master 98b4df945 -> 9f2c16a0a [AIRFLOW-1681] Add batch clear in task instance view Allow users to batch clear selected task instance(s) in task instance view. Only state(s) of selected task instance(s) will be cleared--no upstream nor downstream task instance will be affected. DAG(s) involved will be set to "RUNNING" state, same as existing "clear" operation. Keeping both "Delete" and "Clear" operations for more smooth user habit transition--informing DAG state change in pop-up (check screenshots). Closes #2681 from yrqls21/add-batch-clear-in-task- instance-view Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9f2c16a0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9f2c16a0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9f2c16a0 Branch: refs/heads/master Commit: 9f2c16a0ac261888fe2ee4671538201c273f82d5 Parents: 98b4df9 Author: Kevin Yang Authored: Wed Oct 11 14:05:33 2017 -0700 Committer: Dan Davydov Committed: Wed Oct 11 14:05:35 2017 -0700 ---------------------------------------------------------------------- airflow/www/views.py | 62 +++++++++++++++++++++++------------------------ 1 file changed, 31 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f2c16a0/airflow/www/views.py ---------------------------------------------------------------------- diff --git a/airflow/www/views.py b/airflow/www/views.py index bc63b5b..81ee61f 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -37,7 +37,7 @@ import sqlalchemy as sqla from sqlalchemy import or_, desc, and_, union_all from flask import ( - abort, redirect, url_for, request, Markup, Response, current_app, render_template, + abort, redirect, url_for, request, Markup, Response, current_app, render_template, make_response) from flask_admin import BaseView, expose, AdminIndexView from flask_admin.contrib.sqla import ModelView @@ -2488,7 +2488,6 @@ class TaskInstanceModelView(ModelViewOnly): 'start_date', 'end_date', 'duration', 'job_id', 'hostname', 'unixname', 'priority_weight', 'queue', 'queued_dttm', 'try_number', 'pool', 'log_url') - can_delete = True page_size = PAGE_SIZE @action('set_running', "Set state to 'running'", None) @@ -2507,58 +2506,59 @@ class TaskInstanceModelView(ModelViewOnly): def action_set_retry(self, ids): self.set_task_instance_state(ids, State.UP_FOR_RETRY) - @action('delete', - lazy_gettext('Delete'), - lazy_gettext('Are you sure you want to delete selected records?')) - def action_delete(self, ids): - """ - As a workaround for AIRFLOW-277, this method overrides Flask-Admin's ModelView.action_delete(). - - TODO: this method should be removed once the below bug is fixed on Flask-Admin side. - https://github.com/flask-admin/flask-admin/issues/1226 - """ - if 'sqlite' in conf.get('core', 'sql_alchemy_conn'): - self.delete_task_instances(ids) - else: - super(TaskInstanceModelView, self).action_delete(ids) - @provide_session - def set_task_instance_state(self, ids, target_state, session=None): + @action('clear', + lazy_gettext('Clear'), + lazy_gettext( + 'Are you sure you want to clear the state of the selected task instance(s)' + ' and set their dagruns to the running state?')) + def action_clear(self, ids, session=None): try: TI = models.TaskInstance - count = len(ids) + + dag_to_tis = {} + for id in ids: task_id, dag_id, execution_date = id.split(',') - execution_date = datetime.strptime(execution_date, '%Y-%m-%d %H:%M:%S') + ti = session.query(TI).filter(TI.task_id == task_id, TI.dag_id == dag_id, TI.execution_date == execution_date).one() - ti.state = target_state + + dag = dagbag.get_dag(dag_id) + tis = dag_to_tis.setdefault(dag, []) + tis.append(ti) + + for dag, tis in dag_to_tis.items(): + models.clear_task_instances(tis, session, dag=dag) + session.commit() - flash( - "{count} task instances were set to '{target_state}'".format(**locals())) + flash("{0} task instances have been cleared".format(len(ids))) + except Exception as ex: if not self.handle_view_exception(ex): raise Exception("Ooops") - flash('Failed to set state', 'error') + flash('Failed to clear task instances', 'error') @provide_session - def delete_task_instances(self, ids, session=None): + def set_task_instance_state(self, ids, target_state, session=None): try: TI = models.TaskInstance - count = 0 + count = len(ids) for id in ids: task_id, dag_id, execution_date = id.split(',') execution_date = datetime.strptime(execution_date, '%Y-%m-%d %H:%M:%S') - count += session.query(TI).filter(TI.task_id == task_id, - TI.dag_id == dag_id, - TI.execution_date == execution_date).delete() + ti = session.query(TI).filter(TI.task_id == task_id, + TI.dag_id == dag_id, + TI.execution_date == execution_date).one() + ti.state = target_state session.commit() - flash("{count} task instances were deleted".format(**locals())) + flash( + "{count} task instances were set to '{target_state}'".format(**locals())) except Exception as ex: if not self.handle_view_exception(ex): raise Exception("Ooops") - flash('Failed to delete', 'error') + flash('Failed to set state', 'error') def get_one(self, id): """