airflow-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jeremiah Lowin <>
Subject Re: Using xcom with web APIs that return JSON
Date Tue, 31 May 2016 12:54:53 GMT
Mike, I think you'll need to overwrite the operator classes. Here's a
sketch to get you started, you may find more efficient ways of doing this:

class MyHttpOperator(HttpOperator):

    def execute(self, context):
        # Code from parent class
        http = HttpHook(self.method, http_conn_id=self.http_conn_id)"Calling HTTP method")
        response =,
        if self.response_check:
            if not self.response_check(response):
                raise AirflowException("Response check returned False.")

        # NEW CODE BELOW
        # to push the response
        self.xcom_push(key='json_data', value=response)
        # OR just return the response
        return response

class MyHttpSensor(HttpSensor):
    def poke(self, context):
        # if you called xcom_push, then provide the key
        # if you returned a value, then provide the task_id
        json_data = self.xcom_pull(

        # Then copy/paste poke() code from parent class here,
        # modified to use json_data in the query


On Thu, May 26, 2016 at 12:06 PM Michael England <>

> Hi,
> I am having some trouble working with the xcom functionality in the
> HttpOperators.
> I am trying to use Airflow to call a web API to schedule a task
> asynchronously, returning some JSON data, and then based on the value of
> one of the JSON fields I would like to use an HttpSensor to query an end
> point until the task is complete.
> Does anyone have any examples of how this can be done?  The documentation
> is focused on BashOperators and PythonOperators.
> Thanks in advance!Mike

  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message