airflow-dev mailing list archives

From Nicholas Hodgkinson <nik.hodgkin...@collectivehealth.com>
Subject DAG neither succeeds or fails, stays running.
Date Thu, 11 May 2017 21:57:22 GMT
I've got a DAG that, usually every few days, stops making progress but
neither succeeds nor fails. As far as I can tell, this happens for two reasons:
1) A task enters the queued state and never leaves it; manually clearing
the state of the task causes it to run.
2) One or more tasks enter the "removed" state, which seemingly counts
toward neither the success nor the failure of the overall job; these must
also be cleared for the job to continue.
Usually both 1 and 2 are present at the same time, but each has also been
seen on its own with the same result: the DagRun state is still "Running",
but the job makes no progress and is never marked as a success or failure,
even after several days.
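
To make the symptom concrete, here is a toy model of it in Python. It is an illustration only, not Airflow's scheduler logic: the function names and the grouping of states are assumptions. It shows how a run stays "running" whenever any task sits in a state that is neither a success nor a failure, such as "queued" or "removed".

```python
# Toy model only: the names below are hypothetical and this is not
# how Airflow itself computes the DagRun state.
SUCCESS_STATES = {"success", "skipped"}
FAILURE_STATES = {"failed", "upstream_failed"}
TERMINAL_STATES = SUCCESS_STATES | FAILURE_STATES


def blocking_tasks(task_states):
    """Return the task ids whose state is neither a success nor a failure."""
    return sorted(
        task_id
        for task_id, state in task_states.items()
        if state not in TERMINAL_STATES
    )


def run_state(task_states):
    """Return 'running' until every task has reached a terminal state."""
    if blocking_tasks(task_states):
        return "running"
    if any(state in FAILURE_STATES for state in task_states.values()):
        return "failed"
    return "success"
```

In this model, a run with one task in "queued" and another in "removed" reports "running" indefinitely, and `blocking_tasks` lists exactly the tasks that would have to be cleared by hand.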

A few words about my environment:
* I'm running 1.8.0. Nothing in the changelog addresses this problem, so
while I will be upgrading next week, I do not expect the point release to
fix this issue.
* This machine is running as a single instance with the LocalExecutor;
we are not yet at the point where the CeleryExecutor makes sense for us.
* Happy to provide any other configuration information.

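This DAG is generated with this code:
# DataLoader, config_path, and s3 are defined elsewhere (not shown).
import logging
import os
from datetime import datetime, timedelta

from airflow import DAG

start_date = datetime(2017, 3, 24)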
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': start_date,
    'email': ['nik.hodgkinson@collectivehealth.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

version = 2
with DAG('AUTOMATOR-sensor-v{0}'.format(version),
         default_args=default_args,
         schedule_interval="*/5 * * * *",
         max_active_runs=1) as dag:

    table_names = os.listdir(config_path)
    for table_name in table_names:
        config_dir = os.path.join(config_path, table_name)
        if not os.path.isdir(config_dir):
            continue

        config_filenames = os.listdir(config_dir)
        config = None
        config_filename = None
        for config_filename in config_filenames:
            if DataLoader.is_config_file(config_filename):
                config_filename = os.path.join(config_dir, config_filename)
                config = DataLoader.load_config_file(config_filename)
                break

        if config_filename is None \
                or config is None \
                or 'data-location' not in config \
                or 'config-location' not in config:
            # We're expecting a config file in the directory
            # if there isn't one, skip this directory.
            continue

        # Generate dag to load the data.
        contacts = config['contacts'] if 'contacts' in config else None
        automator_dag_id, automator_dag = DataLoader.build_automator_dag(
            table_name=table_name,
            config_contacts=contacts)
        logging.debug("Generated DAG for {0}".format(automator_dag_id))
        globals()[automator_dag_id] = automator_dag

        # Generate Sensor and Trigger
        s3_sensor = DataLoader.build_automator_dag_config_sensor(
            table_name=table_name,
            s3_file_location=config['config-location'])
        dag_trigger = DataLoader.build_automator_dag_trigger(
            dag_id=automator_dag_id,
            table_name=table_name,
            s3_hook=s3,
            s3_config_location=config['config-location'],
            s3_data_location=config['data-location'],
            trigger_dag=True)
        dag_trigger.set_upstream(s3_sensor)
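
Because the task list above is rebuilt from the contents of config_path every time the file is parsed, the set of tasks can change between parses. One possible contributor to the "removed" state, offered as an assumption rather than a confirmed diagnosis, is a table directory or config file that is briefly missing or unreadable at parse time. The sketch below mirrors the directory walk and compares two snapshots. The helper names and the `is_config_file` predicate are hypothetical stand-ins, not part of DataLoader or Airflow.

```python
import os


def tables_with_config(config_root, is_config_file):
    """Return the set of table directories holding at least one config file.

    `is_config_file` is a hypothetical predicate standing in for
    DataLoader.is_config_file, whose behavior is not shown in the post.
    """
    found = set()
    for table_name in os.listdir(config_root):
        config_dir = os.path.join(config_root, table_name)
        if not os.path.isdir(config_dir):
            continue
        if any(is_config_file(name) for name in os.listdir(config_dir)):
            found.add(table_name)
    return found


def vanished_tables(previous, current):
    """Return tables present in the previous snapshot but not the current one."""
    return sorted(previous - current)
```

Logging the result of `vanished_tables` between parses would show whether tasks disappear from the DAG around the time a run stalls.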

Happy to answer any questions.

-N
nik.hodgkinson@collectivehealth.com