airflow-dev mailing list archives

From Jiajie Zhong <zhongjiajie...@hotmail.com>
Subject Re: PostgreSQL hook
Date Mon, 01 Apr 2019 05:31:13 GMT
Sorry for the late reply; I have been busy with work and my family.

Yes, you're right.
I reviewed `postgres_hook.py`; it inherits from `DbApiHook`,
so each call to its `run` function opens a new connection:
https://github.com/apache/airflow/blob/75c633e70fdc2537a0112939a52666a5c0c2e114/airflow/hooks/dbapi_hook.py#L159-L172

In this situation, I personally suggest you use a cursor in the operator:

    def execute(self, context):
        pg = PostgresHook(postgres_conn_id=self.postgres_conn_id)
        cursor = pg.get_cursor()  # <-- here: one cursor, so one session

        if self.pg_preoperator:
            self.log.info("Running Postgres preoperator")
            cursor.execute(self.pg_preoperator)

        self.log.info("Loading file to pg table.")
        cursor.execute(self.sql)

        if self.pg_postoperator:
            self.log.info("Running Postgres postoperator")
            cursor.execute(self.pg_postoperator)
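The idea is easy to check outside Airflow. Here is a minimal sketch using the stdlib sqlite3 module as a stand-in for Postgres (temp tables are session-scoped in both); it shows that when every step shares one cursor, they all see the same temp table:

```python
import sqlite3

# One connection/cursor for the whole job, as in the snippet above.
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()

# "preoperator": create the session-scoped temp table.
cursor.execute("CREATE TEMP TABLE catalog_tmp (code TEXT)")
# Stand-in for the COPY step.
cursor.execute("INSERT INTO catalog_tmp VALUES ('x'), (NULL)")
# "postoperator": the clean-up runs against the same temp table.
cursor.execute("DELETE FROM catalog_tmp WHERE code IS NULL")

cursor.execute("SELECT COUNT(*) FROM catalog_tmp")
remaining = cursor.fetchone()[0]
print(remaining)  # 1: every step saw the same temp table
conn.commit()
conn.close()
```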


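Another option, along the lines of the StackOverflow suggestion quoted below, is to make the hook cache its connection so every `run` call reuses it. A minimal sketch in plain Python, with sqlite3 standing in for Postgres; the class and method names here are illustrative, not Airflow's API:

```python
import sqlite3

class PersistentHook:
    """Illustrative hook that reuses one connection across run() calls."""

    def __init__(self):
        self._conn = None

    def get_conn(self):
        # Open the connection once and cache it, instead of opening
        # a fresh one per call as DbApiHook.run does.
        if self._conn is None:
            self._conn = sqlite3.connect(":memory:")
        return self._conn

    def run(self, sql):
        cursor = self.get_conn().cursor()
        cursor.execute(sql)
        self.get_conn().commit()
        return cursor

hook = PersistentHook()
hook.run("CREATE TEMP TABLE catalog_tmp (code TEXT)")
hook.run("INSERT INTO catalog_tmp VALUES ('a')")
rows = hook.run("SELECT * FROM catalog_tmp").fetchall()
print(rows)  # [('a',)]: later run() calls still see the temp table
```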

Best wish.
-- Jiajie
________________________________
From: Flo Rance <trourance@gmail.com>
Sent: Tuesday, March 26, 2019 15:55
To: dev@airflow.apache.org
Subject: Re: PostgreSQL hook

Hi,

This is what I tried first, but it's not working.

pg = PostgresHook(postgres_conn_id=self.postgres_conn_id)
pg.run(...)

This part of the code will always use a new connection, and therefore the
temp table created in the preoperator will not be accessible to the COPY
command.
For this to work, I had to use a cursor instead.
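The behaviour is easy to reproduce outside Airflow. A small sketch with the stdlib sqlite3 module (where temp tables are also scoped to one connection), mimicking two separate `run` calls:

```python
import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), "demo.db")

# First "run" call: its own connection creates the temp table, then closes.
conn1 = sqlite3.connect(path)
conn1.execute("CREATE TEMP TABLE catalog_tmp (code TEXT)")
conn1.commit()
conn1.close()

# Second "run" call: a brand-new connection; the temp table is gone.
conn2 = sqlite3.connect(path)
try:
    conn2.execute("SELECT * FROM catalog_tmp")
    table_visible = True
except sqlite3.OperationalError:
    table_visible = False
conn2.close()
print(table_visible)  # False: the temp table did not survive the first connection
```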

Flo

On Sat, Mar 23, 2019 at 10:22 AM Jiajie Zhong <zhongjiajie955@hotmail.com>
wrote:

> I wrote some demo code here. It may not work as-is, but I think the idea is right.
>
>
> # create file name file_to_Pg.py
> from airflow.models import BaseOperator
> from airflow.hooks.postgres_hook import PostgresHook
>
> class FileToPgTransfer(BaseOperator):
>     def __init__(self, postgres_conn_id, pg_preoperator, sql,
>                  pg_postoperator, ... , *args, **kwargs):
>         super().__init__(*args, **kwargs)
>         self.postgres_conn_id = postgres_conn_id
>         self.pg_preoperator = pg_preoperator
>         self.sql = sql
>         self.pg_postoperator = pg_postoperator
>         # other init here
>
>     def execute(self, context):
>         pg = PostgresHook(postgres_conn_id=self.postgres_conn_id)
>
>         if self.pg_preoperator:
>             self.log.info("Running Postgres preoperator")
>             pg.run(self.pg_preoperator)
>
>         self.log.info("loading file to pg table.")
>         pg.run(sql=self.sql)
>
>         if self.pg_postoperator:
>             self.log.info("Running Postgres postoperator")
>             pg.run(self.pg_postoperator)
>
>
> # you could then use the code below in a DAG file
> task = FileToPgTransfer(
>     postgres_conn_id='postgres_conn_id',
>     pg_preoperator='CREATE TEMP TABLE catalog_tmp ON COMMIT DROP AS SELECT
> * FROM catalog WITH NO DATA',
>     sql="\COPY catalog_tmp (...) FROM '/home/cat/catalog.csv' WITH
> DELIMITER ';' CSV ENCODING 'LATIN1' NULL ''",
>     pg_postoperator='DELETE FROM catalog_tmp WHERE code IS NULL',
> )
>
>
> Best wish.
> -- jiajie
> ________________________________
> From: Flo Rance <trourance@gmail.com>
> Sent: Thursday, March 21, 2019 22:31
> To: dev@airflow.apache.org
> Subject: Re: PostgreSQL hook
>
> I've found some more information that seems to confirm my suspicion.
>
> The connection is not persistent between the pg_preoperator step and the
> copy_expert one.
>
> There's a suggestion to make persistence a property of the connection:
>
> https://stackoverflow.com/questions/50858770/airflow-retain-the-same-database-connection
>
> I would be very grateful if someone could help me implement that in my
> operator that uses PostgreSQL Hook.
>
> Regards,
> Flo
>
> On Thu, Mar 21, 2019 at 2:38 PM Flo Rance <trourance@gmail.com> wrote:
>
> > No problem.
> >
> > Thanks for the link, I was able to create a plugin and an operator that
> do
> > almost what I want.
> >
> > My only issue is regarding the temp table, because it's not available
> when
> > I call copy_expert. So it seems to me that's not the same session as the
> > one that created the temp table previously, because if I use a standard
> > table I don't have this issue.
> >
> > Does anyone have an idea how to fix this?
> >
> > Regards,
> > Flo
> >
> > On Thu, Mar 21, 2019 at 9:36 AM jiajie zhong <zhongjiajie955@hotmail.com
> >
> > wrote:
> >
> >> Using Airflow plugins, maybe you should take a look at
> >> https://airflow.apache.org/plugins.html.
> >>
> >> BTW, sorry for sending a duplicate e-mail last night; it was due to a
> >> network failure.
> >>
> >> Best wish.
> >> - jiajie
> >>
> >> ________________________________
> >>
> >> Hi,
> >>
> >> Thank you for this explanation. If I summarize, I'll have to write a
> >> file_to_postgres Operator, with pg_preoperator and pg_postoperator
> >> parameters.
> >>
> >> Just a simple question: Where should I add and store this Operator in
> the
> >> airflow ecosystem ?
> >>
> >> Regards,
> >> Flo
> >>
> >> On Wed, Mar 20, 2019 at 5:05 PM jiajie zhong <
> zhongjiajie955@hotmail.com>
> >> wrote:
> >>
> >> > Hi, Flo. I am not good at PG, but I found code in our master branch
> >> >
> >> >
> >>
> https://github.com/apache/airflow/blob/dd8ce6a7123170ef4b0f719fb773527b17d9348c/airflow/hooks/postgres_hook.py#L63-L89
> >> > <
> >> >
> >>
> https://github.com/apache/airflow/blob/dd8ce6a7123170ef4b0f719fb773527b17d9348c/airflow/hooks/postgres_hook.py#L63-L83
> >> > >
> >> > I think maybe this is what you're looking for.
> >> >
> >> > And, we recommend using an Operator to do things instead of a Hook, but
> >> > we have no "local-file-pg-operator"; maybe you should add this function
> >> > yourself.
> >> >
> >> > BTW, I think
> >> > BEGIN;
> >> >
> >> > CREATE TEMP TABLE catalog_tmp ON COMMIT DROP AS SELECT * FROM catalog
> >> WITH
> >> > NO DATA;
> >> >
> >> > \COPY catalog_tmp (...) FROM '/home/cat/catalog.csv' WITH DELIMITER
> ';'
> >> CSV
> >> > ENCODING 'LATIN1' NULL '';
> >> >
> >> > DELETE FROM catalog_tmp WHERE code IS NULL;
> >> > ...
> >> > COMMIT;
> >> > What you describe is a transaction, so it should run in a single
> >> > operator. You could write code just like
> >> >
> >> >
> >>
> https://github.com/apache/airflow/blob/dd8ce6a7123170ef4b0f719fb773527b17d9348c/airflow/operators/hive_to_mysql.py#L70-L71
> >> >
> >> > that has "pg_preoperator" and "pg_postoperator" parameters, but
> >> > extracts data from a local file instead of Hive.
> >> >
> >> > ________________________________
> >> > From: Flo Rance <trourance@gmail.com>
> >> > Sent: Wednesday, March 20, 2019 23:30
> >> > To: dev@airflow.apache.org
> >> > Subject: PostgreSQL hook
> >> >
> >> > Hi,
> >> >
> >> > I don't know if it's the correct place to ask for that.
> >> >
> >> > I'm trying to implement one of my cronjobs using Airflow. One of the
> >> tasks
> >> > is to load files in a temporary table and then update another table
> in a
> >> > postgres db.
> >> > For that, I was previously using a sql script like that:
> >> >
> >> > BEGIN;
> >> >
> >> > CREATE TEMP TABLE catalog_tmp ON COMMIT DROP AS SELECT * FROM catalog
> >> WITH
> >> > NO DATA;
> >> >
> >> > \COPY catalog_tmp (...) FROM '/home/cat/catalog.csv' WITH DELIMITER
> ';'
> >> CSV
> >> > ENCODING 'LATIN1' NULL '';
> >> >
> >> > DELETE FROM catalog_tmp WHERE code IS NULL;
> >> > ...
> >> > COMMIT;
> >> >
> >> > I would like to replace \COPY with copy_expert from the PostgreSQL
> >> > hook. Is that realistic?
> >> > If yes, how can I combine a SQL script and that hook in one task?
> >> >
> >> > Regards,
> >> > Flo
> >> >
> >>
> >
>
