airflow-dev mailing list archives

From "Driesprong, Fokko" <fo...@driesprong.frl>
Subject Re: Return results optionally from spark_sql_hook
Date Sun, 15 Oct 2017 09:04:00 GMT
Hi Boris,

Instead of writing it to a file, you can also write it to XCom, which will
keep everything inside of Airflow. My personal opinion on this: spark-sql
is a bit limited by nature, since it only supports SQL. If you want to do more
dynamic stuff, you will eventually have to move to spark-submit anyway.
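
For example, a minimal sketch of the XCom route in Airflow 1.x
(get_partition_count is a made-up helper that would fetch the count,
e.g. via pyspark):

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG('xcom_count_example', start_date=datetime(2017, 10, 1),
          schedule_interval=None)

def push_count(**context):
    count = get_partition_count()  # hypothetical helper, e.g. a pyspark count
    return count                   # the returned value is pushed to XCom

def use_count(**context):
    count = context['ti'].xcom_pull(task_ids='push_count')
    print("Row count from upstream task: %s" % count)

push = PythonOperator(task_id='push_count', python_callable=push_count,
                      provide_context=True, dag=dag)
use = PythonOperator(task_id='use_count', python_callable=use_count,
                     provide_context=True, dag=dag)
push.set_downstream(use)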

Cheers, Fokko

2017-10-14 14:45 GMT+02:00 Boris <boriskey@gmail.com>:

> Thanks Fokko, I think that will do it, but my concern is that in this case
> my DAG will initiate two separate Spark sessions, and it takes about 20
> seconds in our YARN environment to create one. I need to run 600 DAGs like
> that every morning.
>
> I am now thinking of creating a pyspark job that will do the insert and the
> count and write the count to a temp file. Still not ideal... I wish I could
> just parse the spark-sql output instead...
>
> On Oct 14, 2017 8:05 AM, "Driesprong, Fokko" <fokko@driesprong.frl> wrote:
>
> > Hi Boris,
> >
> > That sounds like a nice DAG.
> >
> > This is how I would do it: first run the long-running query in a
> > spark-sql operator like you have now. Create a Python function that
> > builds a SparkSession within Python (using the pyspark API) and fetches
> > the count from the Spark partition that you've just created. Create a
> > BranchPythonOperator that will invoke this function and branch based on
> > whether the count is OK or not:
> >
> >    - If the count is okay, branch downstream and continue with the
> >      normal execution.
> >    - If the count is off, terminate and send you an email/Slack message
> >      that the count is not as expected.
> >
> > This will look something like this:
> > [image: Inline image 1]
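> >
> > A minimal sketch of that branching, assuming Airflow 1.x (the task ids
> > are illustrative and fetch_partition_count is a hypothetical helper that
> > opens a SparkSession and counts the new partition):
> >
> > from datetime import datetime
> > from airflow import DAG
> > from airflow.operators.python_operator import BranchPythonOperator
> > from airflow.operators.dummy_operator import DummyOperator
> > from airflow.operators.email_operator import EmailOperator
> >
> > dag = DAG('branch_on_count', start_date=datetime(2017, 10, 1),
> >           schedule_interval=None)
> >
> > def choose_branch(**context):
> >     # fetch_partition_count() is a hypothetical helper that builds a
> >     # SparkSession and counts the rows in the freshly written partition
> >     count = fetch_partition_count()
> >     # Return the task_id of the branch that should run
> >     return 'continue_processing' if count > 0 else 'notify_bad_count'
> >
> > branch = BranchPythonOperator(task_id='check_count',
> >                               python_callable=choose_branch,
> >                               provide_context=True, dag=dag)
> > continue_processing = DummyOperator(task_id='continue_processing', dag=dag)
> > notify_bad_count = EmailOperator(task_id='notify_bad_count',
> >                                  to='you@example.com',
> >                                  subject='Partition count is off',
> >                                  html_content='The count is not as expected.',
> >                                  dag=dag)
> > branch.set_downstream(continue_processing)
> > branch.set_downstream(notify_bad_count)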
> >
> > Would this solve your problem?
> >
> > Cheers, Fokko
> >
> >
> >
> > 2017-10-14 13:42 GMT+02:00 Boris Tyukin <boris@boristyukin.com>:
> >
> >> Hi Fokko, thanks for your response, I really appreciate it!
> >>
> >> Basically in my case I have two Spark SQL queries:
> >>
> >> 1) the first query does INSERT OVERWRITE to a partition and may take a
> >> while to run
> >> 2) then I run a second query right after it to get the count of rows in
> >> that partition.
> >> 3) I need to pass that count back to the Airflow DAG, and this count will
> >> be used by the next task in the DAG to decide whether this partition can
> >> be safely exchanged (using ALTER TABLE EXCHANGE PARTITION) with a
> >> production table partition.
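> >>
> >> Roughly, in a single pyspark session, steps 1 and 2 would look like this
> >> (the database, table, partition and column names are made up):
> >>
> >> from pyspark.sql import SparkSession
> >>
> >> spark = (SparkSession.builder
> >>          .appName('load_and_count')
> >>          .enableHiveSupport()
> >>          .getOrCreate())
> >>
> >> # 1) long-running insert into the staging partition (illustrative names)
> >> spark.sql("""
> >>     INSERT OVERWRITE TABLE staging_db.events PARTITION (ds='2017-10-14')
> >>     SELECT event_id, payload FROM raw_db.events WHERE ds = '2017-10-14'
> >> """)
> >>
> >> # 2) count the rows that were just written
> >> count = spark.sql(
> >>     "SELECT COUNT(*) AS cnt FROM staging_db.events WHERE ds = '2017-10-14'"
> >> ).first()['cnt']
> >> print(count)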
> >>
> >> So I need to somehow get that count of rows. My initial thought was to
> >> parse the log and extract the count, but it looks like even a regex does
> >> not quite work - spark-sql writes the query output to stdout, which the
> >> Airflow spark-sql hook does not capture right now.
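> >>
> >> For reference, a generic sketch of what capturing that stdout could look
> >> like (this is not the hook's actual code, and the query is just an
> >> example):
> >>
> >> import subprocess
> >>
> >> # Generic sketch: run spark-sql and capture its stdout so the query
> >> # result could be returned to the caller instead of only being logged.
> >> proc = subprocess.Popen(
> >>     ['spark-sql', '-e', 'SELECT COUNT(*) FROM some_db.some_table'],
> >>     stdout=subprocess.PIPE, stderr=subprocess.PIPE)
> >> out, err = proc.communicate()
> >> print(out)  # the captured query output, e.g. the row count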
> >>
> >> If you can suggest a better solution for me, that would be great!
> >>
> >> Also, initially I wanted to count the rows and then do ALTER TABLE
> >> EXCHANGE PARTITION in the same pyspark job, but I found out that Spark
> >> does not support this statement yet, so I have to use Hive.
> >>
> >> On Sat, Oct 14, 2017 at 4:53 AM, Driesprong, Fokko <fokko@driesprong.frl>
> >> wrote:
> >>
> >> > Hi Boris,
> >> >
> >> > Thank you for your question, and excuse me for the late response;
> >> > currently I'm on holiday.
> >> >
> >> > The solution that you suggest would not be my preferred choice.
> >> > Extracting results from a log using a regex is expensive in terms of
> >> > computational cost, and error-prone. My question is: what are you
> >> > trying to accomplish?
> >> > For me there are two ways of using the Spark-sql operator:
> >> >
> >> >    1. ETL using Spark: instead of returning the results, write the
> >> >    results back to a new table, or a new partition within the table.
> >> >    This data can be used downstream in the DAG. Also, this writes the
> >> >    data to HDFS, which is nice for persistence.
> >> >    2. Write the data in a simple and widely supported format (such as
> >> >    csv) onto HDFS. Now you can get the data from HDFS to your local
> >> >    file-system using `hdfs dfs -get`, or use `hdfs dfs -cat ... |
> >> >    application.py` to pipe it to your application directly.
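> >> >
> >> > As a rough illustration of option 2 (the paths, database and column
> >> > names are made up):
> >> >
> >> > from pyspark.sql import SparkSession
> >> >
> >> > spark = (SparkSession.builder
> >> >          .appName('export_csv')
> >> >          .enableHiveSupport()
> >> >          .getOrCreate())
> >> >
> >> > # Write a query result as csv onto HDFS, so it can be fetched
> >> > # downstream with `hdfs dfs -get` or piped to an application with
> >> > # `hdfs dfs -cat ... | application.py`
> >> > df = spark.sql("SELECT id, amount FROM some_db.some_table")
> >> > df.write.mode('overwrite').csv('hdfs:///tmp/exports/some_table_csv')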
> >> >
> >> > What you are trying to accomplish looks to me like something that would
> >> > fit spark-submit, where you can submit pyspark applications and directly
> >> > fetch the results from Spark:
> >> >
> >> > Welcome to
> >> >       ____              __
> >> >      / __/__  ___ _____/ /__
> >> >     _\ \/ _ \/ _ `/ __/  '_/
> >> >    /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
> >> >       /_/
> >> >
> >> > Using Python version 2.7.14 (default, Oct 11 2017 10:13:33)
> >> > SparkSession available as 'spark'.
> >> > >>> spark.sql("SELECT 1 as count").first()
> >> > Row(count=1)
> >> >
> >> > Most of the time we use spark-sql to transform the data, then use sqoop
> >> > to get the data from HDFS to an RDBMS to expose the data to the
> >> > business. These examples are for Spark using HDFS, but for S3 it is
> >> > somewhat the same.
> >> >
> >> > Does this answer your question? If not, could you elaborate on the
> >> > problem that you are facing?
> >> >
> >> > Ciao, Fokko
> >> >
> >> >
> >> >
> >> >
> >> > 2017-10-13 15:54 GMT+02:00 Boris <boriskey@gmail.com>:
> >> >
> >> > > Hi guys,
> >> > >
> >> > > I opened a JIRA on this and will be working on a PR:
> >> > > https://issues.apache.org/jira/browse/AIRFLOW-1713
> >> > >
> >> > > Any objections/suggestions conceptually?
> >> > >
> >> > > Fokko, I see you have been actively contributing to the Spark hooks
> >> > > and operators, so I could use your opinion before I implement this.
> >> > >
> >> > > Boris
> >> > >
> >> >
> >>
> >
> >
>
