airflow-commits mailing list archives

From "Alex Papanicolaou (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (AIRFLOW-62) XCom push not working reliably
Date Fri, 06 May 2016 22:48:12 GMT

[ https://issues.apache.org/jira/browse/AIRFLOW-62?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15274859#comment-15274859 ]

Alex Papanicolaou edited comment on AIRFLOW-62 at 5/6/16 10:47 PM:
-------------------------------------------------------------------

Here is a run from the Airflow version on PyPI.  The task {{upload_activity_status}} is slightly
different here, since obtaining the upstream task ids is not as straightforward as it is on the
master branch (which exposes {{upstream_task_ids}} directly).  Under the PyPI version, all of the
XCom pushes work as expected.

Note: I reran the DAG and changed the poller task ids to make sure there wasn't any mixup
from another run.  Still the same result. 

*PyPI-usable* {{upload_activity_status}}
{code}
import logging
import pprint
from operator import attrgetter


def upload_activity_status(pgconn_id, **context):
    upstream_task_ids = map(
        attrgetter('task_id'), context['task'].upstream_list)
    # On the master branch this can simply be:
    # upstream_task_ids = context['task'].upstream_task_ids
    logging.info(
        "Getting status from upstream task {}".format(upstream_task_ids))
    status = context['ti'].xcom_pull(task_ids=upstream_task_ids)
    logging.info("Xcom pull results:\n{}".format(pprint.pformat(status)))
    logging.info("Upload to DB here")
{code}
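
For reference, each {{poll_stream*}} poller is a PythonOperator whose callable's return value is
what the {{xcom_pull}} above retrieves.  The pollers themselves are not shown in this report; the
sketch below is a hypothetical reconstruction ({{poll_stream_activity}} and {{check_stream_activity}}
are assumed names, not the actual DAG code):
{code}
def check_stream_activity(data_stream, timeperiod):
    # Hypothetical stand-in for the real database poll; the actual task
    # would query the backing database for activity in the given stream.
    return True


def poll_stream_activity(data_stream, timeperiod, **context):
    # Whatever the PythonOperator callable returns is pushed as the
    # 'return_value' XCom, which is what upload_activity_status pulls.
    return {
        'data_stream': data_stream,
        'status': check_stream_activity(data_stream, timeperiod),
        'timeperiod': timeperiod,
    }
{code}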

*Log for Task* {{upload_activity_status}}
{noformat}
--------------------------------------------------------------------------------
Attempt 1 out of 2
--------------------------------------------------------------------------------

[2016-05-06 22:15:51,093] {models.py:1041} INFO - Executing <Task(PythonOperator): upload_activity_status>
on 2016-05-06 20:00:00
[2016-05-06 22:15:51,103] {xcom_test_dag_pypi.py:44} INFO - Getting status from upstream task
['poll_stream0', 'poll_stream1', 'poll_stream2', 'poll_stream3', 'poll_stream4', 'poll_stream5',
'poll_stream6', 'poll_stream7', 'poll_stream8', 'poll_stream9', 'poll_stream10', 'poll_stream11',
'poll_stream12', 'poll_stream13', 'poll_stream14', 'poll_stream15', 'poll_stream16', 'poll_stream17',
'poll_stream18', 'poll_stream19', 'poll_stream20', 'poll_stream21', 'poll_stream22', 'poll_stream23',
'poll_stream24', 'poll_stream25', 'poll_stream26', 'poll_stream27', 'poll_stream28', 'poll_stream29']
[2016-05-06 22:15:51,163] {xcom_test_dag_pypi.py:46} INFO - Xcom pull results:
({'data_stream': 'stream0',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream1',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream2',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream3',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream4',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream5',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream6',
  'status': False,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream7',
  'status': False,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream8',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream9',
  'status': False,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream10',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream11',
  'status': False,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream12',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream13',
  'status': False,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream14',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream15',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream16',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream17',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream18',
  'status': False,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream19',
  'status': False,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream20',
  'status': False,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream21',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream22',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream23',
  'status': False,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream24',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream25',
  'status': False,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream26',
  'status': False,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream27',
  'status': False,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream28',
  'status': False,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)},
 {'data_stream': 'stream29',
  'status': True,
  'timeperiod': datetime.datetime(2016, 5, 6, 19, 0)})
[2016-05-06 22:15:51,163] {xcom_test_dag_pypi.py:47} INFO - Upload to DB here
[2016-05-06 22:15:51,163] {python_operator.py:66} INFO - Done. Returned value was: None
{noformat}



> XCom push not working reliably
> ------------------------------
>
>                 Key: AIRFLOW-62
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-62
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: db, operators
>    Affects Versions: Airflow 1.7.0
>         Environment: Postgres backed Airflow running with Celery inside of the puckel
> Docker setup.
>            Reporter: Alex Papanicolaou
>            Assignee: Jeremiah Lowin
>
> I have a DAG that polls for activity in various data streams from a database and then
> uploads the activity statuses to a table.  Each of the polling tasks is a Python operator
> that, once it gets the polling result, returns a dict as an XCom push.  The dict contains
> two entries that are strings, one that is a bool, and one that is a datetime object.  A
> final task pulls all the results and uploads the collective statuses to a table.  I chose
> this pattern since I figured it might be better to do one collective write operation on
> all the results.
> Before I moved to the GitHub master branch I was using 1.7.0 from PyPI, and this worked
> fine.  Now that I am on the GitHub master branch, I find that the XCom pushing is
> unreliable.  The returned values show up correctly in the logs, but when doing the XCom
> pull I get None for some of the returned values.  Investigating the XCom result in the
> Webserver also shows nothing there.  But if I rerun a task where the XCom failed, the
> push works and the XCom result is as it should be.
> Nothing appears to have changed in the codebase, so I am at a loss.  Perhaps it really
> wasn't working before?  How would the backing Postgres handle these simultaneous writes?
> I can't imagine that would be a problem.
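
For context, below is a minimal sketch of the pattern described above: 30 poller tasks, each
returning a dict that is pushed as an XCom, feeding a single aggregating upload task.  It is a
reconstruction under assumptions; the DAG name, schedule, and stubbed polling logic are
illustrative, not the reporter's actual {{xcom_test_dag_pypi.py}}.
{code}
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def poll_stream(data_stream, **context):
    # Stand-in poll; the real task queries the database for activity.
    return {'data_stream': data_stream,
            'status': True,
            'timeperiod': datetime(2016, 5, 6, 19, 0)}


def upload_activity_status(**context):
    task_ids = [t.task_id for t in context['task'].upstream_list]
    statuses = context['ti'].xcom_pull(task_ids=task_ids)
    # One collective write of all statuses would happen here.


dag = DAG('xcom_test_dag', start_date=datetime(2016, 5, 6),
          schedule_interval='@hourly')

upload = PythonOperator(task_id='upload_activity_status',
                        python_callable=upload_activity_status,
                        provide_context=True, dag=dag)

for i in range(30):
    poller = PythonOperator(task_id='poll_stream{}'.format(i),
                            python_callable=poll_stream,
                            op_kwargs={'data_stream': 'stream{}'.format(i)},
                            provide_context=True, dag=dag)
    poller.set_downstream(upload)
{code}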



