airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] BasPH closed pull request #4071: [AIRFLOW-3237] Refactor example DAGs
Date Fri, 26 Oct 2018 07:19:59 GMT
BasPH closed pull request #4071: [AIRFLOW-3237] Refactor example DAGs
URL: https://github.com/apache/incubator-airflow/pull/4071
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub no longer shows its
diff once it has been merged, so the full diff is reproduced below:

diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py
index a5ff651d6a..68accc6317 100644
--- a/airflow/example_dags/example_bash_operator.py
+++ b/airflow/example_dags/example_bash_operator.py
@@ -17,48 +17,57 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import airflow
 from builtins import range
-from airflow.operators.bash_operator import BashOperator
-from airflow.operators.dummy_operator import DummyOperator
-from airflow.models import DAG
 from datetime import timedelta
 
+import airflow
+from airflow.models import DAG
+from airflow.operators.bash_operator import BashOperator
+from airflow.operators.dummy_operator import DummyOperator
 
 args = {
     'owner': 'airflow',
-    'start_date': airflow.utils.dates.days_ago(2)
+    'start_date': airflow.utils.dates.days_ago(2),
 }
 
 dag = DAG(
-    dag_id='example_bash_operator', default_args=args,
+    dag_id='example_bash_operator',
+    default_args=args,
     schedule_interval='0 0 * * *',
-    dagrun_timeout=timedelta(minutes=60))
+    dagrun_timeout=timedelta(minutes=60),
+)
 
-cmd = 'ls -l'
-run_this_last = DummyOperator(task_id='run_this_last', dag=dag)
+run_this_last = DummyOperator(
+    task_id='run_this_last',
+    dag=dag,
+)
 
 # [START howto_operator_bash]
 run_this = BashOperator(
-    task_id='run_after_loop', bash_command='echo 1', dag=dag)
+    task_id='run_after_loop',
+    bash_command='echo 1',
+    dag=dag,
+)
 # [END howto_operator_bash]
-run_this.set_downstream(run_this_last)
+
+run_this >> run_this_last
 
 for i in range(3):
-    i = str(i)
     task = BashOperator(
-        task_id='runme_' + i,
+        task_id='runme_' + str(i),
         bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
-        dag=dag)
-    task.set_downstream(run_this)
+        dag=dag,
+    )
+    task >> run_this
 
 # [START howto_operator_bash_template]
-task = BashOperator(
+also_run_this = BashOperator(
     task_id='also_run_this',
     bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
-    dag=dag)
+    dag=dag,
+)
 # [END howto_operator_bash_template]
-task.set_downstream(run_this_last)
+also_run_this >> run_this_last
 
 if __name__ == "__main__":
     dag.cli()
diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py
index 45bf11f301..197d7d7a73 100644
--- a/airflow/example_dags/example_branch_operator.py
+++ b/airflow/example_dags/example_branch_operator.py
@@ -17,43 +17,53 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import airflow
-from airflow.operators.python_operator import BranchPythonOperator
-from airflow.operators.dummy_operator import DummyOperator
-from airflow.models import DAG
 import random
 
+import airflow
+from airflow.models import DAG
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.python_operator import BranchPythonOperator
 
 args = {
     'owner': 'airflow',
-    'start_date': airflow.utils.dates.days_ago(2)
+    'start_date': airflow.utils.dates.days_ago(2),
 }
 
 dag = DAG(
     dag_id='example_branch_operator',
     default_args=args,
-    schedule_interval="@daily")
+    schedule_interval="@daily",
+)
 
-cmd = 'ls -l'
-run_this_first = DummyOperator(task_id='run_this_first', dag=dag)
+run_this_first = DummyOperator(
+    task_id='run_this_first',
+    dag=dag,
+)
 
 options = ['branch_a', 'branch_b', 'branch_c', 'branch_d']
 
 branching = BranchPythonOperator(
     task_id='branching',
     python_callable=lambda: random.choice(options),
-    dag=dag)
-branching.set_upstream(run_this_first)
+    dag=dag,
+)
+run_this_first >> branching
 
 join = DummyOperator(
     task_id='join',
     trigger_rule='one_success',
-    dag=dag
+    dag=dag,
 )
 
 for option in options:
-    t = DummyOperator(task_id=option, dag=dag)
-    t.set_upstream(branching)
-    dummy_follow = DummyOperator(task_id='follow_' + option, dag=dag)
-    t.set_downstream(dummy_follow)
-    dummy_follow.set_downstream(join)
+    t = DummyOperator(
+        task_id=option,
+        dag=dag,
+    )
+
+    dummy_follow = DummyOperator(
+        task_id='follow_' + option,
+        dag=dag,
+    )
+
+    branching >> t >> dummy_follow >> join
diff --git a/airflow/example_dags/example_branch_python_dop_operator_3.py b/airflow/example_dags/example_branch_python_dop_operator_3.py
index 7be55a5f36..950d5c1ae2 100644
--- a/airflow/example_dags/example_branch_python_dop_operator_3.py
+++ b/airflow/example_dags/example_branch_python_dop_operator_3.py
@@ -18,9 +18,9 @@
 # under the License.
 
 import airflow
-from airflow.operators.python_operator import BranchPythonOperator
-from airflow.operators.dummy_operator import DummyOperator
 from airflow.models import DAG
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.python_operator import BranchPythonOperator
 
 args = {
     'owner': 'airflow',
@@ -31,12 +31,14 @@
 # BranchPython operator that depends on past
 # and where tasks may run or be skipped on
 # alternating runs
-dag = DAG(dag_id='example_branch_dop_operator_v3',
-          schedule_interval='*/1 * * * *', default_args=args)
-
+dag = DAG(
+    dag_id='example_branch_dop_operator_v3',
+    schedule_interval='*/1 * * * *',
+    default_args=args,
+)
 
-def should_run(ds, **kwargs):
 
+def should_run(**kwargs):
     print('------------- exec dttm = {} and minute = {}'.
           format(kwargs['execution_date'], kwargs['execution_date'].minute))
     if kwargs['execution_date'].minute % 2 == 0:
@@ -49,14 +51,9 @@ def should_run(ds, **kwargs):
     task_id='condition',
     provide_context=True,
     python_callable=should_run,
-    dag=dag)
-
-oper_1 = DummyOperator(
-    task_id='oper_1',
-    dag=dag)
-oper_1.set_upstream(cond)
+    dag=dag,
+)
 
-oper_2 = DummyOperator(
-    task_id='oper_2',
-    dag=dag)
-oper_2.set_upstream(cond)
+dummy_task_1 = DummyOperator(task_id='dummy_task_1', dag=dag)
+dummy_task_2 = DummyOperator(task_id='dummy_task_2', dag=dag)
+cond >> [dummy_task_1, dummy_task_2]
diff --git a/airflow/example_dags/example_http_operator.py b/airflow/example_dags/example_http_operator.py
index da7ea3f218..eca528e899 100644
--- a/airflow/example_dags/example_http_operator.py
+++ b/airflow/example_dags/example_http_operator.py
@@ -49,14 +49,16 @@
     data=json.dumps({"priority": 5}),
     headers={"Content-Type": "application/json"},
     response_check=lambda response: True if len(response.json()) == 0 else False,
-    dag=dag)
+    dag=dag,
+)
 
 t5 = SimpleHttpOperator(
     task_id='post_op_formenc',
     endpoint='nodes/url',
     data="name=Joe",
     headers={"Content-Type": "application/x-www-form-urlencoded"},
-    dag=dag)
+    dag=dag,
+)
 
 t2 = SimpleHttpOperator(
     task_id='get_op',
@@ -64,7 +66,8 @@
     endpoint='api/v1.0/nodes',
     data={"param1": "value1", "param2": "value2"},
     headers={},
-    dag=dag)
+    dag=dag,
+)
 
 t3 = SimpleHttpOperator(
     task_id='put_op',
@@ -72,7 +75,8 @@
     endpoint='api/v1.0/nodes',
     data=json.dumps({"priority": 5}),
     headers={"Content-Type": "application/json"},
-    dag=dag)
+    dag=dag,
+)
 
 t4 = SimpleHttpOperator(
     task_id='del_op',
@@ -80,7 +84,8 @@
     endpoint='api/v1.0/nodes',
     data="some=data",
     headers={"Content-Type": "application/x-www-form-urlencoded"},
-    dag=dag)
+    dag=dag,
+)
 
 sensor = HttpSensor(
     task_id='http_sensor_check',
@@ -89,10 +94,7 @@
     request_params={},
     response_check=lambda response: True if "Google" in response.content else False,
     poke_interval=5,
-    dag=dag)
+    dag=dag,
+)
 
-t1.set_upstream(sensor)
-t2.set_upstream(t1)
-t3.set_upstream(t2)
-t4.set_upstream(t3)
-t5.set_upstream(t4)
+sensor >> t1 >> t2 >> t3 >> t4 >> t5
diff --git a/airflow/example_dags/example_latest_only.py b/airflow/example_dags/example_latest_only.py
index fdb2dca490..635a764198 100644
--- a/airflow/example_dags/example_latest_only.py
+++ b/airflow/example_dags/example_latest_only.py
@@ -33,6 +33,6 @@
 )
 
 latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
-
 task1 = DummyOperator(task_id='task1', dag=dag)
-task1.set_upstream(latest_only)
+
+latest_only >> task1
diff --git a/airflow/example_dags/example_latest_only_with_trigger.py b/airflow/example_dags/example_latest_only_with_trigger.py
index b8f4811c1a..3559afb0c8 100644
--- a/airflow/example_dags/example_latest_only_with_trigger.py
+++ b/airflow/example_dags/example_latest_only_with_trigger.py
@@ -34,15 +34,10 @@
 )
 
 latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
-
 task1 = DummyOperator(task_id='task1', dag=dag)
-task1.set_upstream(latest_only)
-
 task2 = DummyOperator(task_id='task2', dag=dag)
-
 task3 = DummyOperator(task_id='task3', dag=dag)
-task3.set_upstream([task1, task2])
+task4 = DummyOperator(task_id='task4', dag=dag, trigger_rule=TriggerRule.ALL_DONE)
 
-task4 = DummyOperator(task_id='task4', dag=dag,
-                      trigger_rule=TriggerRule.ALL_DONE)
-task4.set_upstream([task1, task2])
+latest_only >> task1 >> [task3, task4]
+task2 >> [task3, task4]
diff --git a/airflow/example_dags/example_passing_params_via_test_command.py b/airflow/example_dags/example_passing_params_via_test_command.py
index 7efca2f3b0..2aef593abf 100644
--- a/airflow/example_dags/example_passing_params_via_test_command.py
+++ b/airflow/example_dags/example_passing_params_via_test_command.py
@@ -18,18 +18,21 @@
 # under the License.
 
 from datetime import timedelta
+
 import airflow
 from airflow import DAG
 from airflow.operators.bash_operator import BashOperator
 from airflow.operators.python_operator import PythonOperator
 
-
-dag = DAG("example_passing_params_via_test_command",
-          default_args={"owner": "airflow",
-                        "start_date": airflow.utils.dates.days_ago(1)},
-          schedule_interval='*/1 * * * *',
-          dagrun_timeout=timedelta(minutes=4)
-          )
+dag = DAG(
+    "example_passing_params_via_test_command",
+    default_args={
+        "owner": "airflow",
+        "start_date": airflow.utils.dates.days_ago(1),
+    },
+    schedule_interval='*/1 * * * *',
+    dagrun_timeout=timedelta(minutes=4),
+)
 
 
 def my_py_command(ds, **kwargs):
@@ -54,12 +57,14 @@ def my_py_command(ds, **kwargs):
     provide_context=True,
     python_callable=my_py_command,
     params={"miff": "agg"},
-    dag=dag)
-
+    dag=dag,
+)
 
 also_run_this = BashOperator(
     task_id='also_run_this',
     bash_command=my_templated_command,
     params={"miff": "agg"},
-    dag=dag)
-also_run_this.set_upstream(run_this)
+    dag=dag,
+)
+
+run_this >> also_run_this
diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py
index 6c4874fc87..254fb3e26c 100644
--- a/airflow/example_dags/example_python_operator.py
+++ b/airflow/example_dags/example_python_operator.py
@@ -18,23 +18,25 @@
 # under the License.
 
 from __future__ import print_function
-from builtins import range
-import airflow
-from airflow.operators.python_operator import PythonOperator
-from airflow.models import DAG
 
 import time
+from builtins import range
 from pprint import pprint
 
+import airflow
+from airflow.models import DAG
+from airflow.operators.python_operator import PythonOperator
 
 args = {
     'owner': 'airflow',
-    'start_date': airflow.utils.dates.days_ago(2)
+    'start_date': airflow.utils.dates.days_ago(2),
 }
 
 dag = DAG(
-    dag_id='example_python_operator', default_args=args,
-    schedule_interval=None)
+    dag_id='example_python_operator',
+    default_args=args,
+    schedule_interval=None,
+)
 
 
 # [START howto_operator_python]
@@ -48,7 +50,8 @@ def print_context(ds, **kwargs):
     task_id='print_the_context',
     provide_context=True,
     python_callable=print_context,
-    dag=dag)
+    dag=dag,
+)
 # [END howto_operator_python]
 
 
@@ -64,7 +67,8 @@ def my_sleeping_function(random_base):
         task_id='sleep_for_' + str(i),
         python_callable=my_sleeping_function,
         op_kwargs={'random_base': float(i) / 10},
-        dag=dag)
+        dag=dag,
+    )
 
-    task.set_upstream(run_this)
+    run_this >> task
 # [END howto_operator_python_kwargs]
diff --git a/airflow/example_dags/example_short_circuit_operator.py b/airflow/example_dags/example_short_circuit_operator.py
index 8e9565df91..1093dab616 100644
--- a/airflow/example_dags/example_short_circuit_operator.py
+++ b/airflow/example_dags/example_short_circuit_operator.py
@@ -17,25 +17,29 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import airflow
-from airflow.operators.python_operator import ShortCircuitOperator
-from airflow.operators.dummy_operator import DummyOperator
-from airflow.models import DAG
 import airflow.utils.helpers
-
+from airflow.models import DAG
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.python_operator import ShortCircuitOperator
 
 args = {
     'owner': 'airflow',
-    'start_date': airflow.utils.dates.days_ago(2)
+    'start_date': airflow.utils.dates.days_ago(2),
 }
 
 dag = DAG(dag_id='example_short_circuit_operator', default_args=args)
 
 cond_true = ShortCircuitOperator(
-    task_id='condition_is_True', python_callable=lambda: True, dag=dag)
+    task_id='condition_is_True',
+    python_callable=lambda: True,
+    dag=dag,
+)
 
 cond_false = ShortCircuitOperator(
-    task_id='condition_is_False', python_callable=lambda: False, dag=dag)
+    task_id='condition_is_False',
+    python_callable=lambda: False,
+    dag=dag,
+)
 
 ds_true = [DummyOperator(task_id='true_' + str(i), dag=dag) for i in [1, 2]]
 ds_false = [DummyOperator(task_id='false_' + str(i), dag=dag) for i in [1, 2]]
diff --git a/airflow/example_dags/example_skip_dag.py b/airflow/example_dags/example_skip_dag.py
index f11ca59338..456eb911dc 100644
--- a/airflow/example_dags/example_skip_dag.py
+++ b/airflow/example_dags/example_skip_dag.py
@@ -18,14 +18,13 @@
 # under the License.
 
 import airflow
-from airflow.operators.dummy_operator import DummyOperator
-from airflow.models import DAG
 from airflow.exceptions import AirflowSkipException
-
+from airflow.models import DAG
+from airflow.operators.dummy_operator import DummyOperator
 
 args = {
     'owner': 'airflow',
-    'start_date': airflow.utils.dates.days_ago(2)
+    'start_date': airflow.utils.dates.days_ago(2),
 }
 
 
@@ -37,23 +36,17 @@ def execute(self, context):
         raise AirflowSkipException
 
 
-dag = DAG(dag_id='example_skip_dag', default_args=args)
-
-
 def create_test_pipeline(suffix, trigger_rule, dag):
-
     skip_operator = DummySkipOperator(task_id='skip_operator_{}'.format(suffix), dag=dag)
-
     always_true = DummyOperator(task_id='always_true_{}'.format(suffix), dag=dag)
-
     join = DummyOperator(task_id=trigger_rule, dag=dag, trigger_rule=trigger_rule)
-
-    join.set_upstream(skip_operator)
-    join.set_upstream(always_true)
-
     final = DummyOperator(task_id='final_{}'.format(suffix), dag=dag)
-    final.set_upstream(join)
+
+    skip_operator >> join
+    always_true >> join
+    join >> final
 
 
+dag = DAG(dag_id='example_skip_dag', default_args=args)
 create_test_pipeline('1', 'all_success', dag)
 create_test_pipeline('2', 'one_success', dag)
diff --git a/airflow/example_dags/example_subdag_operator.py b/airflow/example_dags/example_subdag_operator.py
index ffd254b19a..98386ba454 100644
--- a/airflow/example_dags/example_subdag_operator.py
+++ b/airflow/example_dags/example_subdag_operator.py
@@ -18,14 +18,11 @@
 # under the License.
 
 import airflow
-
+from airflow.example_dags.subdags.subdag import subdag
 from airflow.models import DAG
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.subdag_operator import SubDagOperator
 
-from airflow.example_dags.subdags.subdag import subdag
-
-
 DAG_NAME = 'example_subdag_operator'
 
 args = {
@@ -71,7 +68,4 @@
     dag=dag,
 )
 
-start.set_downstream(section_1)
-section_1.set_downstream(some_other_task)
-some_other_task.set_downstream(section_2)
-section_2.set_downstream(end)
+start >> section_1 >> some_other_task >> section_2 >> end
diff --git a/airflow/example_dags/example_trigger_controller_dag.py b/airflow/example_dags/example_trigger_controller_dag.py
index f5c7218239..35e7184f76 100644
--- a/airflow/example_dags/example_trigger_controller_dag.py
+++ b/airflow/example_dags/example_trigger_controller_dag.py
@@ -33,11 +33,11 @@
 2. A Target DAG : c.f. example_trigger_target_dag.py
 """
 
-from airflow import DAG
-from airflow.operators.dagrun_operator import TriggerDagRunOperator
+import pprint
 from datetime import datetime
 
-import pprint
+from airflow import DAG
+from airflow.operators.dagrun_operator import TriggerDagRunOperator
 
 pp = pprint.PrettyPrinter(indent=4)
 
@@ -53,16 +53,20 @@ def conditionally_trigger(context, dag_run_obj):
 
 
 # Define the DAG
-dag = DAG(dag_id='example_trigger_controller_dag',
-          default_args={"owner": "airflow",
-                        "start_date": datetime.utcnow()},
-          schedule_interval='@once')
-
+dag = DAG(
+    dag_id='example_trigger_controller_dag',
+    default_args={
+        "owner": "airflow",
+        "start_date": datetime.utcnow(),
+    },
+    schedule_interval='@once',
+)
 
 # Define the single task in this controller example DAG
-trigger = TriggerDagRunOperator(task_id='test_trigger_dagrun',
-                                trigger_dag_id="example_trigger_target_dag",
-                                python_callable=conditionally_trigger,
-                                params={'condition_param': True,
-                                        'message': 'Hello World'},
-                                dag=dag)
+trigger = TriggerDagRunOperator(
+    task_id='test_trigger_dagrun',
+    trigger_dag_id="example_trigger_target_dag",
+    python_callable=conditionally_trigger,
+    params={'condition_param': True, 'message': 'Hello World'},
+    dag=dag,
+)
diff --git a/airflow/example_dags/example_trigger_target_dag.py b/airflow/example_dags/example_trigger_target_dag.py
index 7a656f2859..c1403a60e1 100644
--- a/airflow/example_dags/example_trigger_target_dag.py
+++ b/airflow/example_dags/example_trigger_target_dag.py
@@ -17,12 +17,13 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import pprint
+from datetime import datetime
+
+from airflow.models import DAG
 from airflow.operators.bash_operator import BashOperator
 from airflow.operators.python_operator import PythonOperator
-from airflow.models import DAG
-from datetime import datetime
 
-import pprint
 pp = pprint.PrettyPrinter(indent=4)
 
 # This example illustrates the use of the TriggerDagRunOperator. There are 2
@@ -50,7 +51,8 @@
 dag = DAG(
     dag_id='example_trigger_target_dag',
     default_args=args,
-    schedule_interval=None)
+    schedule_interval=None,
+)
 
 
 def run_this_func(ds, **kwargs):
@@ -62,12 +64,13 @@ def run_this_func(ds, **kwargs):
     task_id='run_this',
     provide_context=True,
     python_callable=run_this_func,
-    dag=dag)
-
+    dag=dag,
+)
 
 # You can also access the DagRun object in templates
 bash_task = BashOperator(
     task_id="bash_task",
     bash_command='echo "Here is the message: '
                  '{{ dag_run.conf["message"] if dag_run else "" }}" ',
-    dag=dag)
+    dag=dag,
+)
diff --git a/airflow/example_dags/example_xcom.py b/airflow/example_dags/example_xcom.py
index 66bec9a780..f2b7627aca 100644
--- a/airflow/example_dags/example_xcom.py
+++ b/airflow/example_dags/example_xcom.py
@@ -17,6 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
 from __future__ import print_function
+
 import airflow
 from airflow import DAG
 from airflow.operators.python_operator import PythonOperator
@@ -24,25 +25,22 @@
 args = {
     'owner': 'airflow',
     'start_date': airflow.utils.dates.days_ago(2),
-    'provide_context': True
+    'provide_context': True,
 }
 
-dag = DAG(
-    'example_xcom',
-    schedule_interval="@once",
-    default_args=args)
+dag = DAG('example_xcom', schedule_interval="@once", default_args=args)
 
 value_1 = [1, 2, 3]
 value_2 = {'a': 'b'}
 
 
 def push(**kwargs):
-    # pushes an XCom without a specific target
+    """Pushes an XCom without a specific target"""
     kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)
 
 
 def push_by_returning(**kwargs):
-    # pushes an XCom without a specific target, just by returning it
+    """Pushes an XCom without a specific target, just by returning it"""
     return value_2
 
 
@@ -63,12 +61,21 @@ def puller(**kwargs):
 
 
 push1 = PythonOperator(
-    task_id='push', dag=dag, python_callable=push)
+    task_id='push',
+    dag=dag,
+    python_callable=push,
+)
 
 push2 = PythonOperator(
-    task_id='push_by_returning', dag=dag, python_callable=push_by_returning)
+    task_id='push_by_returning',
+    dag=dag,
+    python_callable=push_by_returning,
+)
 
 pull = PythonOperator(
-    task_id='puller', dag=dag, python_callable=puller)
+    task_id='puller',
+    dag=dag,
+    python_callable=puller,
+)
 
-pull.set_upstream([push1, push2])
+pull << [push1, push2]
diff --git a/airflow/example_dags/test_utils.py b/airflow/example_dags/test_utils.py
index fb8792a1bf..3fc8af1dad 100644
--- a/airflow/example_dags/test_utils.py
+++ b/airflow/example_dags/test_utils.py
@@ -18,17 +18,15 @@
 # under the License.
 """Used for unit tests"""
 import airflow
-from airflow.operators.bash_operator import BashOperator
 from airflow.models import DAG
+from airflow.operators.bash_operator import BashOperator
 
-dag = DAG(
-    dag_id='test_utils',
-    schedule_interval=None,
-)
+dag = DAG(dag_id='test_utils', schedule_interval=None)
 
 task = BashOperator(
     task_id='sleeps_forever',
     dag=dag,
     bash_command="sleep 10000000000",
     start_date=airflow.utils.dates.days_ago(2),
-    owner='airflow')
+    owner='airflow',
+)
diff --git a/airflow/example_dags/tutorial.py b/airflow/example_dags/tutorial.py
index ad817338ef..ccf2e6e2ee 100644
--- a/airflow/example_dags/tutorial.py
+++ b/airflow/example_dags/tutorial.py
@@ -22,14 +22,14 @@
 Documentation that goes along with the Airflow tutorial located
 [here](https://airflow.incubator.apache.org/tutorial.html)
 """
+from datetime import timedelta
+
 import airflow
 from airflow import DAG
 from airflow.operators.bash_operator import BashOperator
-from datetime import timedelta
-
 
-# these args will get passed on to each operator
-# you can override them on a per-task basis during operator initialization
+# These args will get passed on to each operator
+# You can override them on a per-task basis during operator initialization
 default_args = {
     'owner': 'airflow',
     'depends_on_past': False,
@@ -58,13 +58,15 @@
     'tutorial',
     default_args=default_args,
     description='A simple tutorial DAG',
-    schedule_interval=timedelta(days=1))
+    schedule_interval=timedelta(days=1),
+)
 
 # t1, t2 and t3 are examples of tasks created by instantiating operators
 t1 = BashOperator(
     task_id='print_date',
     bash_command='date',
-    dag=dag)
+    dag=dag,
+)
 
 t1.doc_md = """\
 #### Task Documentation
@@ -80,7 +82,8 @@
     task_id='sleep',
     depends_on_past=False,
     bash_command='sleep 5',
-    dag=dag)
+    dag=dag,
+)
 
 templated_command = """
 {% for i in range(5) %}
@@ -95,7 +98,7 @@
     depends_on_past=False,
     bash_command=templated_command,
     params={'my_param': 'Parameter I passed in'},
-    dag=dag)
+    dag=dag,
+)
 
-t2.set_upstream(t1)
-t3.set_upstream(t1)
+t1 >> [t2, t3]
diff --git a/docs/tutorial.rst b/docs/tutorial.rst
index 0ea58d2784..570cd75c80 100644
--- a/docs/tutorial.rst
+++ b/docs/tutorial.rst
@@ -247,23 +247,36 @@ in templates, make sure to read through the :ref:`macros` section
 
 Setting up Dependencies
 -----------------------
-We have two simple tasks that do not depend on each other. Here's a few ways
+We have tasks `t1`, `t2` and `t3` that do not depend on each other. Here's a few ways
 you can define dependencies between them:
 
 .. code:: python
 
-    t2.set_upstream(t1)
+    t1.set_downstream(t2)
 
     # This means that t2 will depend on t1
-    # running successfully to run
-    # It is equivalent to
-    # t1.set_downstream(t2)
+    # running successfully to run.
+    # It is equivalent to:
+    t2.set_upstream(t1)
 
-    t3.set_upstream(t1)
+    # The bit shift operator can also be
+    # used to chain operations:
+    t1 >> t2
+
+    # And the upstream dependency with the
+    # bit shift operator:
+    t2 << t1
+
+    # Chaining multiple dependencies becomes
+    # concise with the bit shift operator:
+    t1 >> t2 >> t3
 
-    # all of this is equivalent to
-    # dag.set_dependency('print_date', 'sleep')
-    # dag.set_dependency('print_date', 'templated')
+    # A list of tasks can also be set as
+    # dependencies. These operations
+    # all have the same effect:
+    t1.set_downstream([t2, t3])
+    t1 >> [t2, t3]
+    [t2, t3] << t1
 
 Note that when executing your script, Airflow will raise exceptions when
 it finds cycles in your DAG or when a dependency is referenced more


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message