airflow-dev mailing list archives

From: Tamara Mendt
Subject: Re: MySQL operator that returns rows
Date: Mon, 01 Aug 2016 17:09:02 GMT
Hello Max,

Thanks a lot for the thorough and nicely explained answer. I get your point
about tasks being idempotent. Although I had read that tasks are not expected
to move data between each other, I tried using XCom anyway, because my use
case is just to run a query against a DB replica to get its lag (if there is
one). I would like to perform this check before running one of my DAGs. Maybe
this is not a great application of Airflow operators, though?
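For what it's worth, a replica-lag check like this one can avoid XCom entirely: the task can fetch the lag and simply succeed or fail on its own, gating the rest of the DAG. Here is a minimal sketch of just the decision logic, assuming the `SHOW SLAVE STATUS` row has been turned into a dict (the function name and threshold are made up for illustration, not an Airflow API):

```python
def check_replica_lag(status_row, max_lag_seconds=60):
    """Raise if the replica is lagging too far behind the master.

    status_row: a dict-like row, e.g. one built from the result of a
    SHOW SLAVE STATUS query. Seconds_Behind_Master is the standard
    MySQL column; None means replication is not running.
    """
    lag = status_row.get("Seconds_Behind_Master")
    if lag is None:
        # Replication is broken or stopped altogether.
        raise RuntimeError("replica is not replicating")
    if lag > max_lag_seconds:
        raise RuntimeError(
            "replica lag %ds exceeds %ds" % (lag, max_lag_seconds))
    return lag

# Used as a python_callable, the task fails when the replica is behind,
# so nothing needs to be handed downstream via XCom.
print(check_replica_lag({"Seconds_Behind_Master": 3}))  # 3
```

Wired into a PythonOperator placed upstream of the rest of the DAG, a failure here would keep the downstream tasks from running at all.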

I get the point of instead using a PythonOperator that initializes a hook to
fetch any data it might need. I was hoping to keep the Python functions used
in PythonOperators free of Airflow-specific classes (like the hook), and I
liked the idea of keeping the logic that queries MySQL separate from the
logic that parses the result: the MySQL operators explicitly expect conn_id
and sql parameters, whereas a PythonOperator takes these only as optional
parameters, so perhaps it becomes less evident that the operator is actually
querying a DB. We have other Airflow tasks which query databases (using
classes defined separately from Airflow) and apply certain logic to the
data, but those are defined as BashOperators running Python scripts that
contain the entire logic of the task. What would be the advantage of using
one approach over the other?
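One way to get that separation without keeping Airflow out of the picture entirely is to inject the fetching function: the parsing logic stays a plain, testable function, and only a thin wrapper would ever touch a hook. A rough sketch of the pattern (all names here are illustrative, none of this is an Airflow API):

```python
def parse_lag(rows):
    """Pure parsing logic: no Airflow imports, trivially unit-testable."""
    (first_row,) = rows          # expect exactly one row
    return int(first_row[0])

def make_callable(fetch, sql):
    """Bind a fetcher to the parsing logic.

    fetch would typically be a small lambda around something like
    MySqlHook.get_records; here it is just any callable taking sql
    and returning rows. The result is an argument-free callable that
    could be handed to a PythonOperator.
    """
    def task_callable():
        return parse_lag(fetch(sql))
    return task_callable

# Outside Airflow (or in tests), a stub fetcher stands in for the DB:
stub_fetch = lambda sql: [("7",)]
run = make_callable(stub_fetch, "SELECT lag FROM replica_status")
print(run())  # 7
```

The parsing function never learns where the rows came from, which keeps it reusable from a BashOperator-driven script and a PythonOperator alike.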

Thanks again,



On Mon, Aug 1, 2016 at 5:41 PM, Maxime Beauchemin <> wrote:

> Operators are meant to define an atomic, self-contained task. Meaning once
> the operator is done, the worker is done and no state persists (XComs are
> the exception to this, but let's ignore them for now). Ideally a task (the
> object returned when instantiating the operator) is idempotent and
> shouldn't leave a trace on the worker, since there's no guarantee that a
> subsequent task will run on the same worker.
> The question you need to ask yourself is: what do I want to do with the
> returned rows?
> If, say, you were going to do something generic like loading them into
> another database like Hive, then it makes sense to use (or create, if it
> doesn't exist) a MySqlToHiveTransfer operator, because it's generic and
> can be reused. If you wanted to, say, score the rows using a machine
> learning model, you may want to use a PythonOperator that uses the hook to
> get the data, apply transformations, and ship the (now scored) rows back
> to some other place. That PythonOperator is also "atomic" and stateless
> beyond the boundaries of the task itself, as far as Airflow is concerned.
> Most of the reusable logic should live in hooks, so that operators simply
> use the generic hook functions. Hook methods are the building blocks, and
> operators simply assemble them.
> Max
> On Mon, Aug 1, 2016 at 6:32 AM, Tamara Mendt <> wrote:
> > Hello,
> >
> > I would like to use an operator which executes an SQL command and passes
> > the result of this command to the next downstream operator. However, I
> > noticed that the MySqlOperator only executes the command, without
> > fetching the results.
> >
> > The solution I came up with was to create a new operator class that looks
> > like this:
> >
> > class ReturnRowsMySqlOperator(MySqlOperator):
> >
> >     def execute(self, context):
> >
> >'Executing: ' + str(self.sql))
> >         hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
> >         return_value = hook.get_records(self.sql,
> >                                         parameters=self.parameters)
> >         return return_value
> >
> >
> > It is a child class of the MySqlOperator, but overrides the execute
> > function to fetch the results of the execution and return them, so they
> > are accessible to the following task. Is there any better way to achieve
> > this?
> >
> >
> > Thanks and cheers,
> >
> > Tamara
> >
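The hook/operator split described in the reply above (reusable DB methods in hooks, thin operators that only assemble them, as in MySqlToHiveTransfer) can be sketched outside Airflow, with sqlite3 standing in for MySQL. The class names here are made up for illustration and are not Airflow's:

```python
import sqlite3

class SqliteStandInHook:
    """Plays the role of an Airflow hook: generic, reusable DB methods."""
    def __init__(self, conn):
        self.conn = conn

    def get_records(self, sql, parameters=()):
        return self.conn.execute(sql, parameters).fetchall()

    def insert_rows(self, table, rows):
        placeholders = ",".join("?" * len(rows[0]))
        self.conn.executemany(
            "INSERT INTO %s VALUES (%s)" % (table, placeholders), rows)

class TransferStandInOperator:
    """Plays the role of a transfer operator: no DB logic of its own,
    it only assembles hook calls into one atomic task."""
    def __init__(self, hook, sql, target_table):
        self.hook, self.sql, self.target_table = hook, sql, target_table

    def execute(self):
        rows = self.hook.get_records(self.sql)
        self.hook.insert_rows(self.target_table, rows)
        return len(rows)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE src (x INTEGER)")
conn.execute("CREATE TABLE dst (x INTEGER)")
conn.executemany("INSERT INTO src VALUES (?)", [(1,), (2,), (3,)])
op = TransferStandInOperator(
    SqliteStandInHook(conn), "SELECT x FROM src WHERE x > 1", "dst")
print(op.execute())  # 2
```

Because all the database knowledge sits in the hook, the same two methods could back many different operators, which is the reuse the reply is pointing at.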
