airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-863] Example DAGs should have recent start dates
Date Sat, 18 Feb 2017 14:26:45 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-8-test 563cc9a3c -> 3658bf310


[AIRFLOW-863] Example DAGs should have recent start dates

Avoid unnecessary backfills by having start dates
of
just a few days ago. Adds a utility function
airflow.utils.dates.days_ago().

Closes #2068 from jlowin/example-start-date

(cherry picked from commit bbfd43df4663547abda4ac6fdc3a6ed730a75b57)
Signed-off-by: Bolke de Bruin <bolke@xs4all.nl>


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3658bf31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3658bf31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3658bf31

Branch: refs/heads/v1-8-test
Commit: 3658bf310811cd22651b6c20c5d50bfbd3153025
Parents: 563cc9a
Author: Jeremiah Lowin <jlowin@apache.org>
Authored: Sun Feb 12 15:37:56 2017 -0500
Committer: Bolke de Bruin <bolke@xs4all.nl>
Committed: Sat Feb 18 15:26:34 2017 +0100

----------------------------------------------------------------------
 .../example_emr_job_flow_automatic_steps.py     |  6 +--
 .../example_emr_job_flow_manual_steps.py        |  5 ++-
 .../example_dags/example_qubole_operator.py     |  6 +--
 .../contrib/example_dags/example_twitter_dag.py |  5 ++-
 airflow/example_dags/example_bash_operator.py   |  9 ++--
 airflow/example_dags/example_branch_operator.py |  8 ++--
 .../example_branch_python_dop_operator_3.py     |  5 +--
 airflow/example_dags/example_http_operator.py   |  7 ++-
 airflow/example_dags/example_latest_only.py     |  4 +-
 .../example_latest_only_with_trigger.py         |  4 +-
 .../example_passing_params_via_test_command.py  |  6 +--
 airflow/example_dags/example_python_operator.py |  7 +--
 .../example_short_circuit_operator.py           |  7 ++-
 airflow/example_dags/example_skip_dag.py        |  9 ++--
 airflow/example_dags/example_subdag_operator.py |  4 +-
 airflow/example_dags/example_xcom.py            |  9 ++--
 airflow/example_dags/test_utils.py              |  3 +-
 airflow/example_dags/tutorial.py                |  7 ++-
 airflow/utils/dates.py                          | 13 ++++++
 dags/test_dag.py                                |  2 +-
 scripts/perf/dags/perf_dag_1.py                 |  7 ++-
 scripts/perf/dags/perf_dag_2.py                 |  8 ++--
 tests/utils/__init__.py                         | 16 +++++++
 tests/utils/dates.py                            | 45 ++++++++++++++++++++
 24 files changed, 132 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py b/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py
index 18399c7..7f57ad1 100644
--- a/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py
+++ b/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py
@@ -12,8 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from datetime import timedelta, datetime
-
+from datetime import timedelta
+import airflow
 from airflow import DAG
 from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
 from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor
@@ -21,7 +21,7 @@ from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor
 DEFAULT_ARGS = {
     'owner': 'airflow',
     'depends_on_past': False,
-    'start_date': datetime(2016, 3, 13),
+    'start_date': airflow.utils.dates.days_ago(2),
     'email': ['airflow@airflow.com'],
     'email_on_failure': False,
     'email_on_retry': False

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py b/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py
index b498d50..caa6943 100644
--- a/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py
+++ b/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py
@@ -12,8 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from datetime import timedelta, datetime
+from datetime import timedelta
 
+import airflow
 from airflow import DAG
 from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
 from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
@@ -23,7 +24,7 @@ from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTermina
 DEFAULT_ARGS = {
     'owner': 'airflow',
     'depends_on_past': False,
-    'start_date': datetime(2016, 3, 13),
+    'start_date': airflow.utils.dates.days_ago(2),
     'email': ['airflow@airflow.com'],
     'email_on_failure': False,
     'email_on_retry': False

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/contrib/example_dags/example_qubole_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/example_dags/example_qubole_operator.py b/airflow/contrib/example_dags/example_qubole_operator.py
index b482cf4..fce0175 100644
--- a/airflow/contrib/example_dags/example_qubole_operator.py
+++ b/airflow/contrib/example_dags/example_qubole_operator.py
@@ -16,17 +16,15 @@ from airflow import DAG
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
 from airflow.contrib.operators.qubole_operator import QuboleOperator
-from datetime import datetime, timedelta
 import filecmp
 import random
 
-seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
-                                  datetime.min.time())
+
 
 default_args = {
     'owner': 'airflow',
     'depends_on_past': False,
-    'start_date': seven_days_ago,
+    'start_date': airflow.utils.dates.days_ago(2)
     'email': ['airflow@airflow.com'],
     'email_on_failure': False,
     'email_on_retry': False

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/contrib/example_dags/example_twitter_dag.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/example_dags/example_twitter_dag.py b/airflow/contrib/example_dags/example_twitter_dag.py
index d63b4e8..a25c8d0 100644
--- a/airflow/contrib/example_dags/example_twitter_dag.py
+++ b/airflow/contrib/example_dags/example_twitter_dag.py
@@ -22,11 +22,12 @@
 # Load The Dependencies
 # --------------------------------------------------------------------------------
 
+import airflow
 from airflow import DAG
 from airflow.operators.bash_operator import BashOperator
 from airflow.operators.python_operator import PythonOperator
 from airflow.operators.hive_operator import HiveOperator
-from datetime import datetime, date, timedelta
+from datetime import date, timedelta
 
 # --------------------------------------------------------------------------------
 # Create a few placeholder scripts. In practice these would be different python
@@ -57,7 +58,7 @@ def transfertodb():
 default_args = {
     'owner': 'Ekhtiar',
     'depends_on_past': False,
-    'start_date': datetime(2016, 3, 13),
+    'start_date': airflow.utils.dates.days_ago(5),
     'email': ['airflow@airflow.com'],
     'email_on_failure': False,
     'email_on_retry': False,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/example_dags/example_bash_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py
index 0d18bcf..6887fa9 100644
--- a/airflow/example_dags/example_bash_operator.py
+++ b/airflow/example_dags/example_bash_operator.py
@@ -11,17 +11,18 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+import airflow
 from builtins import range
 from airflow.operators.bash_operator import BashOperator
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.models import DAG
-from datetime import datetime, timedelta
+from datetime import timedelta
+
 
-seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
-                                  datetime.min.time())
 args = {
     'owner': 'airflow',
-    'start_date': seven_days_ago,
+    'start_date': airflow.utils.dates.days_ago(2)
 }
 
 dag = DAG(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/example_dags/example_branch_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py
index cc559d0..2b11d91 100644
--- a/airflow/example_dags/example_branch_operator.py
+++ b/airflow/example_dags/example_branch_operator.py
@@ -11,17 +11,17 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+import airflow
 from airflow.operators.python_operator import BranchPythonOperator
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.models import DAG
-from datetime import datetime, timedelta
 import random
 
-seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
-                                  datetime.min.time())
+
 args = {
     'owner': 'airflow',
-    'start_date': seven_days_ago,
+    'start_date': airflow.utils.dates.days_ago(2)
 }
 
 dag = DAG(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/example_dags/example_branch_python_dop_operator_3.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_branch_python_dop_operator_3.py b/airflow/example_dags/example_branch_python_dop_operator_3.py
index 1dd190e..6da7b68 100644
--- a/airflow/example_dags/example_branch_python_dop_operator_3.py
+++ b/airflow/example_dags/example_branch_python_dop_operator_3.py
@@ -13,16 +13,15 @@
 # limitations under the License.
 #
 
+import airflow
 from airflow.operators.python_operator import BranchPythonOperator
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.models import DAG
 from datetime import datetime, timedelta
 
-two_days_ago = datetime.combine(datetime.today() - timedelta(2),
-                                  datetime.min.time())
 args = {
     'owner': 'airflow',
-    'start_date': two_days_ago,
+    'start_date': airflow.utils.dates.days_ago(2),
     'depends_on_past': True,
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/example_dags/example_http_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_http_operator.py b/airflow/example_dags/example_http_operator.py
index 18a67f5..0cc23b9 100644
--- a/airflow/example_dags/example_http_operator.py
+++ b/airflow/example_dags/example_http_operator.py
@@ -14,19 +14,18 @@
 """
 ### Example HTTP operator and sensor
 """
+import airflow
 from airflow import DAG
 from airflow.operators.http_operator import SimpleHttpOperator
 from airflow.operators.sensors import HttpSensor
-from datetime import datetime, timedelta
+from datetime import timedelta
 import json
 
-seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
-                                  datetime.min.time())
 
 default_args = {
     'owner': 'airflow',
     'depends_on_past': False,
-    'start_date': seven_days_ago,
+    'start_date': airflow.utils.dates.days_ago(2),
     'email': ['airflow@airflow.com'],
     'email_on_failure': False,
     'email_on_retry': False,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/example_dags/example_latest_only.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_latest_only.py b/airflow/example_dags/example_latest_only.py
index 9ce03b9..38ee900 100644
--- a/airflow/example_dags/example_latest_only.py
+++ b/airflow/example_dags/example_latest_only.py
@@ -16,16 +16,16 @@ Example of the LatestOnlyOperator
 """
 import datetime as dt
 
+import airflow
 from airflow.models import DAG
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.latest_only_operator import LatestOnlyOperator
 from airflow.utils.trigger_rule import TriggerRule
 
-
 dag = DAG(
     dag_id='latest_only',
     schedule_interval=dt.timedelta(hours=4),
-    start_date=dt.datetime(2016, 9, 20),
+    start_date=airflow.utils.dates.days_ago(2),
 )
 
 latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/example_dags/example_latest_only_with_trigger.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_latest_only_with_trigger.py b/airflow/example_dags/example_latest_only_with_trigger.py
index e3a88b7..f2afdcf 100644
--- a/airflow/example_dags/example_latest_only_with_trigger.py
+++ b/airflow/example_dags/example_latest_only_with_trigger.py
@@ -16,16 +16,16 @@ Example LatestOnlyOperator and TriggerRule interactions
 """
 import datetime as dt
 
+import airflow
 from airflow.models import DAG
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.latest_only_operator import LatestOnlyOperator
 from airflow.utils.trigger_rule import TriggerRule
 
-
 dag = DAG(
     dag_id='latest_only_with_trigger',
     schedule_interval=dt.timedelta(hours=4),
-    start_date=dt.datetime(2016, 9, 20),
+    start_date=airflow.utils.dates.days_ago(2),
 )
 
 latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/example_dags/example_passing_params_via_test_command.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_passing_params_via_test_command.py b/airflow/example_dags/example_passing_params_via_test_command.py
index e337f3b..448effb 100644
--- a/airflow/example_dags/example_passing_params_via_test_command.py
+++ b/airflow/example_dags/example_passing_params_via_test_command.py
@@ -13,15 +13,15 @@
 # limitations under the License.
 #
 
-from datetime import datetime, timedelta
-
+from datetime import timedelta
+import airflow
 from airflow import DAG
 from airflow.operators.bash_operator import BashOperator
 from airflow.operators.python_operator import PythonOperator
 
 dag = DAG("example_passing_params_via_test_command",
           default_args={"owner": "airflow",
-                        "start_date":datetime.now()},
+                        "start_date": airflow.utils.dates.days_ago(1)},
           schedule_interval='*/1 * * * *',
           dagrun_timeout=timedelta(minutes=4)
           )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/example_dags/example_python_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py
index c5d7193..8108e1e 100644
--- a/airflow/example_dags/example_python_operator.py
+++ b/airflow/example_dags/example_python_operator.py
@@ -13,19 +13,16 @@
 # limitations under the License.
 from __future__ import print_function
 from builtins import range
+import airflow
 from airflow.operators.python_operator import PythonOperator
 from airflow.models import DAG
-from datetime import datetime, timedelta
 
 import time
 from pprint import pprint
 
-seven_days_ago = datetime.combine(
-        datetime.today() - timedelta(7), datetime.min.time())
-
 args = {
     'owner': 'airflow',
-    'start_date': seven_days_ago,
+    'start_date': airflow.utils.dates.days_ago(2)
 }
 
 dag = DAG(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/example_dags/example_short_circuit_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_short_circuit_operator.py b/airflow/example_dags/example_short_circuit_operator.py
index 92efe99..c9812ac 100644
--- a/airflow/example_dags/example_short_circuit_operator.py
+++ b/airflow/example_dags/example_short_circuit_operator.py
@@ -11,17 +11,16 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import airflow
 from airflow.operators.python_operator import ShortCircuitOperator
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.models import DAG
 import airflow.utils.helpers
-from datetime import datetime, timedelta
 
-seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
-                                  datetime.min.time())
+
 args = {
     'owner': 'airflow',
-    'start_date': seven_days_ago,
+    'start_date': airflow.utils.dates.days_ago(2)
 }
 
 dag = DAG(dag_id='example_short_circuit_operator', default_args=args)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/example_dags/example_skip_dag.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_skip_dag.py b/airflow/example_dags/example_skip_dag.py
index a38b126..b936020 100644
--- a/airflow/example_dags/example_skip_dag.py
+++ b/airflow/example_dags/example_skip_dag.py
@@ -12,16 +12,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import airflow
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.models import DAG
-from datetime import datetime, timedelta
 from airflow.exceptions import AirflowSkipException
 
-seven_days_ago = datetime.combine(datetime.today() - timedelta(1),
-                                  datetime.min.time())
+
 args = {
     'owner': 'airflow',
-    'start_date': seven_days_ago,
+    'start_date': airflow.utils.dates.days_ago(2)
 }
 
 
@@ -53,5 +52,3 @@ def create_test_pipeline(suffix, trigger_rule, dag):
 
 create_test_pipeline('1', 'all_success', dag)
 create_test_pipeline('2', 'one_success', dag)
-
-

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/example_dags/example_subdag_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_subdag_operator.py b/airflow/example_dags/example_subdag_operator.py
index b872f43..0c11787 100644
--- a/airflow/example_dags/example_subdag_operator.py
+++ b/airflow/example_dags/example_subdag_operator.py
@@ -11,7 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from datetime import datetime
+import airflow
 
 from airflow.models import DAG
 from airflow.operators.dummy_operator import DummyOperator
@@ -24,7 +24,7 @@ DAG_NAME = 'example_subdag_operator'
 
 args = {
     'owner': 'airflow',
-    'start_date': datetime(2016, 1, 1),
+    'start_date': airflow.utils.dates.days_ago(2),
 }
 
 dag = DAG(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/example_dags/example_xcom.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_xcom.py b/airflow/example_dags/example_xcom.py
index 50728c3..b41421b 100644
--- a/airflow/example_dags/example_xcom.py
+++ b/airflow/example_dags/example_xcom.py
@@ -12,22 +12,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 from __future__ import print_function
+import airflow
 from airflow import DAG
 from airflow.operators.python_operator import PythonOperator
-from datetime import datetime, timedelta
 
-seven_days_ago = datetime.combine(
-    datetime.today() - timedelta(7),
-    datetime.min.time())
 args = {
     'owner': 'airflow',
-    'start_date': seven_days_ago,
+    'start_date': airflow.utils.dates.days_ago(2),
     'provide_context': True
 }
 
 dag = DAG(
     'example_xcom',
-    start_date=datetime(2015, 1, 1),
     schedule_interval="@once",
     default_args=args)
 
@@ -60,6 +56,7 @@ def puller(**kwargs):
     v1, v2 = ti.xcom_pull(key=None, task_ids=['push', 'push_by_returning'])
     assert (v1, v2) == (value_1, value_2)
 
+
 push1 = PythonOperator(
     task_id='push', dag=dag, python_callable=push)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/example_dags/test_utils.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/test_utils.py b/airflow/example_dags/test_utils.py
index 70391c3..0ed9bdb 100644
--- a/airflow/example_dags/test_utils.py
+++ b/airflow/example_dags/test_utils.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 """Used for unit tests"""
+import airflow
 from airflow.operators.bash_operator import BashOperator
 from airflow.models import DAG
 from datetime import datetime
@@ -25,5 +26,5 @@ task = BashOperator(
     task_id='sleeps_forever',
     dag=dag,
     bash_command="sleep 10000000000",
-    start_date=datetime(2016, 1, 1),
+    start_date=airflow.utils.dates.days_ago(2),
     owner='airflow')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/example_dags/tutorial.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/tutorial.py b/airflow/example_dags/tutorial.py
index c7b2e0f..6ede09a 100644
--- a/airflow/example_dags/tutorial.py
+++ b/airflow/example_dags/tutorial.py
@@ -17,19 +17,18 @@
 Documentation that goes along with the Airflow tutorial located
 [here](http://pythonhosted.org/airflow/tutorial.html)
 """
+import airflow
 from airflow import DAG
 from airflow.operators.bash_operator import BashOperator
-from datetime import datetime, timedelta
+from datetime import timedelta
 
-seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
-                                  datetime.min.time())
 
 # these args will get passed on to each operator
 # you can override them on a per-task basis during operator initialization
 default_args = {
     'owner': 'airflow',
     'depends_on_past': False,
-    'start_date': seven_days_ago,
+    'start_date': airflow.utils.dates.days_ago(2),
     'email': ['airflow@airflow.com'],
     'email_on_failure': False,
     'email_on_retry': False,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/utils/dates.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py
index 84fd791..f89b20c 100644
--- a/airflow/utils/dates.py
+++ b/airflow/utils/dates.py
@@ -212,3 +212,16 @@ def scale_time_units(time_seconds_arr, unit):
     elif unit == 'days':
         return list(map(lambda x: x*1.0/(24*60*60), time_seconds_arr))
     return time_seconds_arr
+
+
+def days_ago(n, hour=0, minute=0, second=0, microsecond=0):
+    """
+    Get a datetime object representing `n` days ago. By default the time is
+    set to midnight.
+    """
+    today = datetime.today().replace(
+        hour=hour,
+        minute=minute,
+        second=second,
+        microsecond=microsecond)
+    return today - timedelta(days=n)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/dags/test_dag.py
----------------------------------------------------------------------
diff --git a/dags/test_dag.py b/dags/test_dag.py
index a1cbb74..db0b648 100644
--- a/dags/test_dag.py
+++ b/dags/test_dag.py
@@ -24,7 +24,7 @@ DAG_NAME = 'test_dag_v1'
 default_args = {
     'owner': 'airflow',
     'depends_on_past': True,
-    'start_date': START_DATE,
+    'start_date': airflow.utils.dates.days_ago(2)
 }
 dag = DAG(DAG_NAME, schedule_interval='*/10 * * * *', default_args=default_args)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/scripts/perf/dags/perf_dag_1.py
----------------------------------------------------------------------
diff --git a/scripts/perf/dags/perf_dag_1.py b/scripts/perf/dags/perf_dag_1.py
index d97c830..fe71303 100644
--- a/scripts/perf/dags/perf_dag_1.py
+++ b/scripts/perf/dags/perf_dag_1.py
@@ -11,15 +11,14 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import airflow
 from airflow.operators.bash_operator import BashOperator
 from airflow.models import DAG
-from datetime import datetime, timedelta
+from datetime import timedelta
 
-five_days_ago = datetime.combine(datetime.today() - timedelta(5),
-                                 datetime.min.time())
 args = {
     'owner': 'airflow',
-    'start_date': five_days_ago,
+    'start_date': airflow.utils.dates.days_ago(3),
 }
 
 dag = DAG(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/scripts/perf/dags/perf_dag_2.py
----------------------------------------------------------------------
diff --git a/scripts/perf/dags/perf_dag_2.py b/scripts/perf/dags/perf_dag_2.py
index cccd547..16948d4 100644
--- a/scripts/perf/dags/perf_dag_2.py
+++ b/scripts/perf/dags/perf_dag_2.py
@@ -11,15 +11,15 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+import airflow
 from airflow.operators.bash_operator import BashOperator
 from airflow.models import DAG
-from datetime import datetime, timedelta
+from datetime import timedelta
 
-five_days_ago = datetime.combine(datetime.today() - timedelta(5),
-                                 datetime.min.time())
 args = {
     'owner': 'airflow',
-    'start_date': five_days_ago,
+    'start_date': airflow.utils.dates.days_ago(3),
 }
 
 dag = DAG(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/tests/utils/__init__.py
----------------------------------------------------------------------
diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py
new file mode 100644
index 0000000..6b15998
--- /dev/null
+++ b/tests/utils/__init__.py
@@ -0,0 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .compression import *
+from .dates import *

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/tests/utils/dates.py
----------------------------------------------------------------------
diff --git a/tests/utils/dates.py b/tests/utils/dates.py
new file mode 100644
index 0000000..dc0c87e
--- /dev/null
+++ b/tests/utils/dates.py
@@ -0,0 +1,45 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import datetime, timedelta
+import unittest
+
+from airflow.utils import dates
+
+class Dates(unittest.TestCase):
+
+    def test_days_ago(self):
+        today = datetime.today()
+        today_midnight = datetime.fromordinal(today.date().toordinal())
+
+        self.assertTrue(dates.days_ago(0) == today_midnight)
+
+        self.assertTrue(
+            dates.days_ago(100) == today_midnight + timedelta(days=-100))
+
+        self.assertTrue(
+            dates.days_ago(0, hour=3) == today_midnight + timedelta(hours=3))
+        self.assertTrue(
+            dates.days_ago(0, minute=3)
+            == today_midnight + timedelta(minutes=3))
+        self.assertTrue(
+            dates.days_ago(0, second=3)
+            == today_midnight + timedelta(seconds=3))
+        self.assertTrue(
+            dates.days_ago(0, microsecond=3)
+            == today_midnight + timedelta(microseconds=3))
+
+
+if __name__ == '__main__':
+    unittest.main()


Mime
View raw message