airflow-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Boris Tyukin <>
Subject Re: Return results optionally from spark_sql_hook
Date Sat, 14 Oct 2017 11:42:41 GMT
Hi Fokko, thanks for your response, really appreciate it!

Basically in my case I have two Spark SQL queries:

1) the first query does INSERT OVERWRITE to a partition and may take a
while for a while
2) then I run a second query right after it to get count of rows of that
3) I need to pass that count back to airflow dag and this count will be
used by the next task in the DAG to make a decision if this partition
should be safely exchanged (using ALTER TABLE EXCHANGE PARTITION) with a
production table partition.

So I need somehow to get that count of rows. My initial though was to parse
the log and extract that count but looks like even if i do regex it does
not quite work - spark sql writes query output to stdout which airflow
spark sql hook does not capture right now.

if you can suggest a better solution for me it would be great!

Also initially I wanted to count rows and then do ALTER TABLE EXCHANGE
PARTITION in the same pyspark job but I found out that spark does not
support this statement yet and I have to use Hive.

On Sat, Oct 14, 2017 at 4:53 AM, Driesprong, Fokko <>

> Hi Boris,
> Thank you for your question and excuse me for the late response, currently
> I'm on holiday.
> The solution that you suggest, would not be my preferred choice. Extracting
> results from a log using a regex is expensive in terms of computational
> costs, and error prone. My question is, what are you trying to accomplish?
> For me there are two ways of using the Spark-sql operator:
>    1. ETL Using Spark: Instead of returning the results, write the results
>    back to a new table, or a new partition within the table. This data can
> be
>    used downstream in the dag. Also, this will write the data to hdfs
> which is
>    nice for persistance.
>    2. Write the data in a simple and widely supported format (such as csv)
>    onto hdfs. Now you can get the data from hdfs using `hdfs dfs -get` to
> you
>    local file-system. Or use `hdfs dfs -cat ... |` to pipe
> it
>    to your application directly.
> What you are trying to accomplish, looks for me something that would fit
> the spark-submit job, where you can submit pyspark applications where you
> can directly fetch the results from Spark:
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
>       /_/
> Using Python version 2.7.14 (default, Oct 11 2017 10:13:33)
> SparkSession available as 'spark'.
> >>> spark.sql("SELECT 1 as count").first()
> Row(count=1)
> Most of the time we use the Spark-sql to transform the data, then use sqoop
> to get the data from hdfs to a rdbms to expose the data to the business.
> These examples are for Spark using hdfs, but for s3 it is somewhat the
> same.
> Does this answer your question, if not, could you elaborate the problem
> that you are facing?
> Ciao, Fokko
> 2017-10-13 15:54 GMT+02:00 Boris <>:
> > hi guys,
> >
> > I opened JIRA on this and will be working on PR
> >
> >
> > any objections/suggestions conceptually?
> >
> > Fokko, I see you have been actively contributing to spark hooks and
> > operators so I could use your opinion before I implement this.
> >
> > Boris
> >

  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message