airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From san...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-329] Update Dag Overview Page with Better Status Columns
Date Thu, 18 Aug 2016 00:54:15 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master fcf645b2f -> 38972162c


[AIRFLOW-329] Update Dag Overview Page with Better Status Columns

Renamed 'Recent Statuses' to 'Recent Tasks'.
Created 'DAG Stats' column. Created a cache table
dag_stats to hold data for 'DAG Stats' column with
dirty bit for each row. Upon dagrun creation,
state change, or deletion via web UI, appropriate
rows will have dirty set to true and then dirty
rows will be refreshed with up-to-date data. Upon
execution of `airflow upgradedb` command, if
dag_stats is empty then it will be populated from
dag_run data.

API endpoints in views.py have also been changed.
'/dag_stats' has been renamed to '/task_stats',
and the endpoint is reused for the 'DAG Stats' column.

Dear Airflow Maintainers,

Please accept this PR that addresses the following
issues:
-
https://issues.apache.org/jira/browse/AIRFLOW-329

Testing Done:
- Added a unit test to core tests, tests that
dag_stats table is appropriately updated after
dagrun creation and state change
- Added sanity check for '/dag_stats' and
'/task_stats' API endpoints

Renamed 'Recent Statuses' to 'Recent Tasks'.
Created 'DAG Stats' column. Created a cache table
dag_stats to hold data for 'DAG Stats' column with
dirty bit for each row. Upon dagrun creation,
state change, or deletion via web UI, appropriate
rows will have dirty set to true and then dirty
rows will be refreshed with up-to-date data. Upon
execution of `airflow webserver` command, all rows
in dag_stats will be dirtied and cleaned.

API endpoints in views.py have also been changed.
'/dag_stats' has been renamed to '/task_stats',
and the endpoint is reused for the 'DAG Stats' column.

Closes #1730 from normster/dagstat


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/38972162
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/38972162
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/38972162

Branch: refs/heads/master
Commit: 38972162c7663c98c8871899959fd4f55d79cef1
Parents: fcf645b
Author: Norman Mu <norman@agari.com>
Authored: Wed Aug 17 17:54:00 2016 -0700
Committer: Siddharth Anand <siddharthanand@yahoo.com>
Committed: Wed Aug 17 17:54:00 2016 -0700

----------------------------------------------------------------------
 airflow/bin/cli.py                              |  17 +++-
 airflow/jobs.py                                 |   4 +
 .../f2ca10b85618_add_dag_stats_table.py         |  42 ++++++++
 airflow/models.py                               |  92 +++++++++++++++--
 airflow/operators/dagrun_operator.py            |  11 +-
 airflow/utils/state.py                          |  15 +++
 airflow/www/templates/airflow/dags.html         | 100 +++++++++++++++++--
 airflow/www/views.py                            |  64 ++++++++++--
 tests/core.py                                   |  66 ++++++++++++
 9 files changed, 382 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/38972162/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 6750e10..3ca608f 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -41,12 +41,15 @@ from airflow import jobs, settings
 from airflow import configuration as conf
 from airflow.exceptions import AirflowException
 from airflow.executors import DEFAULT_EXECUTOR
-from airflow.models import DagModel, DagBag, TaskInstance, DagPickle, DagRun, Variable
+from airflow.models import (DagModel, DagBag, TaskInstance,
+                            DagPickle, DagRun, Variable, DagStat)
 from airflow.utils import db as db_utils
 from airflow.utils import logging as logging_utils
 from airflow.utils.state import State
 from airflow.www.app import cached_app
 
+from sqlalchemy import func
+
 DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
 
 
@@ -615,7 +618,6 @@ def restart_workers(gunicorn_master_proc, num_workers_expected):
 
 
 def webserver(args):
-
     print(settings.HEADER)
 
     app = cached_app(conf)
@@ -800,6 +802,17 @@ def upgradedb(args):  # noqa
     print("DB: " + repr(settings.engine.url))
     db_utils.upgradedb()
 
+    # Populate DagStats table
+    session = settings.Session()
+    ds_rows = session.query(DagStat).count()
+    if not ds_rows:
+        qry = (
+            session.query(DagRun.dag_id, DagRun.state, func.count('*'))
+            .group_by(DagRun.dag_id, DagRun.state)
+        )
+        for dag_id, state, count in qry:
+            session.add(DagStat(dag_id=dag_id, state=state, count=count))
+        session.commit()
 
 def version(args):  # noqa
     print(settings.HEADER + "  v" + airflow.__version__)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/38972162/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 944d847..0791ff5 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1094,6 +1094,8 @@ class SchedulerJob(BaseJob):
             self._process_task_instances(dag, tis_out)
             self.manage_slas(dag)
 
+        models.DagStat.clean_dirty([d.dag_id for d in dags])
+
     def _process_executor_events(self):
         """
         Respond to executor events.
@@ -1844,6 +1846,8 @@ class BackfillJob(BaseJob):
 
             # update dag run state
             run.update_state(session=session)
+            if run.dag.is_paused:
+                models.DagStat.clean_dirty([run.dag_id], session=session)
 
         executor.end()
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/38972162/airflow/migrations/versions/f2ca10b85618_add_dag_stats_table.py
----------------------------------------------------------------------
diff --git a/airflow/migrations/versions/f2ca10b85618_add_dag_stats_table.py b/airflow/migrations/versions/f2ca10b85618_add_dag_stats_table.py
new file mode 100644
index 0000000..dabe812
--- /dev/null
+++ b/airflow/migrations/versions/f2ca10b85618_add_dag_stats_table.py
@@ -0,0 +1,42 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""add dag_stats table
+
+Revision ID: f2ca10b85618
+Revises: 64de9cddf6c9
+Create Date: 2016-07-20 15:08:28.247537
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = 'f2ca10b85618'
+down_revision = '64de9cddf6c9'
+branch_labels = None
+depends_on = None
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+    op.create_table('dag_stats',
+                    sa.Column('dag_id', sa.String(length=250), nullable=False),
+                    sa.Column('state', sa.String(length=50), nullable=False),
+                    sa.Column('count', sa.Integer(), nullable=False, default=0),
+                    sa.Column('dirty', sa.Boolean(), nullable=False, default=False),
+                    sa.PrimaryKeyConstraint('dag_id', 'state'))
+
+
+def downgrade():
+    op.drop_table('dag_stats')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/38972162/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 3d0f4ea..9eefaf3 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1250,9 +1250,9 @@ class TaskInstance(Base):
         :return: DagRun
         """
         dr = session.query(DagRun).filter(
-            DagRun.dag_id==self.dag_id,
-            DagRun.execution_date==self.execution_date,
-            DagRun.start_date==self.start_date
+            DagRun.dag_id == self.dag_id,
+            DagRun.execution_date == self.execution_date,
+            DagRun.start_date == self.start_date
         ).first()
 
         return dr
@@ -2990,8 +2990,11 @@ class DAG(BaseDag, LoggingMixin):
             self, start_date, end_date, state=State.RUNNING, session=None):
         dates = utils_date_range(start_date, end_date)
         drs = session.query(DagModel).filter_by(dag_id=self.dag_id).all()
+        dirty_ids = []
         for dr in drs:
             dr.state = state
+            dirty_ids.append(dr.dag_id)
+        DagStat.clean_dirty(dirty_ids, session=session)
 
     def clear(
             self, start_date=None, end_date=None,
@@ -3248,8 +3251,8 @@ class DAG(BaseDag, LoggingMixin):
     @provide_session
     def create_dagrun(self,
                       run_id,
-                      execution_date,
                       state,
+                      execution_date=None,
                       start_date=None,
                       external_trigger=False,
                       conf=None,
@@ -3290,6 +3293,12 @@ class DAG(BaseDag, LoggingMixin):
         run.verify_integrity(session=session)
 
         run.refresh_from_db()
+        DagStat.set_dirty(self.dag_id, session=session)
+
+        # add a placeholder row into DagStat table
+        if not session.query(DagStat).filter(DagStat.dag_id == self.dag_id).first():
+            session.add(DagStat(dag_id=self.dag_id, state=State.RUNNING, count=0, dirty=True))
+        session.commit()
         return run
 
     @staticmethod
@@ -3626,6 +3635,62 @@ class XCom(Base):
         session.commit()
 
 
+class DagStat(Base):
+    __tablename__ = "dag_stats"
+
+    dag_id = Column(String(ID_LEN), primary_key=True)
+    state = Column(String(50), primary_key=True)
+    count = Column(Integer, default=0)
+    dirty = Column(Boolean, default=False)
+
+    def __init__(self, dag_id, state, count, dirty=False):
+        self.dag_id = dag_id
+        self.state = state
+        self.count = count
+        self.dirty = dirty
+
+    @staticmethod
+    @provide_session
+    def set_dirty(dag_id, session=None):
+        for dag in session.query(DagStat).filter(DagStat.dag_id == dag_id):
+            dag.dirty = True
+        session.commit()
+
+    @staticmethod
+    @provide_session
+    def clean_dirty(dag_ids, session=None):
+        """
+        Cleans out the dirty/out-of-sync rows from dag_stats table
+
+        :param dag_ids: dag_ids that may be dirty
+        :type dag_ids: list
+        :param full_query: whether to check dag_runs for new drs not in dag_stats
+        :type full_query: bool
+        """
+        dag_ids = set(dag_ids)
+        ds_ids = set(session.query(DagStat.dag_id).all())
+
+        qry = (
+            session.query(DagStat)
+            .filter(and_(DagStat.dag_id.in_(dag_ids), DagStat.dirty == True))
+        )
+
+        dirty_ids = {dag.dag_id for dag in qry.all()}
+        qry.delete(synchronize_session='fetch')
+        session.commit()
+
+        qry = (
+            session.query(DagRun.dag_id, DagRun.state, func.count('*'))
+            .filter(DagRun.dag_id.in_(dirty_ids))
+            .group_by(DagRun.dag_id, DagRun.state)
+        )
+
+        for dag_id, state, count in qry:
+            session.add(DagStat(dag_id=dag_id, state=state, count=count))
+
+        session.commit()
+
+
 class DagRun(Base):
     """
     DagRun describes an instance of a Dag. It can be created
@@ -3641,7 +3706,7 @@ class DagRun(Base):
     execution_date = Column(DateTime, default=func.now())
     start_date = Column(DateTime, default=func.now())
     end_date = Column(DateTime)
-    state = Column(String(50), default=State.RUNNING)
+    _state = Column('state', String(50), default=State.RUNNING)
     run_id = Column(String(ID_LEN))
     external_trigger = Column(Boolean, default=True)
     conf = Column(PickleType)
@@ -3662,6 +3727,20 @@ class DagRun(Base):
             run_id=self.run_id,
             external_trigger=self.external_trigger)
 
+    def get_state(self):
+        return self._state
+
+    def set_state(self, state):
+        if self._state != state:
+            self._state = state
+            session = settings.Session()
+            DagStat.set_dirty(self.dag_id, session=session)
+
+    @declared_attr
+    def state(self):
+        return synonym('_state',
+                       descriptor=property(self.get_state, self.set_state))
+
     @classmethod
     def id_for_date(cls, date, prefix=ID_FORMAT_PREFIX):
         return prefix.format(date.isoformat()[:19])
@@ -3678,7 +3757,7 @@ class DagRun(Base):
             DR.dag_id == self.dag_id,
             DR.execution_date == self.execution_date,
             DR.run_id == self.run_id
-        ).one()
+        ).first()
         if dr:
             self.id = dr.id
             self.state = dr.state
@@ -3802,7 +3881,6 @@ class DagRun(Base):
             session=session
         )
         none_depends_on_past = all(not t.task.depends_on_past for t in unfinished_tasks)
-
         # small speed up
         if unfinished_tasks and none_depends_on_past:
             # todo: this can actually get pretty slow: one task costs between 0.01-015s

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/38972162/airflow/operators/dagrun_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py
index d514acd..239ebb4 100644
--- a/airflow/operators/dagrun_operator.py
+++ b/airflow/operators/dagrun_operator.py
@@ -14,10 +14,13 @@
 
 from datetime import datetime
 import logging
+import os
 
-from airflow.models import BaseOperator, DagRun
+from airflow.models import BaseOperator, DagBag
 from airflow.utils.decorators import apply_defaults
+from airflow.utils.state import State
 from airflow import settings
+from airflow import configuration as conf
 
 
 class DagRunOrder(object):
@@ -62,9 +65,11 @@ class TriggerDagRunOperator(BaseOperator):
         dro = self.python_callable(context, dro)
         if dro:
             session = settings.Session()
-            dr = DagRun(
-                dag_id=self.trigger_dag_id,
+            dbag = DagBag(os.path.expanduser(conf.get('core', 'DAGS_FOLDER')))
+            trigger_dag = dbag.get_dag(self.trigger_dag_id)
+            dr = trigger_dag.create_dagrun(
                 run_id=dro.run_id,
+                state=State.RUNNING,
                 conf=dro.payload,
                 external_trigger=True)
             logging.info("Creating DagRun {}".format(dr))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/38972162/airflow/utils/state.py
----------------------------------------------------------------------
diff --git a/airflow/utils/state.py b/airflow/utils/state.py
index 6ef158e..b650964 100644
--- a/airflow/utils/state.py
+++ b/airflow/utils/state.py
@@ -41,6 +41,21 @@ class State(object):
     UPSTREAM_FAILED = "upstream_failed"
     SKIPPED = "skipped"
 
+    task_states = (
+        SUCCESS,
+        RUNNING,
+        FAILED,
+        UPSTREAM_FAILED,
+        UP_FOR_RETRY,
+        QUEUED,
+    )
+
+    dag_states = (
+        SUCCESS,
+        RUNNING,
+        FAILED,
+    )
+
     state_color = {
         QUEUED: 'gray',
         RUNNING: 'lime',

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/38972162/airflow/www/templates/airflow/dags.html
----------------------------------------------------------------------
diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html
index 90ad70a..f189d92 100644
--- a/airflow/www/templates/airflow/dags.html
+++ b/airflow/www/templates/airflow/dags.html
@@ -37,10 +37,14 @@
                 <th>DAG</th>
                 <th>Schedule</th>
                 <th>Owner</th>
-                <th style="padding-left: 5px;">Recent Statuses
+                <th style="padding-left: 5px;">Recent Tasks
                   <span id="statuses_info" class="glyphicon glyphicon-info-sign" aria-hidden="true"
title="Status of tasks from all active DAG runs or, if not currently active, from most recent
run."></span>
                   <img id="loading" width="15" src="{{ url_for("static", filename="loading.gif")
}}">
                 </th>
+                <th style="padding-left: 5px;">DAG Runs
+                  <span id="statuses_info" class="glyphicon glyphicon-info-sign" aria-hidden="true"
title="Status of all previous DAG runs."></span>
+                  <img id="loading" width="15" src="{{ url_for("static", filename="loading.gif")
}}">
+                </th>
                 <th class="text-center">Links</th>
             </tr>
         </thead>
@@ -93,12 +97,17 @@
                   {{ dag.owner if dag else orm_dags[dag_id].owners }}
                 </td>
 
-                <!-- Column 6: Recent Statuses -->
+                <!-- Column 6: Recent Tasks -->
                 <td style="padding:0px; width:200px; height:10px;">
-                    <svg height="10" width="10" id='dag-{{ dag.safe_dag_id }}' style="display:
block;"></svg>
+                    <svg height="10" width="10" id='task-run-{{ dag.safe_dag_id }}' style="display:
block;"></svg>
                 </td>
 
-                <!-- Column 7: Links -->
+                <!-- Column 7: Dag Runs -->
+                <td style="padding:0px; width:120px; height:10px;">
+                    <svg height="10" width="10" id='dag-run-{{ dag.safe_dag_id }}' style="display:
block;"></svg>
+                </td>
+
+                <!-- Column 8: Links -->
                 <td class="text-center" style="display:flex; flex-direction:row; justify-content:space-around;">
                 {% if dag %}
 
@@ -198,7 +207,86 @@
       d3.json("{{ url_for('airflow.dag_stats') }}", function(error, json) {
         for(var dag_id in json) {
             states = json[dag_id];
-            g = d3.select('svg#dag-' + dag_id)
+            g = d3.select('svg#dag-run-' + dag_id)
+              .attr('height', diameter + (stroke_width_hover * 2))
+              .attr('width', '110px')
+              .selectAll("g")
+              .data(states)
+              .enter()
+              .append('g')
+              .attr('transform', function(d, i) {
+                x = (i * (diameter + circle_margin)) + (diameter/2 + circle_margin);
+                y = (diameter/2) + stroke_width_hover;
+                return 'translate(' + x + ',' + y + ')';
+              });
+
+            g.append('text')
+              .attr('fill', 'black')
+              .attr('text-anchor', 'middle')
+              .attr('vertical-align', 'middle')
+              .attr('font-size', 8)
+              .attr('y', 3)
+              .text(function(d){ return d.count > 0 ? d.count : ''; });
+
+            g.append('circle')
+              .attr('stroke-width', function(d) {
+                  if (d.count > 0)
+                    return stroke_width;
+                  else {
+                    return 1;
+                  }
+              })
+              .attr('stroke', function(d) {
+                  if (d.count > 0)
+                    return d.color;
+                  else {
+                    return 'gainsboro';
+                  }
+              })
+              .attr('fill-opacity', 0)
+              .attr('r', diameter/2)
+              .attr('title', function(d) {return d.state})
+              .attr('style', function(d) {
+                if (d.count > 0)
+                    return"cursor:pointer;"
+              })
+              .on('click', function(d, i) {
+                  if (d.count > 0)
+                    window.location = "/admin/dagrun/?flt1_dag_id_equals=" + d.dag_id + "&flt2_state_equals="
+ d.state;
+              })
+              .on('mouseover', function(d, i) {
+                if (d.count > 0) {
+                    d3.select(this).transition().duration(400)
+                      .attr('fill-opacity', 0.3)
+                      .style("stroke-width", stroke_width_hover);
+                }
+              })
+              .on('mouseout', function(d, i) {
+                if (d.count > 0) {
+                    d3.select(this).transition().duration(400)
+                      .attr('fill-opacity', 0)
+                      .style("stroke-width", stroke_width);
+                }
+              })
+              .style("opacity", 0)
+              .transition()
+              .duration(500)
+              .delay(function(d, i){return i*50;})
+              .style("opacity", 1);
+            d3.select("#loading").remove();
+        }
+        $("#pause_header").tooltip();
+        $("#statuses_info").tooltip();
+
+        $("circle").tooltip({
+          html: true,
+          container: "body",
+        });
+      });
+      d3.json("{{ url_for('airflow.task_stats') }}", function(error, json) {
+        for(var dag_id in json) {
+            states = json[dag_id];
+            g = d3.select('svg#task-run-' + dag_id)
               .attr('height', diameter + (stroke_width_hover * 2))
               .attr('width', '180px')
               .selectAll("g")
@@ -273,6 +361,6 @@
           html: true,
           container: "body",
         });
-    });
+      });
   </script>
 {% endblock %}

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/38972162/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 08edd08..28d17ea 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -463,16 +463,39 @@ class Airflow(BaseView):
             embed=embed)
 
     @expose('/dag_stats')
-    #@login_required
     def dag_stats(self):
-        states = [
-            State.SUCCESS,
-            State.RUNNING,
-            State.FAILED,
-            State.UPSTREAM_FAILED,
-            State.UP_FOR_RETRY,
-            State.QUEUED,
-        ]
+        ds = models.DagStat
+        session = Session()
+
+        qry = (
+            session.query(ds.dag_id, ds.state, ds.count)
+        )
+
+        data = {}
+        for dag_id, state, count in qry:
+            if dag_id not in data:
+                data[dag_id] = {}
+            data[dag_id][state] = count
+
+        payload = {}
+        for dag in dagbag.dags.values():
+            payload[dag.safe_dag_id] = []
+            for state in State.dag_states:
+                try:
+                    count = data[dag.dag_id][state]
+                except Exception:
+                    count = 0
+                d = {
+                    'state': state,
+                    'count': count,
+                    'dag_id': dag.dag_id,
+                    'color': State.color(state)
+                }
+                payload[dag.safe_dag_id].append(d)
+        return wwwutils.json_response(payload)
+
+    @expose('/task_stats')
+    def task_stats(self):
         task_ids = []
         dag_ids = []
         for dag in dagbag.dags.values():
@@ -531,7 +554,7 @@ class Airflow(BaseView):
         payload = {}
         for dag in dagbag.dags.values():
             payload[dag.safe_dag_id] = []
-            for state in states:
+            for state in State.task_states:
                 try:
                     count = data[dag.dag_id][state]
                 except:
@@ -2110,7 +2133,6 @@ class JobModelView(ModelViewOnly):
 
 class DagRunModelView(ModelViewOnly):
     verbose_name_plural = "DAG Runs"
-    can_delete = True
     can_edit = True
     can_create = True
     column_editable_list = ('state',)
@@ -2136,6 +2158,23 @@ class DagRunModelView(ModelViewOnly):
         start_date=datetime_f,
         dag_id=dag_link)
 
+    @action('new_delete', "Delete", "Are you sure you want to delete selected records?")
+    def action_new_delete(self, ids):
+        session = settings.Session()
+        deleted = set(session.query(models.DagRun)
+            .filter(models.DagRun.id.in_(ids))
+            .all())
+        session.query(models.DagRun)\
+            .filter(models.DagRun.id.in_(ids))\
+            .delete(synchronize_session='fetch')
+        session.commit()
+        dirty_ids = []
+        for row in deleted:
+            models.DagStat.set_dirty(row.dag_id, session=session)
+            dirty_ids.append(row.dag_id)
+        models.DagStat.clean_dirty(dirty_ids, session=session)
+        session.close()
+
     @action('set_running', "Set state to 'running'", None)
     def action_set_running(self, ids):
         self.set_dagrun_state(ids, State.RUNNING)
@@ -2153,7 +2192,9 @@ class DagRunModelView(ModelViewOnly):
         try:
             DR = models.DagRun
             count = 0
+            dirty_ids = []
             for dr in session.query(DR).filter(DR.id.in_(ids)).all():
+                dirty_ids.append(dr.dag_id)
                 count += 1
                 dr.state = target_state
                 if target_state == State.RUNNING:
@@ -2161,6 +2202,7 @@ class DagRunModelView(ModelViewOnly):
                 else:
                     dr.end_date = datetime.now()
             session.commit()
+            models.DagStat.clean_dirty(dirty_ids, session=session)
             flash(
                 "{count} dag runs were set to '{target_state}'".format(**locals()))
         except Exception as ex:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/38972162/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 98f0e06..5582066 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -833,6 +833,69 @@ class CoreTest(unittest.TestCase):
         # C
         assert sum([f.duration for f in f_fails]) >= 3
 
+    def test_dag_stats(self):
+        """Correctly sets/dirties/cleans rows of DagStat table"""
+
+        session = settings.Session()
+
+        session.query(models.DagRun).delete()
+        session.query(models.DagStat).delete()
+        session.commit()
+
+        run1 = self.dag_bash.create_dagrun(
+            run_id="run1",
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING)
+
+        models.DagStat.clean_dirty([self.dag_bash.dag_id], session=session)
+
+        qry = session.query(models.DagStat).all()
+
+        assert len(qry) == 1
+        assert qry[0].dag_id == self.dag_bash.dag_id and\
+                qry[0].state == State.RUNNING and\
+                qry[0].count == 1 and\
+                qry[0].dirty == False
+
+        run2 = self.dag_bash.create_dagrun(
+            run_id="run2",
+            execution_date=DEFAULT_DATE+timedelta(days=1),
+            state=State.RUNNING)
+
+        models.DagStat.clean_dirty([self.dag_bash.dag_id], session=session)
+
+        qry = session.query(models.DagStat).all()
+
+        assert len(qry) == 1
+        assert qry[0].dag_id == self.dag_bash.dag_id and\
+                qry[0].state == State.RUNNING and\
+                qry[0].count == 2 and\
+                qry[0].dirty == False
+
+        session.query(models.DagRun).first().state = State.SUCCESS
+        session.commit()
+
+        models.DagStat.clean_dirty([self.dag_bash.dag_id], session=session)
+
+        qry = session.query(models.DagStat).filter(models.DagStat.state == State.SUCCESS).all()
+        assert len(qry) == 1
+        assert qry[0].dag_id == self.dag_bash.dag_id and\
+                qry[0].state == State.SUCCESS and\
+                qry[0].count == 1 and\
+                qry[0].dirty == False
+
+        qry = session.query(models.DagStat).filter(models.DagStat.state == State.RUNNING).all()
+        assert len(qry) == 1
+        assert qry[0].dag_id == self.dag_bash.dag_id and\
+                qry[0].state == State.RUNNING and\
+                qry[0].count == 1 and\
+                qry[0].dirty == False
+
+        session.query(models.DagRun).delete()
+        session.query(models.DagStat).delete()
+        session.commit()
+        session.close()
+
 
 class CliTests(unittest.TestCase):
     def setUp(self):
@@ -1078,6 +1141,9 @@ class WebUiTests(unittest.TestCase):
         response = self.app.get(
             '/admin/airflow/dag_stats')
         assert "example_bash_operator" in response.data.decode('utf-8')
+        response = self.app.get(
+            '/admin/airflow/task_stats')
+        assert "example_bash_operator" in response.data.decode('utf-8')
         url = (
             "/admin/airflow/success?task_id=run_this_last&"
             "dag_id=example_bash_operator&upstream=false&downstream=false&"


Mime
View raw message