airflow-dev mailing list archives

From Boris Tyukin <bo...@boristyukin.com>
Subject Re: Return results optionally from spark_sql_hook
Date Mon, 16 Oct 2017 18:01:17 GMT
Great, this is what I expected to hear, but I wanted to double-check. Thanks
for all your help, Fokko!

On Mon, Oct 16, 2017 at 1:08 PM, Driesprong, Fokko <fokko@driesprong.frl>
wrote:

> Hi Boris,
>
> When kicking off Spark jobs from Airflow, cluster mode is highly
> recommended, since the driver then runs on the Hadoop cluster rather than on
> the Airflow machine itself. Personally, I prefer the spark-submit operator,
> since it pulls all the connection variables directly from Airflow, so you
> end up with a central place (Airflow connections) where all the
> configuration is kept. Otherwise the configuration ends up scattered
> through your Airflow DAG code.
>
> Cheers, Fokko
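The point about keeping the configuration in one place can be sketched as follows. This is a hypothetical helper, not the code of Airflow's SparkSubmitHook: the dict `YARN_CONN`, its keys, the queue name and the application name are assumptions standing in for an Airflow connection. The flags themselves (`--master`, `--deploy-mode`, `--queue`) are standard spark-submit options, and `--deploy-mode cluster` is what puts the driver on the cluster instead of on the Airflow worker.

```python
from typing import Dict, List, Sequence

# Hypothetical stand-in for an Airflow connection: the only place where the
# cluster settings live. Every job's command is derived from it.
YARN_CONN: Dict[str, str] = {
    "master": "yarn",
    "deploy_mode": "cluster",  # driver runs on the cluster, not the Airflow worker
    "queue": "etl",            # hypothetical YARN queue name
}


def build_spark_submit_cmd(conn: Dict[str, str], application: str,
                           app_args: Sequence[str] = ()) -> List[str]:
    """Build a spark-submit argument list from one connection-like dict."""
    cmd = [
        "spark-submit",
        "--master", conn.get("master", "yarn"),
        "--deploy-mode", conn.get("deploy_mode", "cluster"),
    ]
    # --queue is optional; only add it when the connection defines one.
    if conn.get("queue"):
        cmd += ["--queue", conn["queue"]]
    cmd.append(application)
    cmd.extend(app_args)
    return cmd
```

For example, `build_spark_submit_cmd(YARN_CONN, "count_partition.py", ["2017-10-16"])` (the script name is made up) returns an argument list that could be handed to `subprocess.run`; changing `deploy_mode` in the one dict changes it for every job.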
>
> 2017-10-15 17:16 GMT+02:00 Boris <boriskey@gmail.com>:
>
> > Thanks Fokko. Do you know if it is better to use pyspark directly within a
> > PythonOperator or to invoke spark-submit instead? My understanding is that
> > in both cases Airflow uses the yarn-client deployment mode, not
> > yarn-cluster, so the Spark driver always runs on the same node as the
> > Airflow worker. I'm not sure that is best practice...
> >
> > On Oct 15, 2017 05:04, "Driesprong, Fokko" <fokko@driesprong.frl> wrote:
> >
> > > Hi Boris,
> > >
> > > Instead of writing it to a file, you can also push it to XCom; this keeps
> > > everything inside Airflow. My personal opinion: spark-sql is limited by
> > > nature, since it only supports SQL. If you want to do more dynamic things,
> > > you will eventually have to move to spark-submit anyway.
> > >
> > > Cheers, Fokko
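Pushing the count to XCom instead of a temp file might look like the sketch below. `xcom_push(key=..., value=...)` and `xcom_pull(task_ids=..., key=...)` are TaskInstance methods; the task id, the key name, and passing the counting function in as `count_fn` are assumptions for illustration. In the Airflow 1.x of this thread, the PythonOperator needs `provide_context=True` for the callable to receive `ti` in its keyword arguments.

```python
COUNT_TASK_ID = "count_rows"  # hypothetical task_id of the upstream PythonOperator
ROW_COUNT_KEY = "row_count"   # hypothetical XCom key


def publish_row_count(count_fn, **context):
    """Upstream callable: compute the row count and push it to XCom."""
    row_count = int(count_fn())
    context["ti"].xcom_push(key=ROW_COUNT_KEY, value=row_count)
    # A PythonOperator also pushes its return value, under key "return_value".
    return row_count


def read_row_count(**context):
    """Downstream callable: pull the count that the upstream task pushed."""
    return context["ti"].xcom_pull(task_ids=COUNT_TASK_ID, key=ROW_COUNT_KEY)
```

XCom values live in Airflow's metadata database, which is fine for a small value such as a single row count.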
> > >
> > > 2017-10-14 14:45 GMT+02:00 Boris <boriskey@gmail.com>:
> > >
> > > > Thanks Fokko, I think that will work, but my concern is that in this case
> > > > my DAG will start two separate Spark sessions, and creating one takes
> > > > about 20 seconds in our YARN environment. I need to run 600 DAGs like
> > > > that every morning.
> > > >
> > > > I am now thinking of creating a pyspark job that will do the insert and
> > > > the count and write the count to a temp file. Still not ideal... I wish I
> > > > could just parse the spark-sql output instead.
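The concern is easy to quantify: one avoidable 20-second session per DAG across 600 DAGs is 600 × 20 s = 12,000 s, roughly 3.3 hours of cumulative session start-up every morning (less wall-clock time if DAGs run in parallel). A minimal sketch of the single-session alternative follows; `spark` is assumed to be a SparkSession created once in the job (for example with `SparkSession.builder.enableHiveSupport().getOrCreate()`), and the function name and the SQL passed in are hypothetical.

```python
def insert_and_count(spark, insert_sql: str, count_sql: str) -> int:
    """Run the INSERT OVERWRITE and the row count in one SparkSession,
    so the session start-up cost is paid once instead of twice."""
    # spark.sql() executes commands such as INSERT eagerly; the result is empty.
    spark.sql(insert_sql)
    # COUNT(*) always yields exactly one row; first() returns it as a Row,
    # whose first field is the count.
    row = spark.sql(count_sql).first()
    return int(row[0])
```

The returned integer can then be pushed to XCom or used directly, instead of being written to a temp file.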
> > > >
> > > > On Oct 14, 2017 8:05 AM, "Driesprong, Fokko" <fokko@driesprong.frl>
> > > wrote:
> > > >
> > > > > Hi Boris,
> > > > >
> > > > > That sounds like a nice DAG.
> > > > >
> > > > > This is how I would do it: first, run the long-running query in a
> > > > > spark-sql operator like you have now. Then create a Python function
> > > > > that builds a SparkSession (using the pyspark API) and fetches the
> > > > > count from the partition you've just created. Finally, create a
> > > > > BranchPythonOperator that invokes this function and branches on
> > > > > whether the count is OK:
> > > > >
> > > > >    - If the count is OK, branch downstream and continue with normal
> > > > >    execution.
> > > > >    - If the count is off, stop and send yourself an email/Slack
> > > > >    message saying that the count is not as expected.
> > > > >
> > > > > This will look something like this:
> > > > > [image: Inline image 1]
> > > > >
> > > > > Would this solve your problem?
> > > > >
> > > > > Cheers, Fokko
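The branching step could be sketched as the callable below. A BranchPythonOperator follows the task whose task_id its callable returns and skips the other tasks directly downstream of it. The task ids, the `count_fn` argument (standing in for the function that builds a SparkSession and counts the new partition), and a minimum-row threshold as the definition of "the count is OK" are all assumptions.

```python
OK_TASK_ID = "exchange_partition"   # hypothetical: continue normal execution
ALERT_TASK_ID = "notify_bad_count"  # hypothetical: email/Slack alert task


def choose_branch(count_fn, expected_min_rows, **context):
    """Callable for a BranchPythonOperator: return the task_id to follow."""
    row_count = int(count_fn())
    if row_count >= expected_min_rows:
        return OK_TASK_ID
    return ALERT_TASK_ID
```

Both hypothetical tasks would be set directly downstream of the branch task, with `count_fn` and `expected_min_rows` passed through the operator's `op_kwargs`.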
> > > > >
> > > > >
> > > > >
> > > > > 2017-10-14 13:42 GMT+02:00 Boris Tyukin <boris@boristyukin.com>:
> > > > >
> > > > >> Hi Fokko, thanks for your response, really appreciate it!
> > > > >>
> > > > >> Basically in my case I have two Spark SQL queries:
> > > > >>
> > > > >> 1) The first query does an INSERT OVERWRITE into a partition and may
> > > > >> take a while.
> > > > >> 2) Then I run a second query right after it to get the row count of
> > > > >> that partition.
> > > > >> 3) I need to pass that count back to the Airflow DAG; the next task in
> > > > >> the DAG uses it to decide whether this partition can safely be
> > > > >> exchanged (using ALTER TABLE EXCHANGE PARTITION) with a production
> > > > >> table partition.
> > > > >>
> > > > >> So I somehow need to get that row count. My initial thought was to
> > > > >> parse the log and extract the count, but even with a regex it does not
> > > > >> quite work: spark-sql writes query output to stdout, which the Airflow
> > > > >> spark_sql_hook does not capture right now.
> > > > >>
> > > > >> If you can suggest a better solution, that would be great!
> > > > >>
> > > > >> Initially I also wanted to count the rows and then run ALTER TABLE
> > > > >> EXCHANGE PARTITION in the same pyspark job, but I found out that Spark
> > > > >> does not support this statement yet, so I have to use Hive.
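Since the exchange has to run in Hive, the statement itself might be built as in the sketch below and passed to a Hive task (for example HiveOperator's `hql` argument). The table names and the `ds` partition column are hypothetical. Per the Hive DDL documentation, `ALTER TABLE dest EXCHANGE PARTITION (spec) WITH TABLE source` moves the partition from `source` into `dest`, which must have the same schema and must not already contain that partition, so an existing production partition would typically have to be dropped first.

```python
from typing import Mapping


def exchange_partition_hql(dest_table: str, source_table: str,
                           partition: Mapping[str, str]) -> str:
    """Build a HiveQL EXCHANGE PARTITION statement that moves `partition`
    from `source_table` into `dest_table`.

    Values are interpolated without escaping, so they must come from trusted
    DAG parameters, not user input.
    """
    spec = ", ".join("{}='{}'".format(col, val) for col, val in partition.items())
    return "ALTER TABLE {} EXCHANGE PARTITION ({}) WITH TABLE {}".format(
        dest_table, spec, source_table)
```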
> > > > >>
> > > > >> On Sat, Oct 14, 2017 at 4:53 AM, Driesprong, Fokko <fokko@driesprong.frl>
> > > > >> wrote:
> > > > >>
> > > > >> > Hi Boris,
> > > > >> >
> > > > >> > Thank you for your question, and excuse the late response; I'm
> > > > >> > currently on holiday.
> > > > >> >
> > > > >> > The solution you suggest would not be my preferred choice.
> > > > >> > Extracting results from a log with a regex is computationally
> > > > >> > expensive and error-prone. My question is: what are you trying to
> > > > >> > accomplish? For me there are two ways of using the spark-sql operator:
> > > > >> >
> > > > >> >    1. ETL using Spark: instead of returning the results, write them
> > > > >> >    back to a new table, or to a new partition within the table. This
> > > > >> >    data can be used downstream in the DAG. It also writes the data to
> > > > >> >    HDFS, which is nice for persistence.
> > > > >> >    2. Write the data in a simple and widely supported format (such
> > > > >> >    as CSV) to HDFS. You can then copy it to your local file system
> > > > >> >    using `hdfs dfs -get`, or use `hdfs dfs -cat ... | application.py`
> > > > >> >    to pipe it into your application directly.
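For the second option, the receiving side of `hdfs dfs -cat ... | application.py` can be as small as the sketch below. It assumes the files were written without a header row (Spark's CSV writer, e.g. `DataFrame.write.csv(path)`, omits headers by default); with headers enabled, every part file would contribute its own header line to the concatenated stream. The function name is hypothetical.

```python
import csv
from typing import Iterable, List


def read_rows(lines: Iterable[str]) -> List[List[str]]:
    """Parse CSV lines, e.g. sys.stdin when the data is piped in with
    `hdfs dfs -cat <dir>/part-* | python application.py`.

    csv.reader handles quoted fields containing commas; blank lines
    (which csv.reader yields as empty lists) are skipped.
    """
    return [row for row in csv.reader(lines) if row]
```

Inside `application.py` this would be called as `read_rows(sys.stdin)`.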
> > > > >> >
> > > > >> > What you are trying to accomplish looks to me like a better fit for
> > > > >> > a spark-submit job, where you can submit a pyspark application that
> > > > >> > fetches the results directly from Spark:
> > > > >> >
> > > > >> > Welcome to
> > > > >> >       ____              __
> > > > >> >      / __/__  ___ _____/ /__
> > > > >> >     _\ \/ _ \/ _ `/ __/  '_/
> > > > >> >    /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
> > > > >> >       /_/
> > > > >> >
> > > > >> > Using Python version 2.7.14 (default, Oct 11 2017 10:13:33)
> > > > >> > SparkSession available as 'spark'.
> > > > >> > >>> spark.sql("SELECT 1 as count").first()
> > > > >> > Row(count=1)
> > > > >> >
> > > > >> > Most of the time we use spark-sql to transform the data, then use
> > > > >> > Sqoop to move the data from HDFS to an RDBMS to expose it to the
> > > > >> > business. These examples are for Spark on HDFS, but for S3 it is
> > > > >> > much the same.
> > > > >> >
> > > > >> > Does this answer your question? If not, could you elaborate on the
> > > > >> > problem you are facing?
> > > > >> >
> > > > >> > Ciao, Fokko
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> > 2017-10-13 15:54 GMT+02:00 Boris <boriskey@gmail.com>:
> > > > >> >
> > > > >> > > Hi guys,
> > > > >> > >
> > > > >> > > I opened a JIRA for this and will be working on a PR:
> > > > >> > > https://issues.apache.org/jira/browse/AIRFLOW-1713
> > > > >> > >
> > > > >> > > Any objections or conceptual suggestions?
> > > > >> > >
> > > > >> > > Fokko, I see you have been actively contributing to the Spark hooks
> > > > >> > > and operators, so I would appreciate your opinion before I
> > > > >> > > implement this.
> > > > >> > >
> > > > >> > > Boris
> > > > >> > >
> > > > >> >
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>
