From: bolke@apache.org
To: commits@airflow.incubator.apache.org
Reply-To: dev@airflow.incubator.apache.org
Date: Mon, 13 Mar 2017 04:45:35 -0000
Message-Id: <67e15f8bb6ef4371a0f52b522f799fc2@git.apache.org>
In-Reply-To: <47f65032ee134f7aa3ff2a15122acda0@git.apache.org>
References: <47f65032ee134f7aa3ff2a15122acda0@git.apache.org>
Subject: [37/45] incubator-airflow git commit: [AIRFLOW-932][AIRFLOW-921][AIRFLOW-910] Do not mark tasks removed when backfilling

[AIRFLOW-932][AIRFLOW-921][AIRFLOW-910] Do not mark tasks removed when backfilling

In a backfill one can specify a specific task to execute. We create a
subset of the original tasks in a subdag from the original dag. The
subdag has the same name as the original dag. This breaks the integrity
check of a dag_run, as tasks are suddenly no longer in scope.

Closes #2122 from bolkedebruin/AIRFLOW-921
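(Illustration, not part of the commit: a minimal sketch of the failure mode described
above, assuming the Airflow 1.8-era API this patch touches (DAG.sub_dag, BackfillJob,
DummyOperator). The dag_id, task ids and dates below are hypothetical.)

    from datetime import datetime

    from airflow import DAG
    from airflow.jobs import BackfillJob
    from airflow.operators.dummy_operator import DummyOperator

    START = datetime(2017, 3, 1)

    dag = DAG('example_dag', start_date=START)          # hypothetical dag
    with dag:
        extract = DummyOperator(task_id='extract')
        load = DummyOperator(task_id='load')
        extract.set_downstream(load)

    # Backfilling only 'extract' builds a subset DAG that keeps the original
    # dag_id. The existing dag_run for 'example_dag' then no longer "sees"
    # 'load', and before this patch DagRun.verify_integrity() could mark such
    # out-of-scope tasks as State.REMOVED.
    subset = dag.sub_dag(task_regex='extract',
                         include_upstream=False,
                         include_downstream=False)
    BackfillJob(dag=subset, start_date=START, end_date=START).run()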
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a8f2c27e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a8f2c27e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a8f2c27e

Branch: refs/heads/v1-8-stable
Commit: a8f2c27ed44449e6611c7c4a9ec8cf2371cf0987
Parents: dacc69a
Author: Bolke de Bruin
Authored: Sat Mar 11 10:52:07 2017 -0800
Committer: Bolke de Bruin
Committed: Sun Mar 12 08:34:22 2017 -0700

----------------------------------------------------------------------
 airflow/jobs.py   |  1 +
 airflow/models.py | 12 +++++++++++-
 tests/jobs.py     | 49 +++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 61 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a8f2c27e/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 36548c2..c61b229 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1803,6 +1803,7 @@ class BackfillJob(BaseJob):
 
             # explictely mark running as we can fill gaps
             run.state = State.RUNNING
+            run.run_id = run_id
             run.verify_integrity(session=session)
 
             # check if we have orphaned tasks

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a8f2c27e/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index e63da3e..32c52ac 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2681,6 +2681,8 @@ class DAG(BaseDag, LoggingMixin):
         self.orientation = orientation
         self.catchup = catchup
 
+        self.partial = False
+
         self._comps = {
             'dag_id',
             'task_ids',
@@ -3186,6 +3188,10 @@ class DAG(BaseDag, LoggingMixin):
                 tid for tid in t._upstream_task_ids if tid in dag.task_ids]
             t._downstream_task_ids = [
                 tid for tid in t._downstream_task_ids if tid in dag.task_ids]
+
+        if len(dag.tasks) < len(self.tasks):
+            dag.partial = True
+
         return dag
 
     def has_task(self, task_id):
@@ -3946,6 +3952,9 @@ class DagRun(Base):
         else:
             tis = tis.filter(TI.state.in_(state))
 
+        if self.dag and self.dag.partial:
+            tis = tis.filter(TI.task_id.in_(self.dag.task_ids))
+
         return tis.all()
 
     @provide_session
@@ -4006,6 +4015,7 @@ class DagRun(Base):
         """
 
         dag = self.get_dag()
+
         tis = self.get_task_instances(session=session)
 
         logging.info("Updating state for {} considering {} task(s)"
@@ -4090,7 +4100,7 @@ class DagRun(Base):
             try:
                 dag.get_task(ti.task_id)
             except AirflowException:
-                if self.state is not State.RUNNING:
+                if self.state is not State.RUNNING and not dag.partial:
                     ti.state = State.REMOVED
 
         # check for missing tasks
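(Aside, not part of the patch: the net behaviour the models.py hunks above introduce,
sketched with the hypothetical objects from the earlier example.)

    # sub_dag() now flags the copy as partial when tasks were filtered out ...
    subset = dag.sub_dag(task_regex='extract',
                         include_upstream=False,
                         include_downstream=False)
    assert subset.dag_id == dag.dag_id    # same name as the original dag
    assert subset.partial is True         # fewer tasks than the original dag

    # ... and a dag_run bound to a partial DAG restricts get_task_instances()
    # to the subset's task_ids, while verify_integrity() no longer flips the
    # out-of-scope task instances to State.REMOVED.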
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a8f2c27e/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 1acf269..d208fd4 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -42,6 +42,8 @@ from tests.executor.test_executor import TestExecutor
 from airflow import configuration
 configuration.load_test_config()
 
+import sqlalchemy
+
 try:
     from unittest import mock
 except ImportError:
@@ -294,6 +296,53 @@ class BackfillJobTest(unittest.TestCase):
             self.assertEqual(ti.state, State.SUCCESS)
         dag.clear()
 
+    def test_sub_set_subdag(self):
+        dag = DAG(
+            'test_sub_set_subdag',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        with dag:
+            op1 = DummyOperator(task_id='leave1')
+            op2 = DummyOperator(task_id='leave2')
+            op3 = DummyOperator(task_id='upstream_level_1')
+            op4 = DummyOperator(task_id='upstream_level_2')
+            op5 = DummyOperator(task_id='upstream_level_3')
+            # order randomly
+            op2.set_downstream(op3)
+            op1.set_downstream(op3)
+            op4.set_downstream(op5)
+            op3.set_downstream(op4)
+
+        dag.clear()
+        dr = dag.create_dagrun(run_id="test",
+                               state=State.SUCCESS,
+                               execution_date=DEFAULT_DATE,
+                               start_date=DEFAULT_DATE)
+
+        executor = TestExecutor(do_update=True)
+        sub_dag = dag.sub_dag(task_regex="leave*",
+                              include_downstream=False,
+                              include_upstream=False)
+        job = BackfillJob(dag=sub_dag,
+                          start_date=DEFAULT_DATE,
+                          end_date=DEFAULT_DATE,
+                          executor=executor)
+        job.run()
+
+        self.assertRaises(sqlalchemy.orm.exc.NoResultFound, dr.refresh_from_db)
+        # the run_id should have changed, so a refresh won't work
+        drs = DagRun.find(dag_id=dag.dag_id, execution_date=DEFAULT_DATE)
+        dr = drs[0]
+
+        self.assertEqual(BackfillJob.ID_FORMAT_PREFIX.format(DEFAULT_DATE.isoformat()),
+                         dr.run_id)
+        for ti in dr.get_task_instances():
+            if ti.task_id == 'leave1' or ti.task_id == 'leave2':
+                self.assertEqual(State.SUCCESS, ti.state)
+            else:
+                self.assertEqual(State.NONE, ti.state)
+
 
 class SchedulerJobTest(unittest.TestCase):
     # These defaults make the test faster to run
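(Aside, not part of the patch: the key assertions of the new test, restated. DEFAULT_DATE
comes from tests/jobs.py; the rest is illustrative only.)

    # The backfill re-used the pre-existing dag_run for that execution_date,
    # so the old run_id "test" is gone and dr.refresh_from_db() raises
    # sqlalchemy.orm.exc.NoResultFound. Re-fetching the run shows the
    # backfill-style run_id, and only the backfilled tasks reach SUCCESS.
    runs = DagRun.find(dag_id='test_sub_set_subdag', execution_date=DEFAULT_DATE)
    run = runs[0]
    assert run.run_id == BackfillJob.ID_FORMAT_PREFIX.format(DEFAULT_DATE.isoformat())
    states = {ti.task_id: ti.state for ti in run.get_task_instances()}
    assert states['leave1'] == State.SUCCESS and states['leave2'] == State.SUCCESS
    assert states['upstream_level_1'] is State.NONE   # left alone, not REMOVED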