airflow-dev mailing list archives

From Boris <boris...@gmail.com>
Subject Re: Return results optionally from spark_sql_hook
Date Sun, 15 Oct 2017 15:16:48 GMT
Thanks Fokko. Do you know whether it is better to use pyspark directly within a
python operator or to invoke spark-submit instead? My understanding is that in
both cases airflow uses yarn-client deployment mode, not yarn-cluster, so the
spark driver always runs on the same node as the airflow worker. Not sure that
is the best practice...
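
For context, a minimal sketch of the spark-submit route mentioned above, with a hypothetical DAG and script path; spark-submit with --deploy-mode cluster puts the driver on a YARN node, whereas a SparkSession built inside a PythonOperator keeps the driver on the Airflow worker (effectively yarn-client):

# Hypothetical sketch: submit the job in yarn-cluster mode so the Spark driver
# does not run on the Airflow worker itself.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG("spark_submit_cluster_example",        # hypothetical dag id
          start_date=datetime(2017, 10, 1),
          schedule_interval=None)

submit_job = BashOperator(
    task_id="submit_job",
    bash_command=("spark-submit --master yarn --deploy-mode cluster "
                  "/path/to/insert_and_count.py"),   # hypothetical script path
    dag=dag,
)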

On Oct 15, 2017 05:04, "Driesprong, Fokko" <fokko@driesprong.frl> wrote:

> Hi Boris,
>
> Instead of writing it to a file, you can also write it to xcom; this will
> keep everything inside of Airflow. My personal opinion on this: spark-sql
> is a bit limited by nature, it only supports SQL. If you want to do more
> dynamic stuff, you will eventually have to move to spark-submit anyway.
>
> Cheers, Fokko
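
A minimal sketch of the xcom approach, with hypothetical dag and task ids; a PythonOperator's return value is pushed to xcom and can be pulled by a downstream task:

# Hypothetical sketch: pass a row count between tasks via xcom.
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG("xcom_count_example",                   # hypothetical dag id
          start_date=datetime(2017, 10, 1),
          schedule_interval=None)

def fetch_count(**context):
    count = 42                                    # placeholder for the real count
    return count                                  # return value is stored as an xcom

def check_count(**context):
    count = context["ti"].xcom_pull(task_ids="fetch_count")
    print("row count from upstream task: %s" % count)

fetch = PythonOperator(task_id="fetch_count", python_callable=fetch_count,
                       provide_context=True, dag=dag)
check = PythonOperator(task_id="check_count", python_callable=check_count,
                       provide_context=True, dag=dag)
fetch >> check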
>
> 2017-10-14 14:45 GMT+02:00 Boris <boriskey@gmail.com>:
>
> > Thanks Fokko, I think that will do it, but my concern is that in this case
> > my dag will initiate two separate spark sessions, and it takes about 20
> > seconds in our yarn environment to create one. I need to run 600 dags like
> > that every morning.
> >
> > I am now thinking of creating a pyspark job that will do the insert and
> > the count and write the result to a temp file. Still not ideal... I wish I
> > could just parse the spark SQL output instead...
> >
> > On Oct 14, 2017 8:05 AM, "Driesprong, Fokko" <fokko@driesprong.frl> wrote:
> >
> > > Hi Boris,
> > >
> > > That sounds like a nice DAG.
> > >
> > > This is how I would do it: first run the long-running query in a
> > > spark-sql operator like you have now. Then create a Python function that
> > > builds a SparkSession (using the pyspark API) and fetches the count from
> > > the partition that you've just created. Create a BranchPythonOperator
> > > that invokes this function and, depending on whether the count is ok,
> > > branches:
> > >
> > >    - If the count is okay, branch downstream and continue with the
> > >      normal execution.
> > >    - If the count is off, terminate and send you an email/slack message
> > >      that the count is not as expected.
> > >
> > > This will look something like this (inline image of the example DAG in
> > > the original message):
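
A minimal sketch of the branching step described above, with hypothetical table, partition and task names, and assuming a DAG object `dag` like the one sketched earlier:

# Hypothetical sketch: count the freshly written partition and branch on it.
from pyspark.sql import SparkSession
from airflow.operators.python_operator import BranchPythonOperator

def check_partition_count(**context):
    spark = (SparkSession.builder
             .appName("count_check")
             .enableHiveSupport()
             .getOrCreate())
    count = spark.sql(
        "SELECT COUNT(*) AS cnt FROM staging_table "
        "WHERE load_date = '2017-10-14'"          # hypothetical table/partition
    ).first()["cnt"]
    spark.stop()
    context["ti"].xcom_push(key="row_count", value=count)  # keep it for later tasks
    if count > 0:                                 # whatever "count is okay" means here
        return "exchange_partition"               # hypothetical downstream task id
    return "notify_bad_count"                     # hypothetical alerting task id

check_count = BranchPythonOperator(
    task_id="check_partition_count",
    python_callable=check_partition_count,
    provide_context=True,
    dag=dag,  # assumed to exist, as in the earlier sketch
)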
> > >
> > > Would this solve your problem?
> > >
> > > Cheers, Fokko
> > >
> > >
> > >
> > > 2017-10-14 13:42 GMT+02:00 Boris Tyukin <boris@boristyukin.com>:
> > >
> > >> Hi Fokko, thanks for your response, really appreciate it!
> > >>
> > >> Basically in my case I have two Spark SQL queries:
> > >>
> > >> 1) the first query does an INSERT OVERWRITE to a partition and may take
> > >> a while
> > >> 2) then I run a second query right after it to get the count of rows in
> > >> that partition
> > >> 3) I need to pass that count back to the airflow dag, where the next
> > >> task will use it to decide whether the partition can safely be exchanged
> > >> (using ALTER TABLE EXCHANGE PARTITION) with the production table
> > >> partition
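
A minimal sketch of steps 1 and 2 above combined into one pyspark job, so only a single Spark session is created; table, column and partition names are hypothetical:

# Hypothetical sketch: load the partition and count it in the same Spark session.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("load_and_count")
         .enableHiveSupport()
         .getOrCreate())

spark.sql(
    "INSERT OVERWRITE TABLE staging_table PARTITION (load_date = '2017-10-14') "
    "SELECT id, amount FROM source_table WHERE event_date = '2017-10-14'"
)

row_count = spark.sql(
    "SELECT COUNT(*) AS cnt FROM staging_table WHERE load_date = '2017-10-14'"
).first()["cnt"]

print(row_count)  # or write it somewhere Airflow can pick it up (file, xcom, ...)
spark.stop()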
> > >>
> > >> So I need some way to get that count of rows. My initial thought was to
> > >> parse the log and extract the count, but even with a regex it does not
> > >> quite work - spark-sql writes the query output to stdout, which the
> > >> airflow spark sql hook does not capture right now.
> > >>
> > >> If you can suggest a better solution for me, it would be great!
> > >>
> > >> Also, initially I wanted to count the rows and then do the ALTER TABLE
> > >> EXCHANGE PARTITION in the same pyspark job, but I found out that Spark
> > >> does not support this statement yet, so I have to use Hive.
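
A minimal sketch of doing the exchange through Hive from Airflow instead, with hypothetical table names and an assumed existing `dag` object; the EXCHANGE PARTITION statement is Hive DDL:

# Hypothetical sketch: swap the verified staging partition into the prod table.
from airflow.operators.hive_operator import HiveOperator

exchange_partition = HiveOperator(
    task_id="exchange_partition",
    hql=(
        "ALTER TABLE prod_table "
        "EXCHANGE PARTITION (load_date = '2017-10-14') "
        "WITH TABLE staging_table"
    ),
    dag=dag,  # assumed to be an existing DAG object
)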
> > >>
> > >> On Sat, Oct 14, 2017 at 4:53 AM, Driesprong, Fokko <fokko@driesprong.frl>
> > >> wrote:
> > >>
> > >> > Hi Boris,
> > >> >
> > >> > Thank you for your question and excuse me for the late response;
> > >> > currently I'm on holiday.
> > >> >
> > >> > The solution that you suggest would not be my preferred choice.
> > >> > Extracting results from a log using a regex is computationally
> > >> > expensive and error prone. My question is: what are you trying to
> > >> > accomplish? For me there are two ways of using the Spark-sql operator:
> > >> >
> > >> >    1. ETL using Spark: instead of returning the results, write them
> > >> >       back to a new table, or to a new partition within the table.
> > >> >       This data can be used downstream in the dag. Also, this writes
> > >> >       the data to hdfs, which is nice for persistence.
> > >> >    2. Write the data in a simple and widely supported format (such as
> > >> >       csv) onto hdfs. Now you can get the data from hdfs using
> > >> >       `hdfs dfs -get` to your local file-system, or use
> > >> >       `hdfs dfs -cat ... | application.py` to pipe it to your
> > >> >       application directly.
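
A minimal sketch of the `hdfs dfs -cat ... | application.py` pattern from option 2 above, assuming the application simply reads csv rows from stdin:

# Hypothetical application.py: count csv rows piped in from `hdfs dfs -cat`.
import csv
import sys

rows = csv.reader(sys.stdin)
row_count = sum(1 for _ in rows)
print("rows received: %d" % row_count)

Invoked, for example, as `hdfs dfs -cat /path/to/output/part-* | python application.py` (the output path is hypothetical).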
> > >> >
> > >> > What you are trying to accomplish looks to me like something that
> > >> > would fit a spark-submit job, where you can submit pyspark
> > >> > applications and directly fetch the results from Spark:
> > >> >
> > >> > Welcome to
> > >> >       ____              __
> > >> >      / __/__  ___ _____/ /__
> > >> >     _\ \/ _ \/ _ `/ __/  '_/
> > >> >    /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
> > >> >       /_/
> > >> >
> > >> > Using Python version 2.7.14 (default, Oct 11 2017 10:13:33)
> > >> > SparkSession available as 'spark'.
> > >> > >>> spark.sql("SELECT 1 as count").first()
> > >> > Row(count=1)
> > >> >
> > >> > Most of the time we use Spark-sql to transform the data, then use
> > >> > sqoop to get the data from hdfs into an rdbms to expose it to the
> > >> > business. These examples are for Spark with hdfs, but for s3 it is
> > >> > somewhat the same.
> > >> >
> > >> > Does this answer your question? If not, could you elaborate on the
> > >> > problem that you are facing?
> > >> >
> > >> > Ciao, Fokko
> > >> >
> > >> >
> > >> >
> > >> >
> > >> > 2017-10-13 15:54 GMT+02:00 Boris <boriskey@gmail.com>:
> > >> >
> > >> > > hi guys,
> > >> > >
> > >> > > I opened JIRA on this and will be working on PR
> > >> > > https://issues.apache.org/jira/browse/AIRFLOW-1713
> > >> > >
> > >> > > any objections/suggestions conceptually?
> > >> > >
> > >> > > Fokko, I see you have been actively contributing to the spark hooks
> > >> > > and operators, so I could use your opinion before I implement this.
> > >> > >
> > >> > > Boris
> > >> > >
> > >> >
> > >>
> > >
> > >
> >
>
