airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [8/9] incubator-airflow git commit: [AIRFLOW-6] Remove dependency on Highcharts
Date Mon, 20 Jun 2016 13:37:38 GMT
[AIRFLOW-6] Remove dependency on Highcharts

Highcharts' license is not compatible with the Apache 2.0
license. This patch removes Highcharts in favor of d3,
however some charts are not supported anymore.

* This brings Maxime Beauchemin's work to master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0a460081
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0a460081
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0a460081

Branch: refs/heads/master
Commit: 0a460081bc7cba2d05434148f092b87d35aa8cd3
Parents: d243c00
Author: Bolke de Bruin <bolke@xs4all.nl>
Authored: Mon Jun 20 14:19:34 2016 +0200
Committer: Bolke de Bruin <bolke@xs4all.nl>
Committed: Mon Jun 20 14:53:30 2016 +0200

----------------------------------------------------------------------
 `                                            |  2347 ++++
 airflow/bin/airflow                          |    13 +
 airflow/executors/base_executor.py           |    14 +
 airflow/executors/celery_executor.py         |    14 +
 airflow/hooks/__init__.py                    |     2 +-
 airflow/hooks/dbapi_hook.py                  |     1 -
 airflow/hooks/jdbc_hook.py                   |     3 +-
 airflow/hooks/oracle_hook.py                 |     1 +
 airflow/www/app.py                           |     4 +-
 airflow/www/blueprints.py                    |     6 -
 airflow/www/static/d3.tip.v0.6.3.js          |   280 +
 airflow/www/static/d3.v3.min.js              |    10 +-
 airflow/www/static/gantt-chart-d3v2.js       |   247 +
 airflow/www/static/gantt.css                 |    38 +
 airflow/www/static/highcharts-more.js        |    53 -
 airflow/www/static/highcharts.js             |   308 -
 airflow/www/static/nv.d3.css                 |   769 ++
 airflow/www/static/nv.d3.js                  | 14241 ++++++++++++++++++++
 airflow/www/static/nvd3.tar.gz               |   Bin 0 -> 328377 bytes
 airflow/www/templates/airflow/chart.html     |    37 +-
 airflow/www/templates/airflow/dag.html       |     4 +-
 airflow/www/templates/airflow/gantt.html     |    86 +-
 airflow/www/templates/airflow/highchart.html |   183 -
 airflow/www/templates/airflow/nvd3.html      |   175 +
 airflow/www/utils.py                         |    11 +-
 airflow/www/views.py                         |   352 +-
 setup.py                                     |    23 +-
 tests/core.py                                |    19 -
 28 files changed, 18315 insertions(+), 926 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0a460081/`
----------------------------------------------------------------------
diff --git a/` b/`
new file mode 100644
index 0000000..6331805
--- /dev/null
+++ b/`
@@ -0,0 +1,2347 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import sys
+
+import os
+import socket
+import importlib
+
+from functools import wraps
+from datetime import datetime, timedelta
+import dateutil.parser
+import copy
+from itertools import chain, product
+
+from past.utils import old_div
+from past.builtins import basestring
+
+import inspect
+import traceback
+
+import sqlalchemy as sqla
+from sqlalchemy import or_, desc, and_
+
+
+from flask import redirect, url_for, request, Markup, Response, current_app, render_template
+from flask_admin import BaseView, expose, AdminIndexView
+from flask_admin.contrib.sqla import ModelView
+from flask_admin.actions import action
+from flask_login import flash
+from flask._compat import PY2
+
+import jinja2
+import markdown
+import json
+
+from wtforms import (
+    Form, SelectField, TextAreaField, PasswordField, StringField)
+
+from pygments import highlight, lexers
+from pygments.formatters import HtmlFormatter
+
+import airflow
+from airflow import configuration as conf
+from airflow import models
+from airflow import settings
+from airflow.exceptions import AirflowException
+from airflow.settings import Session
+from airflow.models import XCom
+
+from airflow.utils.json import json_ser
+from airflow.utils.state import State
+from airflow.utils.db import provide_session
+from airflow.utils.helpers import alchemy_to_dict
+from airflow.utils import logging as log_utils
+from airflow.www import utils as wwwutils
+from airflow.www.forms import DateTimeForm, DateTimeWithNumRunsForm
+
+QUERY_LIMIT = 100000
+CHART_LIMIT = 200000
+
+dagbag = models.DagBag(os.path.expanduser(conf.get('core', 'DAGS_FOLDER')))
+
+login_required = airflow.login.login_required
+current_user = airflow.login.current_user
+logout_user = airflow.login.logout_user
+
+FILTER_BY_OWNER = False
+if conf.getboolean('webserver', 'FILTER_BY_OWNER'):
+    # filter_by_owner if authentication is enabled and filter_by_owner is true
+    FILTER_BY_OWNER = not current_app.config['LOGIN_DISABLED']
+
+
+def dag_link(v, c, m, p):
+    url = url_for(
+        'airflow.graph',
+        dag_id=m.dag_id)
+    return Markup(
+        '<a href="{url}">{m.dag_id}</a>'.format(**locals()))
+
+
+def log_link(v, c, m, p):
+    url = url_for(
+        'airflow.log',
+        dag_id=m.dag_id,
+        task_id=m.task_id,
+        execution_date=m.execution_date.isoformat())
+    return Markup(
+        '<a href="{url}">'
+        '    <span class="glyphicon glyphicon-book" aria-hidden="true">'
+        '</span></a>').format(**locals())
+
+
+def task_instance_link(v, c, m, p):
+    url = url_for(
+        'airflow.task',
+        dag_id=m.dag_id,
+        task_id=m.task_id,
+        execution_date=m.execution_date.isoformat())
+    url_root = url_for(
+        'airflow.graph',
+        dag_id=m.dag_id,
+        root=m.task_id,
+        execution_date=m.execution_date.isoformat())
+    return Markup(
+        """
+        <span style="white-space: nowrap;">
+        <a href="{url}">{m.task_id}</a>
+        <a href="{url_root}" title="Filter on this task and upstream">
+        <span class="glyphicon glyphicon-filter" style="margin-left: 0px;"
+            aria-hidden="true"></span>
+        </a>
+        </span>
+        """.format(**locals()))
+
+
+def state_token(state):
+    color = State.color(state)
+    return Markup(
+        '<span class="label" style="background-color:{color};">'
+        '{state}</span>'.format(**locals()))
+
+
+def state_f(v, c, m, p):
+    return state_token(m.state)
+
+
+def duration_f(v, c, m, p):
+    if m.end_date and m.duration:
+        return timedelta(seconds=m.duration)
+
+
+def datetime_f(v, c, m, p):
+    attr = getattr(m, p)
+    dttm = attr.isoformat() if attr else ''
+    if datetime.now().isoformat()[:4] == dttm[:4]:
+        dttm = dttm[5:]
+    return Markup("<nobr>{}</nobr>".format(dttm))
+
+
+def nobr_f(v, c, m, p):
+    return Markup("<nobr>{}</nobr>".format(getattr(m, p)))
+
+
+def label_link(v, c, m, p):
+    try:
+        default_params = eval(m.default_params)
+    except:
+        default_params = {}
+    url = url_for(
+        'airflow.chart', chart_id=m.id, iteration_no=m.iteration_no,
+        **default_params)
+    return Markup("<a href='{url}'>{m.label}</a>".format(**locals()))
+
+
+def pool_link(v, c, m, p):
+    url = '/admin/taskinstance/?flt1_pool_equals=' + m.pool
+    return Markup("<a href='{url}'>{m.pool}</a>".format(**locals()))
+
+
+def pygment_html_render(s, lexer=lexers.TextLexer):
+    return highlight(
+        s,
+        lexer(),
+        HtmlFormatter(linenos=True),
+    )
+
+
+def render(obj, lexer):
+    out = ""
+    if isinstance(obj, basestring):
+        out += pygment_html_render(obj, lexer)
+    elif isinstance(obj, (tuple, list)):
+        for i, s in enumerate(obj):
+            out += "<div>List item #{}</div>".format(i)
+            out += "<div>" + pygment_html_render(s, lexer) + "</div>"
+    elif isinstance(obj, dict):
+        for k, v in obj.items():
+            out += '<div>Dict item "{}"</div>'.format(k)
+            out += "<div>" + pygment_html_render(v, lexer) + "</div>"
+    return out
+
+
+def wrapped_markdown(s):
+    return '<div class="rich_doc">' + markdown.markdown(s) + "</div>"
+
+
+attr_renderer = {
+    'bash_command': lambda x: render(x, lexers.BashLexer),
+    'hql': lambda x: render(x, lexers.SqlLexer),
+    'sql': lambda x: render(x, lexers.SqlLexer),
+    'doc': lambda x: render(x, lexers.TextLexer),
+    'doc_json': lambda x: render(x, lexers.JsonLexer),
+    'doc_rst': lambda x: render(x, lexers.RstLexer),
+    'doc_yaml': lambda x: render(x, lexers.YamlLexer),
+    'doc_md': wrapped_markdown,
+    'python_callable': lambda x: render(
+        inspect.getsource(x), lexers.PythonLexer),
+}
+
+
+def data_profiling_required(f):
+    '''
+    Decorator for views requiring data profiling access
+    '''
+    @wraps(f)
+    def decorated_function(*args, **kwargs):
+        if (
+                    current_app.config['LOGIN_DISABLED'] or
+                    (not current_user.is_anonymous() and current_user.data_profiling())
+        ):
+            return f(*args, **kwargs)
+        else:
+            flash("This page requires data profiling privileges", "error")
+            return redirect(url_for('admin.index'))
+    return decorated_function
+
+
+def fused_slots(v, c, m, p):
+    url = (
+        '/admin/taskinstance/' +
+        '?flt1_pool_equals=' + m.pool +
+        '&flt2_state_equals=running')
+    return Markup("<a href='{0}'>{1}</a>".format(url, m.used_slots()))
+
+
+def fqueued_slots(v, c, m, p):
+    url = (
+        '/admin/taskinstance/' +
+        '?flt1_pool_equals=' + m.pool +
+        '&flt2_state_equals=queued&sort=10&desc=1')
+    return Markup("<a href='{0}'>{1}</a>".format(url, m.queued_slots()))
+
+
+class Airflow(BaseView):
+
+    def is_visible(self):
+        return False
+
+    @expose('/')
+    @login_required
+    def index(self):
+        return self.render('airflow/dags.html')
+
+    @expose('/chart_data')
+    @data_profiling_required
+    @wwwutils.gzipped
+    # @cache.cached(timeout=3600, key_prefix=wwwutils.make_cache_key)
+    def chart_data(self):
+        session = settings.Session()
+        chart_id = request.args.get('chart_id')
+        csv = request.args.get('csv') == "true"
+        chart = session.query(models.Chart).filter_by(id=chart_id).first()
+        db = session.query(
+            models.Connection).filter_by(conn_id=chart.conn_id).first()
+        session.expunge_all()
+        session.commit()
+        session.close()
+
+        payload = {}
+        payload['state'] = 'ERROR'
+        payload['error'] = ''
+
+        # Processing templated fields
+        try:
+            args = eval(chart.default_params)
+            if type(args) is not type(dict()):
+                raise AirflowException('Not a dict')
+        except:
+            args = {}
+            payload['error'] += (
+                "Default params is not valid, string has to evaluate as "
+                "a Python dictionary. ")
+
+        request_dict = {k: request.args.get(k) for k in request.args}
+        from airflow import macros
+        args.update(request_dict)
+        args['macros'] = macros
+        sql = jinja2.Template(chart.sql).render(**args)
+        label = jinja2.Template(chart.label).render(**args)
+        payload['sql_html'] = Markup(highlight(
+            sql,
+            lexers.SqlLexer(),  # Lexer call
+            HtmlFormatter(noclasses=True))
+        )
+        payload['label'] = label
+
+        import pandas as pd
+        pd.set_option('display.max_colwidth', 100)
+        hook = db.get_hook()
+        try:
+            df = hook.get_pandas_df(wwwutils.limit_sql(sql, CHART_LIMIT, conn_type=db.conn_type))
+            df = df.fillna(0)
+        except Exception as e:
+            payload['error'] += "SQL execution failed. Details: " + str(e)
+
+        if csv:
+            return Response(
+                response=df.to_csv(index=False),
+                status=200,
+                mimetype="application/text")
+
+        if not payload['error'] and len(df) == CHART_LIMIT:
+            payload['warning'] = (
+                "Data has been truncated to {0}"
+                " rows. Expect incomplete results.").format(CHART_LIMIT)
+
+        if not payload['error'] and len(df) == 0:
+            payload['error'] += "Empty result set. "
+        elif (
+                            not payload['error'] and
+                                chart.sql_layout == 'series' and
+                            chart.chart_type != "datatable" and
+                        len(df.columns) < 3):
+            payload['error'] += "SQL needs to return at least 3 columns. "
+        elif (
+                        not payload['error'] and
+                            chart.sql_layout == 'columns'and
+                        len(df.columns) < 2):
+            payload['error'] += "SQL needs to return at least 2 columns. "
+        elif not payload['error']:
+            import numpy as np
+            chart_type = chart.chart_type
+
+            data = None
+            if chart_type == "datatable":
+                chart.show_datatable = True
+            if chart.show_datatable:
+                data = df.to_dict(orient="split")
+                data['columns'] = [{'title': c} for c in data['columns']]
+
+            # Trying to convert time to something Highcharts likes
+            x_col = 1 if chart.sql_layout == 'series' else 0
+            if chart.x_is_date:
+                try:
+                    # From string to datetime
+                    df[df.columns[x_col]] = pd.to_datetime(
+                        df[df.columns[x_col]])
+                except Exception as e:
+                    raise AirflowException(str(e))
+                df[df.columns[x_col]] = df[df.columns[x_col]].apply(
+                    lambda x: int(x.strftime("%s")) * 1000)
+
+            series = []
+            colorAxis = None
+            if chart_type == 'datatable':
+                payload['data'] = data
+                payload['state'] = 'SUCCESS'
+                return wwwutils.json_response(payload)
+
+            elif chart_type == 'para':
+                df.rename(columns={
+                    df.columns[0]: 'name',
+                    df.columns[1]: 'group',
+                }, inplace=True)
+                return Response(
+                    response=df.to_csv(index=False),
+                    status=200,
+                    mimetype="application/text")
+
+            elif chart_type == 'heatmap':
+                color_perc_lbound = float(
+                    request.args.get('color_perc_lbound', 0))
+                color_perc_rbound = float(
+                    request.args.get('color_perc_rbound', 1))
+                color_scheme = request.args.get('color_scheme', 'blue_red')
+
+                if color_scheme == 'blue_red':
+                    stops = [
+                        [color_perc_lbound, '#00D1C1'],
+                        [
+                            color_perc_lbound +
+                            ((color_perc_rbound - color_perc_lbound)/2),
+                            '#FFFFCC'
+                        ],
+                        [color_perc_rbound, '#FF5A5F']
+                    ]
+                elif color_scheme == 'blue_scale':
+                    stops = [
+                        [color_perc_lbound, '#FFFFFF'],
+                        [color_perc_rbound, '#2222FF']
+                    ]
+                elif color_scheme == 'fire':
+                    diff = float(color_perc_rbound - color_perc_lbound)
+                    stops = [
+                        [color_perc_lbound, '#FFFFFF'],
+                        [color_perc_lbound + 0.33*diff, '#FFFF00'],
+                        [color_perc_lbound + 0.66*diff, '#FF0000'],
+                        [color_perc_rbound, '#000000']
+                    ]
+                else:
+                    stops = [
+                        [color_perc_lbound, '#FFFFFF'],
+                        [
+                            color_perc_lbound +
+                            ((color_perc_rbound - color_perc_lbound)/2),
+                            '#888888'
+                        ],
+                        [color_perc_rbound, '#000000'],
+                    ]
+
+                xaxis_label = df.columns[1]
+                yaxis_label = df.columns[2]
+                data = []
+                for row in df.itertuples():
+                    data.append({
+                        'x': row[2],
+                        'y': row[3],
+                        'value': row[4],
+                    })
+                x_format = '{point.x:%Y-%m-%d}' \
+                    if chart.x_is_date else '{point.x}'
+                series.append({
+                    'data': data,
+                    'borderWidth': 0,
+                    'colsize': 24 * 36e5,
+                    'turboThreshold': sys.float_info.max,
+                    'tooltip': {
+                        'headerFormat': '',
+                        'pointFormat': (
+                            df.columns[1] + ': ' + x_format + '<br/>' +
+                            df.columns[2] + ': {point.y}<br/>' +
+                            df.columns[3] + ': <b>{point.value}</b>'
+                        ),
+                    },
+                })
+                colorAxis = {
+                    'stops': stops,
+                    'minColor': '#FFFFFF',
+                    'maxColor': '#000000',
+                    'min': 50,
+                    'max': 2200,
+                }
+            else:
+                if chart.sql_layout == 'series':
+                    # User provides columns (series, x, y)
+                    xaxis_label = df.columns[1]
+                    yaxis_label = df.columns[2]
+                    df[df.columns[2]] = df[df.columns[2]].astype(np.float)
+                    df = df.pivot_table(
+                        index=df.columns[1],
+                        columns=df.columns[0],
+                        values=df.columns[2], aggfunc=np.sum)
+                else:
+                    # User provides columns (x, y, metric1, metric2, ...)
+                    xaxis_label = df.columns[0]
+                    yaxis_label = 'y'
+                    df.index = df[df.columns[0]]
+                    df = df.sort(df.columns[0])
+                    del df[df.columns[0]]
+                    for col in df.columns:
+                        df[col] = df[col].astype(np.float)
+
+                for col in df.columns:
+                    series.append({
+                        'name': col,
+                        'data': [
+                            (k, df[col][k])
+                            for k in df[col].keys()
+                            if not np.isnan(df[col][k])]
+                    })
+                series = [serie for serie in sorted(
+                    series, key=lambda s: s['data'][0][1], reverse=True)]
+
+            if chart_type == "stacked_area":
+                stacking = "normal"
+                chart_type = 'area'
+            elif chart_type == "percent_area":
+                stacking = "percent"
+                chart_type = 'area'
+            else:
+                stacking = None
+            hc = {
+                'chart': {
+                    'type': chart_type
+                },
+                'plotOptions': {
+                    'series': {
+                        'marker': {
+                            'enabled': False
+                        }
+                    },
+                    'area': {'stacking': stacking},
+                },
+                'title': {'text': ''},
+                'xAxis': {
+                    'title': {'text': xaxis_label},
+                    'type': 'datetime' if chart.x_is_date else None,
+                },
+                'yAxis': {
+                    'title': {'text': yaxis_label},
+                },
+                'colorAxis': colorAxis,
+                'tooltip': {
+                    'useHTML': True,
+                    'backgroundColor': None,
+                    'borderWidth': 0,
+                },
+                'series': series,
+            }
+
+            if chart.y_log_scale:
+                hc['yAxis']['type'] = 'logarithmic'
+                hc['yAxis']['minorTickInterval'] = 0.1
+                if 'min' in hc['yAxis']:
+                    del hc['yAxis']['min']
+
+            payload['state'] = 'SUCCESS'
+            payload['hc'] = hc
+            payload['data'] = data
+            payload['request_dict'] = request_dict
+        return wwwutils.json_response(payload)
+
+    @expose('/chart')
+    @data_profiling_required
+    def chart(self):
+        session = settings.Session()
+        chart_id = request.args.get('chart_id')
+        embed = request.args.get('embed')
+        chart = session.query(models.Chart).filter_by(id=chart_id).first()
+        session.expunge_all()
+        session.commit()
+        session.close()
+        if chart.chart_type == 'para':
+            return self.render('airflow/para/para.html', chart=chart)
+
+        sql = ""
+        if chart.show_sql:
+            sql = Markup(highlight(
+                chart.sql,
+                lexers.SqlLexer(),  # Lexer call
+                HtmlFormatter(noclasses=True))
+            )
+        return self.render(
+            'airflow/highchart.html',
+            chart=chart,
+            title="Airflow - Chart",
+            sql=sql,
+            label=chart.label,
+            embed=embed)
+
+    @expose('/dag_stats')
+    #@login_required
+    def dag_stats(self):
+        states = [
+            State.SUCCESS,
+            State.RUNNING,
+            State.FAILED,
+            State.UPSTREAM_FAILED,
+            State.UP_FOR_RETRY,
+            State.QUEUED,
+        ]
+        task_ids = []
+        dag_ids = []
+        for dag in dagbag.dags.values():
+            task_ids += dag.task_ids
+            if not dag.is_subdag:
+                dag_ids.append(dag.dag_id)
+
+        TI = models.TaskInstance
+        DagRun = models.DagRun
+        session = Session()
+
+        LastDagRun = (
+            session.query(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date'))
+            .group_by(DagRun.dag_id)
+            .subquery('last_dag_run')
+        )
+        RunningDagRun = (
+            session.query(DagRun.dag_id, DagRun.execution_date)
+            .filter(DagRun.state == State.RUNNING)
+            .subquery('running_dag_run')
+        )
+
+        # Select all task_instances from active dag_runs.
+        # If no dag_run is active, return task instances from most recent dag_run.
+        qry = (
+            session.query(TI.dag_id, TI.state, sqla.func.count(TI.task_id))
+            .outerjoin(RunningDagRun, and_(
+                RunningDagRun.c.dag_id == TI.dag_id,
+                RunningDagRun.c.execution_date == TI.execution_date)
+            )
+            .outerjoin(LastDagRun, and_(
+                LastDagRun.c.dag_id == TI.dag_id,
+                LastDagRun.c.execution_date == TI.execution_date)
+            )
+            .filter(TI.task_id.in_(task_ids))
+            .filter(TI.dag_id.in_(dag_ids))
+            .filter(or_(
+                RunningDagRun.c.dag_id != None,
+                LastDagRun.c.dag_id != None
+            ))
+            .group_by(TI.dag_id, TI.state)
+        )
+
+        data = {}
+        for dag_id, state, count in qry:
+            if dag_id not in data:
+                data[dag_id] = {}
+            data[dag_id][state] = count
+        session.commit()
+        session.close()
+
+        payload = {}
+        for dag in dagbag.dags.values():
+            payload[dag.safe_dag_id] = []
+            for state in states:
+                try:
+                    count = data[dag.dag_id][state]
+                except:
+                    count = 0
+                d = {
+                    'state': state,
+                    'count': count,
+                    'dag_id': dag.dag_id,
+                    'color': State.color(state)
+                }
+                payload[dag.safe_dag_id].append(d)
+        return wwwutils.json_response(payload)
+
+
+    @expose('/code')
+    @login_required
+    def code(self):
+        dag_id = request.args.get('dag_id')
+        dag = dagbag.get_dag(dag_id)
+        title = dag_id
+        try:
+            m = importlib.import_module(dag.module_name)
+            code = inspect.getsource(m)
+            html_code = highlight(
+                code, lexers.PythonLexer(), HtmlFormatter(linenos=True))
+        except IOError as e:
+            html_code = str(e)
+
+        return self.render(
+            'airflow/dag_code.html', html_code=html_code, dag=dag, title=title,
+            root=request.args.get('root'),
+            demo_mode=conf.getboolean('webserver', 'demo_mode'))
+
+    @expose('/dag_details')
+    @login_required
+    def dag_details(self):
+        dag_id = request.args.get('dag_id')
+        dag = dagbag.get_dag(dag_id)
+        title = "DAG details"
+
+        session = settings.Session()
+        TI = models.TaskInstance
+        states = (
+            session.query(TI.state, sqla.func.count(TI.dag_id))
+            .filter(TI.dag_id == dag_id)
+            .group_by(TI.state)
+            .all()
+        )
+        return self.render(
+            'airflow/dag_details.html',
+            dag=dag, title=title, states=states, State=State)
+
+    @current_app.errorhandler(404)
+    def circles(self):
+        return render_template(
+            'airflow/circles.html', hostname=socket.gethostname()), 404
+
+    @current_app.errorhandler(500)
+    def show_traceback(self):
+        from airflow.utils import asciiart as ascii_
+        return render_template(
+            'airflow/traceback.html',
+            hostname=socket.gethostname(),
+            nukular=ascii_.nukular,
+            info=traceback.format_exc()), 500
+
+    @expose('/sandbox')
+    @login_required
+    def sandbox(self):
+        title = "Sandbox Suggested Configuration"
+        cfg_loc = conf.AIRFLOW_CONFIG + '.sandbox'
+        f = open(cfg_loc, 'r')
+        config = f.read()
+        f.close()
+        code_html = Markup(highlight(
+            config,
+            lexers.IniLexer(),  # Lexer call
+            HtmlFormatter(noclasses=True))
+        )
+        return self.render(
+            'airflow/code.html',
+            code_html=code_html, title=title, subtitle=cfg_loc)
+
+    @expose('/noaccess')
+    def noaccess(self):
+        return self.render('airflow/noaccess.html')
+
+    @expose('/headers')
+    def headers(self):
+        d = {
+            'headers': {k: v for k, v in request.headers},
+        }
+        if hasattr(current_user, 'is_superuser'):
+            d['is_superuser'] = current_user.is_superuser()
+            d['data_profiling'] = current_user.data_profiling()
+            d['is_anonymous'] = current_user.is_anonymous()
+            d['is_authenticated'] = current_user.is_authenticated()
+        if hasattr(current_user, 'username'):
+            d['username'] = current_user.username
+        return wwwutils.json_response(d)
+
+    @expose('/pickle_info')
+    def pickle_info(self):
+        d = {}
+        dag_id = request.args.get('dag_id')
+        dags = [dagbag.dags.get(dag_id)] if dag_id else dagbag.dags.values()
+        for dag in dags:
+            if not dag.is_subdag:
+                d[dag.dag_id] = dag.pickle_info()
+        return wwwutils.json_response(d)
+
+    @expose('/login', methods=['GET', 'POST'])
+    def login(self):
+        return airflow.login.login(self, request)
+
+    @expose('/logout')
+    def logout(self):
+        logout_user()
+        flash('You have been logged out.')
+        return redirect(url_for('admin.index'))
+
+    @expose('/rendered')
+    @login_required
+    @wwwutils.action_logging
+    def rendered(self):
+        dag_id = request.args.get('dag_id')
+        task_id = request.args.get('task_id')
+        execution_date = request.args.get('execution_date')
+        dttm = dateutil.parser.parse(execution_date)
+        form = DateTimeForm(data={'execution_date': dttm})
+        dag = dagbag.get_dag(dag_id)
+        task = copy.copy(dag.get_task(task_id))
+        ti = models.TaskInstance(task=task, execution_date=dttm)
+        try:
+            ti.render_templates()
+        except Exception as e:
+            flash("Error rendering template: " + str(e), "error")
+        title = "Rendered Template"
+        html_dict = {}
+        for template_field in task.__class__.template_fields:
+            content = getattr(task, template_field)
+            if template_field in attr_renderer:
+                html_dict[template_field] = attr_renderer[template_field](content)
+            else:
+                html_dict[template_field] = (
+                    "<pre><code>" + str(content) + "</pre></code>")
+
+        return self.render(
+            'airflow/ti_code.html',
+            html_dict=html_dict,
+            dag=dag,
+            task_id=task_id,
+            execution_date=execution_date,
+            form=form,
+            title=title,)
+
+    @expose('/log')
+    @login_required
+    @wwwutils.action_logging
+    def log(self):
+        BASE_LOG_FOLDER = os.path.expanduser(
+            conf.get('core', 'BASE_LOG_FOLDER'))
+        dag_id = request.args.get('dag_id')
+        task_id = request.args.get('task_id')
+        execution_date = request.args.get('execution_date')
+        dag = dagbag.get_dag(dag_id)
+        log_relative = "{dag_id}/{task_id}/{execution_date}".format(
+            **locals())
+        loc = os.path.join(BASE_LOG_FOLDER, log_relative)
+        loc = loc.format(**locals())
+        log = ""
+        TI = models.TaskInstance
+        session = Session()
+        dttm = dateutil.parser.parse(execution_date)
+        ti = session.query(TI).filter(
+            TI.dag_id == dag_id, TI.task_id == task_id,
+            TI.execution_date == dttm).first()
+        dttm = dateutil.parser.parse(execution_date)
+        form = DateTimeForm(data={'execution_date': dttm})
+
+        if ti:
+            host = ti.hostname
+            log_loaded = False
+
+            if socket.gethostname() == host:
+                try:
+                    f = open(loc)
+                    log += "".join(f.readlines())
+                    f.close()
+                    log_loaded = True
+                except:
+                    log = "*** Local log file not found.\n".format(loc)
+            else:
+                WORKER_LOG_SERVER_PORT = \
+                    conf.get('celery', 'WORKER_LOG_SERVER_PORT')
+                url = os.path.join(
+                    "http://{host}:{WORKER_LOG_SERVER_PORT}/log", log_relative
+                    ).format(**locals())
+                log += "*** Log file isn't local.\n"
+                log += "*** Fetching here: {url}\n".format(**locals())
+                try:
+                    import requests
+                    log += '\n' + requests.get(url).text
+                    log_loaded = True
+                except:
+                    log += "*** Failed to fetch log file from worker.\n".format(
+                        **locals())
+
+            if not log_loaded:
+                # load remote logs
+                remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
+                remote_log = os.path.join(remote_log_base, log_relative)
+                log += '\n*** Reading remote logs...\n'
+
+                # S3
+                if remote_log.startswith('s3:/'):
+                    log += log_utils.S3Log().read(remote_log, return_error=True)
+
+                # GCS
+                elif remote_log.startswith('gs:/'):
+                    log += log_utils.GCSLog().read(remote_log, return_error=True)
+
+                # unsupported
+                elif remote_log:
+                    log += '*** Unsupported remote log location.'
+
+            session.commit()
+            session.close()
+
+        if PY2 and not isinstance(log, unicode):
+            log = log.decode('utf-8')
+
+        title = "Log"
+
+        return self.render(
+            'airflow/ti_code.html',
+            code=log, dag=dag, title=title, task_id=task_id,
+            execution_date=execution_date, form=form)
+
+    @expose('/task')
+    @login_required
+    @wwwutils.action_logging
+    def task(self):
+        dag_id = request.args.get('dag_id')
+        task_id = request.args.get('task_id')
+        # Carrying execution_date through, even though it's irrelevant for
+        # this context
+        execution_date = request.args.get('execution_date')
+        dttm = dateutil.parser.parse(execution_date)
+        form = DateTimeForm(data={'execution_date': dttm})
+        dag = dagbag.get_dag(dag_id)
+        if not dag or task_id not in dag.task_ids:
+            flash(
+                "Task [{}.{}] doesn't seem to exist"
+                " at the moment".format(dag_id, task_id),
+                "error")
+            return redirect('/admin/')
+        task = dag.get_task(task_id)
+        task = copy.copy(task)
+        task.resolve_template_files()
+
+        attributes = []
+        for attr_name in dir(task):
+            if not attr_name.startswith('_'):
+                attr = getattr(task, attr_name)
+                if type(attr) != type(self.task) and \
+                                attr_name not in attr_renderer:
+                    attributes.append((attr_name, str(attr)))
+
+        title = "Task Details"
+        # Color coding the special attributes that are code
+        special_attrs_rendered = {}
+        for attr_name in attr_renderer:
+            if hasattr(task, attr_name):
+                source = getattr(task, attr_name)
+                special_attrs_rendered[attr_name] = attr_renderer[attr_name](source)
+
+        return self.render(
+            'airflow/task.html',
+            attributes=attributes,
+            task_id=task_id,
+            execution_date=execution_date,
+            special_attrs_rendered=special_attrs_rendered,
+            form=form,
+            dag=dag, title=title)
+
+    @expose('/xcom')
+    @login_required
+    @wwwutils.action_logging
+    def xcom(self):
+        dag_id = request.args.get('dag_id')
+        task_id = request.args.get('task_id')
+        # Carrying execution_date through, even though it's irrelevant for
+        # this context
+        execution_date = request.args.get('execution_date')
+        dttm = dateutil.parser.parse(execution_date)
+        form = DateTimeForm(data={'execution_date': dttm})
+        dag = dagbag.get_dag(dag_id)
+        if not dag or task_id not in dag.task_ids:
+            flash(
+                "Task [{}.{}] doesn't seem to exist"
+                " at the moment".format(dag_id, task_id),
+                "error")
+            return redirect('/admin/')
+
+        session = Session()
+        xcomlist = session.query(XCom).filter(
+            XCom.dag_id == dag_id, XCom.task_id == task_id,
+            XCom.execution_date == dttm).all()
+
+        attributes = []
+        for xcom in xcomlist:
+            if not xcom.key.startswith('_'):
+                attributes.append((xcom.key, xcom.value))
+
+        title = "XCom"
+        return self.render(
+            'airflow/xcom.html',
+            attributes=attributes,
+            task_id=task_id,
+            execution_date=execution_date,
+            form=form,
+            dag=dag, title=title)\
+
+    @expose('/run')
+    @login_required
+    @wwwutils.action_logging
+    @wwwutils.notify_owner
+    def run(self):
+        dag_id = request.args.get('dag_id')
+        task_id = request.args.get('task_id')
+        origin = request.args.get('origin')
+        dag = dagbag.get_dag(dag_id)
+        task = dag.get_task(task_id)
+
+        execution_date = request.args.get('execution_date')
+        execution_date = dateutil.parser.parse(execution_date)
+        force = request.args.get('force') == "true"
+        deps = request.args.get('deps') == "true"
+
+        try:
+            from airflow.executors import DEFAULT_EXECUTOR as executor
+            from airflow.executors import CeleryExecutor
+            if not isinstance(executor, CeleryExecutor):
+                flash("Only works with the CeleryExecutor, sorry", "error")
+                return redirect(origin)
+        except ImportError:
+            # in case CeleryExecutor cannot be imported it is not active either
+            flash("Only works with the CeleryExecutor, sorry", "error")
+            return redirect(origin)
+
+        ti = models.TaskInstance(task=task, execution_date=execution_date)
+        executor.start()
+        executor.queue_task_instance(
+            ti, force=force, ignore_dependencies=deps)
+        executor.heartbeat()
+        flash(
+            "Sent {} to the message queue, "
+            "it should start any moment now.".format(ti))
+        return redirect(origin)
+
+    @expose('/clear')
+    @login_required
+    @wwwutils.action_logging
+    @wwwutils.notify_owner
+    def clear(self):
+        dag_id = request.args.get('dag_id')
+        task_id = request.args.get('task_id')
+        origin = request.args.get('origin')
+        dag = dagbag.get_dag(dag_id)
+        task = dag.get_task(task_id)
+
+        execution_date = request.args.get('execution_date')
+        execution_date = dateutil.parser.parse(execution_date)
+        confirmed = request.args.get('confirmed') == "true"
+        upstream = request.args.get('upstream') == "true"
+        downstream = request.args.get('downstream') == "true"
+        future = request.args.get('future') == "true"
+        past = request.args.get('past') == "true"
+
+        dag = dag.sub_dag(
+            task_regex=r"^{0}$".format(task_id),
+            include_downstream=downstream,
+            include_upstream=upstream)
+
+        end_date = execution_date if not future else None
+        start_date = execution_date if not past else None
+        if confirmed:
+            count = dag.clear(
+                start_date=start_date,
+                end_date=end_date)
+
+            flash("{0} task instances have been cleared".format(count))
+            return redirect(origin)
+        else:
+            tis = dag.clear(
+                start_date=start_date,
+                end_date=end_date,
+                dry_run=True)
+            if not tis:
+                flash("No task instances to clear", 'error')
+                response = redirect(origin)
+            else:
+                details = "\n".join([str(t) for t in tis])
+
+                response = self.render(
+                    'airflow/confirm.html',
+                    message=(
+                        "Here's the list of task instances you are about "
+                        "to clear:"),
+                    details=details,)
+
+            return response
+
+    @expose('/blocked')
+    @login_required
+    def blocked(self):
+        session = settings.Session()
+        DR = models.DagRun
+        dags = (
+            session.query(DR.dag_id, sqla.func.count(DR.id))
+            .filter(DR.state == State.RUNNING)
+            .group_by(DR.dag_id)
+            .all()
+        )
+        payload = []
+        for dag_id, active_dag_runs in dags:
+            max_active_runs = 0
+            if dag_id in dagbag.dags:
+                max_active_runs = dagbag.dags[dag_id].max_active_runs
+            payload.append({
+                'dag_id': dag_id,
+                'active_dag_run': active_dag_runs,
+                'max_active_runs': max_active_runs,
+            })
+        return wwwutils.json_response(payload)
+
+    @expose('/success')
+    @login_required
+    @wwwutils.action_logging
+    @wwwutils.notify_owner
+    def success(self):
+        dag_id = request.args.get('dag_id')
+        task_id = request.args.get('task_id')
+        origin = request.args.get('origin')
+        dag = dagbag.get_dag(dag_id)
+        task = dag.get_task(task_id)
+
+        execution_date = request.args.get('execution_date')
+        execution_date = dateutil.parser.parse(execution_date)
+        confirmed = request.args.get('confirmed') == "true"
+        upstream = request.args.get('upstream') == "true"
+        downstream = request.args.get('downstream') == "true"
+        future = request.args.get('future') == "true"
+        past = request.args.get('past') == "true"
+        MAX_PERIODS = 1000
+
+        # Flagging tasks as successful
+        session = settings.Session()
+        task_ids = [task_id]
+        end_date = ((dag.latest_execution_date or datetime.now())
+                    if future else execution_date)
+
+        if 'start_date' in dag.default_args:
+            start_date = dag.default_args['start_date']
+        elif dag.start_date:
+            start_date = dag.start_date
+        else:
+            start_date = execution_date
+
+        start_date = execution_date if not past else start_date
+
+        if downstream:
+            task_ids += [
+                t.task_id
+                for t in task.get_flat_relatives(upstream=False)]
+        if upstream:
+            task_ids += [
+                t.task_id
+                for t in task.get_flat_relatives(upstream=True)]
+        TI = models.TaskInstance
+
+        if dag.schedule_interval == '@once':
+            dates = [start_date]
+        else:
+            dates = dag.date_range(start_date, end_date=end_date)
+
+        tis = session.query(TI).filter(
+            TI.dag_id == dag_id,
+            TI.execution_date.in_(dates),
+            TI.task_id.in_(task_ids)).all()
+        tis_to_change = session.query(TI).filter(
+            TI.dag_id == dag_id,
+            TI.execution_date.in_(dates),
+            TI.task_id.in_(task_ids),
+            TI.state != State.SUCCESS).all()
+        tasks = list(product(task_ids, dates))
+        tis_to_create = list(
+            set(tasks) -
+            set([(ti.task_id, ti.execution_date) for ti in tis]))
+
+        tis_all_altered = list(chain(
+            [(ti.task_id, ti.execution_date) for ti in tis_to_change],
+            tis_to_create))
+
+        if len(tis_all_altered) > MAX_PERIODS:
+            flash("Too many tasks at once (>{0})".format(
+                MAX_PERIODS), 'error')
+            return redirect(origin)
+
+        if confirmed:
+            for ti in tis_to_change:
+                ti.state = State.SUCCESS
+            session.commit()
+
+            for task_id, task_execution_date in tis_to_create:
+                ti = TI(
+                    task=dag.get_task(task_id),
+                    execution_date=task_execution_date,
+                    state=State.SUCCESS)
+                session.add(ti)
+                session.commit()
+
+            session.commit()
+            session.close()
+            flash("Marked success on {} task instances".format(
+                len(tis_all_altered)))
+
+            return redirect(origin)
+        else:
+            if not tis_all_altered:
+                flash("No task instances to mark as successful", 'error')
+                response = redirect(origin)
+            else:
+                tis = []
+                for task_id, task_execution_date in tis_all_altered:
+                    tis.append(TI(
+                        task=dag.get_task(task_id),
+                        execution_date=task_execution_date,
+                        state=State.SUCCESS))
+                details = "\n".join([str(t) for t in tis])
+
+                response = self.render(
+                    'airflow/confirm.html',
+                    message=(
+                        "Here's the list of task instances you are about "
+                        "to mark as successful:"),
+                    details=details,)
+            return response
+
+    @expose('/tree')
+    @login_required
+    @wwwutils.gzipped
+    @wwwutils.action_logging
+    def tree(self):
+        dag_id = request.args.get('dag_id')
+        blur = conf.getboolean('webserver', 'demo_mode')
+        dag = dagbag.get_dag(dag_id)
+        root = request.args.get('root')
+        if root:
+            dag = dag.sub_dag(
+                task_regex=root,
+                include_downstream=False,
+                include_upstream=True)
+
+        session = settings.Session()
+
+        base_date = request.args.get('base_date')
+        num_runs = request.args.get('num_runs')
+        num_runs = int(num_runs) if num_runs else 25
+
+        if base_date:
+            base_date = dateutil.parser.parse(base_date)
+        else:
+            base_date = dag.latest_execution_date or datetime.now()
+
+        dates = dag.date_range(base_date, num=-abs(num_runs))
+        min_date = dates[0] if dates else datetime(2000, 1, 1)
+
+        DR = models.DagRun
+        dag_runs = (
+            session.query(DR)
+            .filter(
+                DR.dag_id==dag.dag_id,
+                DR.execution_date<=base_date,
+                DR.execution_date>=min_date)
+            .all()
+        )
+        dag_runs = {
+            dr.execution_date: alchemy_to_dict(dr) for dr in dag_runs}
+
+        tis = dag.get_task_instances(
+                session, start_date=min_date, end_date=base_date)
+        dates = sorted(list({ti.execution_date for ti in tis}))
+        max_date = max([ti.execution_date for ti in tis]) if dates else None
+        task_instances = {}
+        for ti in tis:
+            tid = alchemy_to_dict(ti)
+            dr = dag_runs.get(ti.execution_date)
+            tid['external_trigger'] = dr['external_trigger'] if dr else False
+            task_instances[(ti.task_id, ti.execution_date)] = tid
+
+        expanded = []
+        # The default recursion traces every path so that tree view has full
+        # expand/collapse functionality. After 5,000 nodes we stop and fall
+        # back on a quick DFS search for performance. See PR #320.
+        node_count = [0]
+        node_limit = 5000 / max(1, len(dag.roots))
+
+        def recurse_nodes(task, visited):
+            visited.add(task)
+            node_count[0] += 1
+
+            children = [
+                recurse_nodes(t, visited) for t in task.upstream_list
+                if node_count[0] < node_limit or t not in visited]
+
+            # D3 tree uses children vs _children to define what is
+            # expanded or not. The following block makes it such that
+            # repeated nodes are collapsed by default.
+            children_key = 'children'
+            if task.task_id not in expanded:
+                expanded.append(task.task_id)
+            elif children:
+                children_key = "_children"
+
+            return {
+                'name': task.task_id,
+                'instances': [
+                        task_instances.get((task.task_id, d)) or {
+                            'execution_date': d.isoformat(),
+                            'task_id': task.task_id
+                        }
+                    for d in dates],
+                children_key: children,
+                'num_dep': len(task.upstream_list),
+                'operator': task.task_type,
+                'retries': task.retries,
+                'owner': task.owner,
+                'start_date': task.start_date,
+                'end_date': task.end_date,
+                'depends_on_past': task.depends_on_past,
+                'ui_color': task.ui_color,
+            }
+        data = {
+            'name': '[DAG]',
+            'children': [recurse_nodes(t, set()) for t in dag.roots],
+            'instances': [
+                dag_runs.get(d) or {'execution_date': d.isoformat()}
+                for d in dates],
+        }
+
+        data = json.dumps(data, indent=4, default=json_ser)
+        session.commit()
+        session.close()
+
+        form = DateTimeWithNumRunsForm(data={'base_date': max_date,
+                                             'num_runs': num_runs})
+        return self.render(
+            'airflow/tree.html',
+            operators=sorted(
+                list(set([op.__class__ for op in dag.tasks])),
+                key=lambda x: x.__name__
+            ),
+            root=root,
+            form=form,
+            dag=dag, data=data, blur=blur)
+
+    @expose('/graph')
+    @login_required
+    @wwwutils.gzipped
+    @wwwutils.action_logging
+    def graph(self):
+        session = settings.Session()
+        dag_id = request.args.get('dag_id')
+        blur = conf.getboolean('webserver', 'demo_mode')
+        arrange = request.args.get('arrange', "LR")
+        dag = dagbag.get_dag(dag_id)
+        if dag_id not in dagbag.dags:
+            flash('DAG "{0}" seems to be missing.'.format(dag_id), "error")
+            return redirect('/admin/')
+
+        root = request.args.get('root')
+        if root:
+            dag = dag.sub_dag(
+                task_regex=root,
+                include_upstream=True,
+                include_downstream=False)
+
+        nodes = []
+        edges = []
+        for task in dag.tasks:
+            nodes.append({
+                'id': task.task_id,
+                'value': {
+                    'label': task.task_id,
+                    'labelStyle': "fill:{0};".format(task.ui_fgcolor),
+                    'style': "fill:{0};".format(task.ui_color),
+                }
+            })
+
+        def get_upstream(task):
+            for t in task.upstream_list:
+                edge = {
+                    'u': t.task_id,
+                    'v': task.task_id,
+                }
+                if edge not in edges:
+                    edges.append(edge)
+                    get_upstream(t)
+
+        for t in dag.roots:
+            get_upstream(t)
+
+        dttm = request.args.get('execution_date')
+        if dttm:
+            dttm = dateutil.parser.parse(dttm)
+        else:
+            dttm = dag.latest_execution_date or datetime.now().date()
+
+        DR = models.DagRun
+        drs = (
+            session.query(DR)
+            .filter_by(dag_id=dag_id)
+            .order_by(desc(DR.execution_date)).all()
+        )
+        dr_choices = []
+        dr_state = None
+        for dr in drs:
+            dr_choices.append((dr.execution_date.isoformat(), dr.run_id))
+            if dttm == dr.execution_date:
+                dr_state = dr.state
+
+        class GraphForm(Form):
+            execution_date = SelectField("DAG run", choices=dr_choices)
+            arrange = SelectField("Layout", choices=(
+                ('LR', "Left->Right"),
+                ('RL', "Right->Left"),
+                ('TB', "Top->Bottom"),
+                ('BT', "Bottom->Top"),
+            ))
+        form = GraphForm(
+            data={'execution_date': dttm.isoformat(), 'arrange': arrange})
+
+        task_instances = {
+            ti.task_id: alchemy_to_dict(ti)
+            for ti in dag.get_task_instances(session, dttm, dttm)}
+        tasks = {
+            t.task_id: {
+                'dag_id': t.dag_id,
+                'task_type': t.task_type,
+            }
+            for t in dag.tasks}
+        if not tasks:
+            flash("No tasks found", "error")
+        session.commit()
+        session.close()
+        doc_md = markdown.markdown(dag.doc_md) if hasattr(dag, 'doc_md') else ''
+
+        return self.render(
+            'airflow/graph.html',
+            dag=dag,
+            form=form,
+            width=request.args.get('width', "100%"),
+            height=request.args.get('height', "800"),
+            execution_date=dttm.isoformat(),
+            state_token=state_token(dr_state),
+            doc_md=doc_md,
+            arrange=arrange,
+            operators=sorted(
+                list(set([op.__class__ for op in dag.tasks])),
+                key=lambda x: x.__name__
+            ),
+            blur=blur,
+            root=root or '',
+            task_instances=json.dumps(task_instances, indent=2),
+            tasks=json.dumps(tasks, indent=2),
+            nodes=json.dumps(nodes, indent=2),
+            edges=json.dumps(edges, indent=2),)
+
+    @expose('/duration')
+    @login_required
+    @wwwutils.action_logging
+    def duration(self):
+        from nvd3 import lineChart
+        import time
+        session = settings.Session()
+        dag_id = request.args.get('dag_id')
+        dag = dagbag.get_dag(dag_id)
+        base_date = request.args.get('base_date')
+        num_runs = request.args.get('num_runs')
+        num_runs = int(num_runs) if num_runs else 25
+
+        if base_date:
+            base_date = dateutil.parser.parse(base_date)
+        else:
+            base_date = dag.latest_execution_date or datetime.now()
+
+        dates = dag.date_range(base_date, num=-abs(num_runs))
+        min_date = dates[0] if dates else datetime(2000, 1, 1)
+
+        root = request.args.get('root')
+        if root:
+            dag = dag.sub_dag(
+                task_regex=root,
+                include_upstream=True,
+                include_downstream=False)
+
+        chart = lineChart(name="lineChart", x_is_date=True, height=750, width=600)
+        for task in dag.tasks:
+            y = []
+            x = []
+            for ti in task.get_task_instances(session, start_date=min_date,
+                                              end_date=base_date):
+                if ti.duration:
+                    dttm = int(time.mktime(ti.execution_date.timetuple())) * 1000
+                    x.append(dttm)
+                    y.append(float(ti.duration) / (60*60))
+            if x:
+                chart.add_serie(name=task.task_id, x=x, y=y)
+
+        tis = dag.get_task_instances(
+                session, start_date=min_date, end_date=base_date)
+        dates = sorted(list({ti.execution_date for ti in tis}))
+        max_date = max([ti.execution_date for ti in tis]) if dates else None
+
+        session.commit()
+        session.close()
+
+        form = DateTimeWithNumRunsForm(data={'base_date': max_date,
+                                             'num_runs': num_runs})
+        chart.buildhtml()
+        return self.render(
+            'airflow/chart.html',
+            dag=dag,
+            demo_mode=conf.getboolean('webserver', 'demo_mode'),
+            root=root,
+            form=form,
+            chart=chart,
+        )
+
+    @expose('/landing_times')
+    @login_required
+    @wwwutils.action_logging
+    def landing_times(self):
+        session = settings.Session()
+        dag_id = request.args.get('dag_id')
+        dag = dagbag.get_dag(dag_id)
+        base_date = request.args.get('base_date')
+        num_runs = request.args.get('num_runs')
+        num_runs = int(num_runs) if num_runs else 25
+
+        if base_date:
+            base_date = dateutil.parser.parse(base_date)
+        else:
+            base_date = dag.latest_execution_date or datetime.now()
+
+        dates = dag.date_range(base_date, num=-abs(num_runs))
+        min_date = dates[0] if dates else datetime(2000, 1, 1)
+
+        root = request.args.get('root')
+        if root:
+            dag = dag.sub_dag(
+                task_regex=root,
+                include_upstream=True,
+                include_downstream=False)
+
+        all_data = []
+        for task in dag.tasks:
+            data = []
+            for ti in task.get_task_instances(session, start_date=min_date,
+                                              end_date=base_date):
+                if ti.end_date:
+                    ts = ti.execution_date
+                    if dag.schedule_interval:
+                        ts = dag.following_schedule(ts)
+                    secs = old_div((ti.end_date - ts).total_seconds(), 60*60)
+                    data.append([ti.execution_date.isoformat(), secs])
+            all_data.append({'data': data, 'name': task.task_id})
+
+        tis = dag.get_task_instances(
+                session, start_date=min_date, end_date=base_date)
+        dates = sorted(list({ti.execution_date for ti in tis}))
+        max_date = max([ti.execution_date for ti in tis]) if dates else None
+
+        session.commit()
+        session.close()
+
+        form = DateTimeWithNumRunsForm(data={'base_date': max_date,
+                                             'num_runs': num_runs})
+        return self.render(
+            'airflow/chart.html',
+            dag=dag,
+            data=json.dumps(all_data),
+            height="700px",
+            chart_options={'yAxis': {'title': {'text': 'hours after 00:00'}}},
+            demo_mode=conf.getboolean('webserver', 'demo_mode'),
+            root=root,
+            form=form,
+        )
+
+    @expose('/paused')
+    @login_required
+    @wwwutils.action_logging
+    def paused(self):
+        DagModel = models.DagModel
+        dag_id = request.args.get('dag_id')
+        session = settings.Session()
+        orm_dag = session.query(
+            DagModel).filter(DagModel.dag_id == dag_id).first()
+        if request.args.get('is_paused') == 'false':
+            orm_dag.is_paused = True
+        else:
+            orm_dag.is_paused = False
+        session.merge(orm_dag)
+        session.commit()
+        session.close()
+
+        dagbag.get_dag(dag_id)
+        return "OK"
+
+    @expose('/refresh')
+    @login_required
+    @wwwutils.action_logging
+    def refresh(self):
+        DagModel = models.DagModel
+        dag_id = request.args.get('dag_id')
+        session = settings.Session()
+        orm_dag = session.query(
+            DagModel).filter(DagModel.dag_id == dag_id).first()
+
+        if orm_dag:
+            orm_dag.last_expired = datetime.now()
+            session.merge(orm_dag)
+        session.commit()
+        session.close()
+
+        dagbag.get_dag(dag_id)
+        flash("DAG [{}] is now fresh as a daisy".format(dag_id))
+        return redirect('/')
+
+    @expose('/refresh_all')
+    @login_required
+    @wwwutils.action_logging
+    def refresh_all(self):
+        dagbag.collect_dags(only_if_updated=False)
+        flash("All DAGs are now up to date")
+        return redirect('/')
+
+    @expose('/gantt')
+    @login_required
+    @wwwutils.action_logging
+    def gantt(self):
+
+        session = settings.Session()
+        dag_id = request.args.get('dag_id')
+        dag = dagbag.get_dag(dag_id)
+        demo_mode = conf.getboolean('webserver', 'demo_mode')
+
+        root = request.args.get('root')
+        if root:
+            dag = dag.sub_dag(
+                task_regex=root,
+                include_upstream=True,
+                include_downstream=False)
+
+        dttm = request.args.get('execution_date')
+        if dttm:
+            dttm = dateutil.parser.parse(dttm)
+        else:
+            dttm = dag.latest_execution_date or datetime.now().date()
+
+        form = DateTimeForm(data={'execution_date': dttm})
+
+        tis = [
+            ti
+            for ti in dag.get_task_instances(session, dttm, dttm)
+            if ti.start_date]
+        tis = sorted(tis, key=lambda ti: ti.start_date)
+        tasks = []
+        data = []
+        for i, ti in enumerate(tis):
+            end_date = ti.end_date or datetime.now()
+            tasks += [ti.task_id]
+            color = State.color(ti.state)
+            data.append({
+                'x': i,
+                'low': int(ti.start_date.strftime('%s')) * 1000,
+                'high': int(end_date.strftime('%s')) * 1000,
+                'color': color,
+            })
+        height = (len(tis) * 25) + 50
+        session.commit()
+        session.close()
+
+        hc = {
+            'chart': {
+                'type': 'columnrange',
+                'inverted': True,
+                'height': height,
+            },
+            'xAxis': {'categories': tasks, 'alternateGridColor': '#FAFAFA'},
+            'yAxis': {'type': 'datetime'},
+            'title': {
+                'text': None
+            },
+            'plotOptions': {
+                'series': {
+                    'cursor': 'pointer',
+                    'minPointLength': 4,
+                },
+            },
+            'legend': {
+                'enabled': False
+            },
+            'series': [{
+                'data': data
+            }]
+        }
+        return self.render(
+            'airflow/gantt.html',
+            dag=dag,
+            execution_date=dttm.isoformat(),
+            form=form,
+            hc=json.dumps(hc, indent=4),
+            height=height,
+            demo_mode=demo_mode,
+            root=root,
+        )
+
+    @expose('/object/task_instances')
+    @login_required
+    @wwwutils.action_logging
+    def task_instances(self):
+        session = settings.Session()
+        dag_id = request.args.get('dag_id')
+        dag = dagbag.get_dag(dag_id)
+
+        dttm = request.args.get('execution_date')
+        if dttm:
+            dttm = dateutil.parser.parse(dttm)
+        else:
+            return ("Error: Invalid execution_date")
+
+        task_instances = {
+            ti.task_id: alchemy_to_dict(ti)
+            for ti in dag.get_task_instances(session, dttm, dttm)}
+
+        return json.dumps(task_instances)
+
+    @expose('/variables/<form>', methods=["GET", "POST"])
+    @login_required
+    @wwwutils.action_logging
+    def variables(self, form):
+        try:
+            if request.method == 'POST':
+                data = request.json
+                if data:
+                    session = settings.Session()
+                    var = models.Variable(key=form, val=json.dumps(data))
+                    session.add(var)
+                    session.commit()
+                return ""
+            else:
+                return self.render(
+                    'airflow/variables/{}.html'.format(form)
+                )
+        except:
+            return ("Error: form airflow/variables/{}.html "
+                    "not found.").format(form), 404
+
+
+class HomeView(AdminIndexView):
+    @expose("/")
+    @login_required
+    def index(self):
+        session = Session()
+        DM = models.DagModel
+        qry = None
+        # filter the dags if filter_by_owner and current user is not superuser
+        do_filter = FILTER_BY_OWNER and (not current_user.is_superuser())
+        if do_filter:
+            qry = (
+                session.query(DM)
+                    .filter(
+                    ~DM.is_subdag, DM.is_active,
+                    DM.owners == current_user.username)
+                    .all()
+            )
+        else:
+            qry = session.query(DM).filter(~DM.is_subdag, DM.is_active).all()
+        orm_dags = {dag.dag_id: dag for dag in qry}
+        import_errors = session.query(models.ImportError).all()
+        for ie in import_errors:
+            flash(
+                "Broken DAG: [{ie.filename}] {ie.stacktrace}".format(ie=ie),
+                "error")
+        session.expunge_all()
+        session.commit()
+        session.close()
+        dags = dagbag.dags.values()
+        if do_filter:
+            dags = {
+                dag.dag_id: dag
+                for dag in dags
+                if (
+                    dag.owner == current_user.username and (not dag.parent_dag)
+                )
+                }
+        else:
+            dags = {dag.dag_id: dag for dag in dags if not dag.parent_dag}
+        all_dag_ids = sorted(set(orm_dags.keys()) | set(dags.keys()))
+        return self.render(
+            'airflow/dags.html',
+            dags=dags,
+            orm_dags=orm_dags,
+            all_dag_ids=all_dag_ids)
+
+
+class QueryView(wwwutils.DataProfilingMixin, BaseView):
+    @expose('/')
+    @wwwutils.gzipped
+    def query(self):
+        session = settings.Session()
+        dbs = session.query(models.Connection).order_by(
+            models.Connection.conn_id).all()
+        session.expunge_all()
+        db_choices = list(
+            ((db.conn_id, db.conn_id) for db in dbs if db.get_hook()))
+        conn_id_str = request.args.get('conn_id')
+        csv = request.args.get('csv') == "true"
+        sql = request.args.get('sql')
+
+        class QueryForm(Form):
+            conn_id = SelectField("Layout", choices=db_choices)
+            sql = TextAreaField("SQL", widget=wwwutils.AceEditorWidget())
+        data = {
+            'conn_id': conn_id_str,
+            'sql': sql,
+        }
+        results = None
+        has_data = False
+        error = False
+        if conn_id_str:
+            db = [db for db in dbs if db.conn_id == conn_id_str][0]
+            hook = db.get_hook()
+            try:
+                df = hook.get_pandas_df(wwwutils.limit_sql(sql, QUERY_LIMIT, conn_type=db.conn_type))
+                # df = hook.get_pandas_df(sql)
+                has_data = len(df) > 0
+                df = df.fillna('')
+                results = df.to_html(
+                    classes=[
+                        'table', 'table-bordered', 'table-striped', 'no-wrap'],
+                    index=False,
+                    na_rep='',
+                ) if has_data else ''
+            except Exception as e:
+                flash(str(e), 'error')
+                error = True
+
+        if has_data and len(df) == QUERY_LIMIT:
+            flash(
+                "Query output truncated at " + str(QUERY_LIMIT) +
+                " rows", 'info')
+
+        if not has_data and error:
+            flash('No data', 'error')
+
+        if csv:
+            return Response(
+                response=df.to_csv(index=False),
+                status=200,
+                mimetype="application/text")
+
+        form = QueryForm(request.form, data=data)
+        session.commit()
+        session.close()
+        return self.render(
+            'airflow/query.html', form=form,
+            title="Ad Hoc Query",
+            results=results or '',
+            has_data=has_data)
+
+
+class AirflowModelView(ModelView):
+    list_template = 'airflow/model_list.html'
+    edit_template = 'airflow/model_edit.html'
+    create_template = 'airflow/model_create.html'
+    column_display_actions = True
+    page_size = 500
+
+
+class ModelViewOnly(wwwutils.LoginMixin, AirflowModelView):
+    """
+    Modifying the base ModelView class for non edit, browse only operations
+    """
+    named_filter_urls = True
+    can_create = False
+    can_edit = False
+    can_delete = False
+    column_display_pk = True
+
+
+class PoolModelView(wwwutils.SuperUserMixin, AirflowModelView):
+    column_list = ('pool', 'slots', 'used_slots', 'queued_slots')
+    column_formatters = dict(
+        pool=pool_link, used_slots=fused_slots, queued_slots=fqueued_slots)
+    named_filter_urls = True
+
+
+class SlaMissModelView(wwwutils.SuperUserMixin, ModelViewOnly):
+    verbose_name_plural = "SLA misses"
+    verbose_name = "SLA miss"
+    column_list = (
+        'dag_id', 'task_id', 'execution_date', 'email_sent', 'timestamp')
+    column_formatters = dict(
+        task_id=task_instance_link,
+        execution_date=datetime_f,
+        timestamp=datetime_f,
+        dag_id=dag_link)
+    named_filter_urls = True
+    column_searchable_list = ('dag_id', 'task_id',)
+    column_filters = (
+        'dag_id', 'task_id', 'email_sent', 'timestamp', 'execution_date')
+    form_widget_args = {
+        'email_sent': {'disabled': True},
+        'timestamp': {'disabled': True},
+    }
+
+class ChartModelView(wwwutils.DataProfilingMixin, AirflowModelView):
+    verbose_name = "chart"
+    verbose_name_plural = "charts"
+    form_columns = (
+        'label',
+        'owner',
+        'conn_id',
+        'chart_type',
+        'show_datatable',
+        'x_is_date',
+        'y_log_scale',
+        'show_sql',
+        'height',
+        'sql_layout',
+        'sql',
+        'default_params',)
+    column_list = (
+        'label', 'conn_id', 'chart_type', 'owner', 'last_modified',)
+    column_formatters = dict(label=label_link, last_modified=datetime_f)
+    column_default_sort = ('last_modified', True)
+    create_template = 'airflow/chart/create.html'
+    edit_template = 'airflow/chart/edit.html'
+    column_filters = ('label', 'owner.username', 'conn_id')
+    column_searchable_list = ('owner.username', 'label', 'sql')
+    column_descriptions = {
+        'label': "Can include {{ templated_fields }} and {{ macros }}",
+        'chart_type': "The type of chart to be displayed",
+        'sql': "Can include {{ templated_fields }} and {{ macros }}.",
+        'height': "Height of the chart, in pixels.",
+        'conn_id': "Source database to run the query against",
+        'x_is_date': (
+            "Whether the X axis should be casted as a date field. Expect most "
+            "intelligible date formats to get casted properly."
+        ),
+        'owner': (
+            "The chart's owner, mostly used for reference and filtering in "
+            "the list view."
+        ),
+        'show_datatable':
+            "Whether to display an interactive data table under the chart.",
+        'default_params': (
+            'A dictionary of {"key": "values",} that define what the '
+            'templated fields (parameters) values should be by default. '
+            'To be valid, it needs to "eval" as a Python dict. '
+            'The key values will show up in the url\'s querystring '
+            'and can be altered there.'
+        ),
+        'show_sql': "Whether to display the SQL statement as a collapsible "
+                    "section in the chart page.",
+        'y_log_scale': "Whether to use a log scale for the Y axis.",
+        'sql_layout': (
+            "Defines the layout of the SQL that the application should "
+            "expect. Depending on the tables you are sourcing from, it may "
+            "make more sense to pivot / unpivot the metrics."
+        ),
+    }
+    column_labels = {
+        'sql': "SQL",
+        'height': "Chart Height",
+        'sql_layout': "SQL Layout",
+        'show_sql': "Display the SQL Statement",
+        'default_params': "Default Parameters",
+    }
+    form_choices = {
+        'chart_type': [
+            ('line', 'Line Chart'),
+            ('spline', 'Spline Chart'),
+            ('bar', 'Bar Chart'),
+            ('para', 'Parallel Coordinates'),
+            ('column', 'Column Chart'),
+            ('area', 'Overlapping Area Chart'),
+            ('stacked_area', 'Stacked Area Chart'),
+            ('percent_area', 'Percent Area Chart'),
+            ('heatmap', 'Heatmap'),
+            ('datatable', 'No chart, data table only'),
+        ],
+        'sql_layout': [
+            ('series', 'SELECT series, x, y FROM ...'),
+            ('columns', 'SELECT x, y (series 1), y (series 2), ... FROM ...'),
+        ],
+        'conn_id': [
+            (c.conn_id, c.conn_id)
+            for c in (
+                Session().query(models.Connection.conn_id)
+                    .group_by(models.Connection.conn_id)
+            )
+            ]
+    }
+
+    def on_model_change(self, form, model, is_created=True):
+        if model.iteration_no is None:
+            model.iteration_no = 0
+        else:
+            model.iteration_no += 1
+        if not model.user_id and current_user and hasattr(current_user, 'id'):
+            model.user_id = current_user.id
+        model.last_modified = datetime.now()
+
+
+class KnowEventView(wwwutils.DataProfilingMixin, AirflowModelView):
+    verbose_name = "known event"
+    verbose_name_plural = "known events"
+    form_columns = (
+        'label',
+        'event_type',
+        'start_date',
+        'end_date',
+        'reported_by',
+        'description')
+    column_list = (
+        'label', 'event_type', 'start_date', 'end_date', 'reported_by')
+    column_default_sort = ("start_date", True)
+
+
+class KnowEventTypeView(wwwutils.DataProfilingMixin, AirflowModelView):
+    pass
+
+'''
+# For debugging / troubleshooting
+mv = KnowEventTypeView(
+    models.KnownEventType,
+    Session, name="Known Event Types", category="Manage")
+admin.add_view(mv)
+class DagPickleView(SuperUserMixin, ModelView):
+    pass
+mv = DagPickleView(
+    models.DagPickle,
+    Session, name="Pickles", category="Manage")
+admin.add_view(mv)
+'''
+
+
+class VariableView(wwwutils.LoginMixin, AirflowModelView):
+    verbose_name = "Variable"
+    verbose_name_plural = "Variables"
+    form_columns = (
+        'key',
+        'val',
+    )
+    column_list = ('key', 'is_encrypted',)
+    column_filters = ('key', 'val')
+    column_searchable_list = ('key', 'val')
+    form_widget_args = {
+        'is_encrypted': {'disabled': True},
+        'val': {
+            'rows': 20,
+        }
+    }
+
+
+class JobModelView(ModelViewOnly):
+    verbose_name_plural = "jobs"
+    verbose_name = "job"
+    column_default_sort = ('start_date', True)
+    column_filters = (
+        'job_type', 'dag_id', 'state',
+        'unixname', 'hostname', 'start_date', 'end_date', 'latest_heartbeat')
+    column_formatters = dict(
+        start_date=datetime_f,
+        end_date=datetime_f,
+        hostname=nobr_f,
+        state=state_f,
+        latest_heartbeat=datetime_f)
+
+
+class DagRunModelView(ModelViewOnly):
+    verbose_name_plural = "DAG Runs"
+    can_delete = True
+    can_edit = True
+    can_create = True
+    column_editable_list = ('state',)
+    verbose_name = "dag run"
+    column_default_sort = ('execution_date', True)
+    form_choices = {
+        'state': [
+            ('success', 'success'),
+            ('running', 'running'),
+            ('failed', 'failed'),
+        ],
+    }
+    column_list = (
+        'state', 'dag_id', 'execution_date', 'run_id', 'external_trigger')
+    column_filters = column_list
+    column_searchable_list = ('dag_id', 'state', 'run_id')
+    column_formatters = dict(
+        execution_date=datetime_f,
+        state=state_f,
+        start_date=datetime_f,
+        dag_id=dag_link)
+
+    @action('set_running', "Set state to 'running'", None)
+    def action_set_running(self, ids):
+        self.set_dagrun_state(ids, State.RUNNING)
+
+    @action('set_failed', "Set state to 'failed'", None)
+    def action_set_failed(self, ids):
+        self.set_dagrun_state(ids, State.FAILED)
+
+    @action('set_success', "Set state to 'success'", None)
+    def action_set_success(self, ids):
+        self.set_dagrun_state(ids, State.SUCCESS)
+
+    @provide_session
+    def set_dagrun_state(self, ids, target_state, session=None):
+        try:
+            DR = models.DagRun
+            count = 0
+            for dr in session.query(DR).filter(DR.id.in_(ids)).all():
+                count += 1
+                dr.state = target_state
+                if target_state == State.RUNNING:
+                    dr.start_date = datetime.now()
+                else:
+                    dr.end_date = datetime.now()
+            session.commit()
+            flash(
+                "{count} dag runs were set to '{target_state}'".format(**locals()))
+        except Exception as ex:
+            if not self.handle_view_exception(ex):
+                raise Exception("Ooops")
+            flash('Failed to set state', 'error')
+
+
+class LogModelView(ModelViewOnly):
+    verbose_name_plural = "logs"
+    verbose_name = "log"
+    column_default_sort = ('dttm', True)
+    column_filters = ('dag_id', 'task_id', 'execution_date')
+    column_formatters = dict(
+        dttm=datetime_f, execution_date=datetime_f, dag_id=dag_link)
+
+
+class TaskInstanceModelView(ModelViewOnly):
+    verbose_name_plural = "task instances"
+    verbose_name = "task instance"
+    column_filters = (
+        'state', 'dag_id', 'task_id', 'execution_date', 'hostname',
+        'queue', 'pool', 'operator', 'start_date', 'end_date')
+    named_filter_urls = True
+    column_formatters = dict(
+        log=log_link, task_id=task_instance_link,
+        hostname=nobr_f,
+        state=state_f,
+        execution_date=datetime_f,
+        start_date=datetime_f,
+        end_date=datetime_f,
+        queued_dttm=datetime_f,
+        dag_id=dag_link, duration=duration_f)
+    column_searchable_list = ('dag_id', 'task_id', 'state')
+    column_default_sort = ('start_date', True)
+    form_choices = {
+        'state': [
+            ('success', 'success'),
+            ('running', 'running'),
+            ('failed', 'failed'),
+        ],
+    }
+    column_list = (
+        'state', 'dag_id', 'task_id', 'execution_date', 'operator',
+        'start_date', 'end_date', 'duration', 'job_id', 'hostname',
+        'unixname', 'priority_weight', 'queue', 'queued_dttm', 'try_number',
+        'pool', 'log')
+    can_delete = True
+    page_size = 500
+
+    @action('set_running', "Set state to 'running'", None)
+    def action_set_running(self, ids):
+        self.set_task_instance_state(ids, State.RUNNING)
+
+    @action('set_failed', "Set state to 'failed'", None)
+    def action_set_failed(self, ids):
+        self.set_task_instance_state(ids, State.FAILED)
+
+    @action('set_success', "Set state to 'success'", None)
+    def action_set_success(self, ids):
+        self.set_task_instance_state(ids, State.SUCCESS)
+
+    @action('set_retry', "Set state to 'up_for_retry'", None)
+    def action_set_retry(self, ids):
+        self.set_task_instance_state(ids, State.UP_FOR_RETRY)
+
+    @provide_session
+    def set_task_instance_state(self, ids, target_state, session=None):
+        try:
+            TI = models.TaskInstance
+            for count, id in enumerate(ids):
+                task_id, dag_id, execution_date = id.split(',')
+                execution_date = datetime.strptime(execution_date, '%Y-%m-%d %H:%M:%S')
+                ti = session.query(TI).filter(TI.task_id == task_id,
+                                              TI.dag_id == dag_id,
+                                              TI.execution_date == execution_date).one()
+                ti.state = target_state
+            count += 1
+            session.commit()
+            flash(
+                "{count} task instances were set to '{target_state}'".format(**locals()))
+        except Exception as ex:
+            if not self.handle_view_exception(ex):
+                raise Exception("Ooops")
+            flash('Failed to set state', 'error')
+
+
+class ConnectionModelView(wwwutils.SuperUserMixin, AirflowModelView):
+    create_template = 'airflow/conn_create.html'
+    edit_template = 'airflow/conn_edit.html'
+    list_template = 'airflow/conn_list.html'
+    form_columns = (
+        'conn_id',
+        'conn_type',
+        'host',
+        'schema',
+        'login',
+        'password',
+        'port',
+        'extra',
+        'extra__jdbc__drv_path',
+        'extra__jdbc__drv_clsname',
+        'extra__google_cloud_platform__project',
+        'extra__google_cloud_platform__key_path',
+        'extra__google_cloud_platform__service_account',
+        'extra__google_cloud_platform__scope',
+    )
+    verbose_name = "Connection"
+    verbose_name_plural = "Connections"
+    column_default_sort = ('conn_id', False)
+    column_list = ('conn_id', 'conn_type', 'host', 'port', 'is_encrypted', 'is_extra_encrypted',)
+    form_overrides = dict(_password=PasswordField)
+    form_widget_args = {
+        'is_extra_encrypted': {'disabled': True},
+        'is_encrypted': {'disabled': True},
+    }
+    # Used to customized the form, the forms elements get rendered
+    # and results are stored in the extra field as json. All of these
+    # need to be prefixed with extra__ and then the conn_type ___ as in
+    # extra__{conn_type}__name. You can also hide form elements and rename
+    # others from the connection_form.js file
+    form_extra_fields = {
+        'extra__jdbc__drv_path' : StringField('Driver Path'),
+        'extra__jdbc__drv_clsname': StringField('Driver Class'),
+        'extra__google_cloud_platform__project': StringField('Project'),
+        'extra__google_cloud_platform__key_path': StringField('Keyfile Path'),
+        'extra__google_cloud_platform__service_account': StringField('Service Account'),
+        'extra__google_cloud_platform__scope': StringField('Scopes (comma seperated)'),
+
+    }
+    form_choices = {
+        'conn_type': [
+            ('bigquery', 'BigQuery',),
+            ('datastore', 'Google Datastore'),
+            ('ftp', 'FTP',),
+            ('google_cloud_storage', 'Google Cloud Storage'),
+            ('google_cloud_platform', 'Google Cloud Platform'),
+            ('hdfs', 'HDFS',),
+            ('http', 'HTTP',),
+            ('hive_cli', 'Hive Client Wrapper',),
+            ('hive_metastore', 'Hive Metastore Thrift',),
+            ('hiveserver2', 'Hive Server 2 Thrift',),
+            ('jdbc', 'Jdbc Connection',),
+            ('mysql', 'MySQL',),
+            ('postgres', 'Postgres',),
+            ('oracle', 'Oracle',),
+            ('vertica', 'Vertica',),
+            ('presto', 'Presto',),
+            ('s3', 'S3',),
+            ('samba', 'Samba',),
+            ('sqlite', 'Sqlite',),
+            ('ssh', 'SSH',),
+            ('cloudant', 'IBM Cloudant',),
+            ('mssql', 'Microsoft SQL Server'),
+            ('mesos_framework-id', 'Mesos Framework ID'),
+        ]
+    }
+
+    def on_model_change(self, form, model, is_created):
+        formdata = form.data
+        if formdata['conn_type'] in ['jdbc', 'google_cloud_platform']:
+            extra = {
+                key:formdata[key]
+                for key in self.form_extra_fields.keys() if key in formdata}
+            model.extra = json.dumps(extra)
+
+    @classmethod
+    def alert_fernet_key(cls):
+        return conf.get('core', 'fernet_key') is None
+
+    @classmethod
+    def is_secure(self):
+        """
+        Used to display a message in the Connection list view making it clear
+        that the passwords and `extra` field can't be encrypted.
+        """
+        is_secure = False
+        try:
+            import cryptography
+            conf.get('core', 'fernet_key')
+            is_secure = True
+        except:
+            pass
+        return is_secure
+
+    def on_form_prefill(self, form, id):
+        try:
+            d = json.loads(form.data.get('extra', '{}'))
+        except Exception as e:
+            d = {}
+
+        for field in list(self.form_extra_fields.keys()):
+            value = d.get(field, '')
+            if value:
+                field = getattr(form, field)
+                field.data = value
+
+
+class UserModelView(wwwutils.SuperUserMixin, AirflowModelView):
+    verbose_name = "User"
+    verbose_name_plural = "Users"
+    column_default_sort = 'username'
+
+
+class ConfigurationView(wwwutils.SuperUserMixin, BaseView):
+    @expose('/')
+    def conf(self):
+        raw = request.args.get('raw') == "true"
+        title = "Airflow Configuration"
+        subtitle = conf.AIRFLOW_CONFIG
+        if conf.getboolean("webserver", "expose_config"):
+            with open(conf.AIRFLOW_CONFIG, 'r') as f:
+                config = f.read()
+        else:
+            config = (
+                "# You Airflow administrator chose not to expose the "
+                "configuration, most likely for security reasons.")
+        if raw:
+            return Response(
+                response=config,
+                status=200,
+                mimetype="application/text")
+        else:
+            code_html = Markup(highlight(
+                config,
+                lexers.IniLexer(),  # Lexer call
+                HtmlFormatter(noclasses=True))
+            )
+            return self.render(
+                'airflow/code.html',
+                pre_subtitle=settings.HEADER + "  v" + airflow.__version__,
+                code_html=code_html, title=title, subtitle=subtitle)
+
+
+class DagModelView(wwwutils.SuperUserMixin, ModelView):
+    column_list = ('dag_id', 'owners')
+    column_editable_list = ('is_paused',)
+    form_excluded_columns = ('is_subdag', 'is_active')
+    column_searchable_list = ('dag_id',)
+    column_filters = (
+        'dag_id', 'owners', 'is_paused', 'is_active', 'is_subdag',
+        'last_scheduler_run', 'last_expired')
+    form_widget_args = {
+        'last_scheduler_run': {'disabled': True},
+        'fileloc': {'disabled': True},
+        'is_paused': {'disabled': True},
+        'last_pickled': {'disabled': True},
+        'pickle_id': {'disabled': True},
+        'last_loaded': {'disabled': True},
+        'last_expired': {'disabled': True},
+        'pickle_size': {'disabled': True},
+        'scheduler_lock': {'disabled': True},
+        'owners': {'disabled': True},
+    }
+    column_formatters = dict(
+        dag_id=dag_link,
+    )
+    can_delete = False
+    can_create = False
+    page_size = 50
+    list_template = 'airflow/list_dags.html'
+    named_filter_urls = True
+
+    def get_query(self):
+        """
+        Default filters for model
+        """
+        return (
+            super(DagModelView, self)
+                .get_query()
+                .filter(or_(models.DagModel.is_active, models.DagModel.is_paused))
+                .filter(~models.DagModel.is_subdag)
+        )
+
+    def get_count_query(self):
+        """
+        Default filters for model
+        """
+        return (
+            super(DagModelView, self)
+                .get_count_query()
+                .filter(models.DagModel.is_active)
+                .filter(~models.DagModel.is_subdag)
+        )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0a460081/airflow/bin/airflow
----------------------------------------------------------------------
diff --git a/airflow/bin/airflow b/airflow/bin/airflow
index 80f1135..0598596 100755
--- a/airflow/bin/airflow
+++ b/airflow/bin/airflow
@@ -1,4 +1,17 @@
 #!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 import logging
 import os
 from airflow import configuration

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0a460081/airflow/executors/base_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index 2e88fa9..ca63443 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -1,3 +1,17 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 from builtins import range
 
 from airflow import configuration

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0a460081/airflow/executors/celery_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index de56baf..04414fb 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -1,3 +1,17 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 from builtins import object
 import logging
 import subprocess



Mime
View raw message