spark-user mailing list archives

From "Lalwani, Jayesh" <Jayesh.Lalw...@capitalone.com>
Subject Re: Union of 2 streaming data frames
Date Mon, 10 Jul 2017 17:55:46 GMT
Michael,

I see that 2.2 RC6 passed its vote on Friday. Does this mean 2.2 is going to be out soon?
Do you have some sort of ETA?

From: "Lalwani, Jayesh" <Jayesh.Lalwani@capitalone.com>
Date: Friday, July 7, 2017 at 5:46 PM
To: Michael Armbrust <michael@databricks.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>, #MM - Heartbeat <MM-Heartbeat@capitalone.com>
Subject: Re: Union of 2 streaming data frames

Great! Even val dfAllEvents = sparkSession.table("oldEvents").union(sparkSession.table("newEvents"))
doesn’t work. Will this be addressed in 2.2?


From: Michael Armbrust <michael@databricks.com>
Date: Friday, July 7, 2017 at 5:42 PM
To: "Lalwani, Jayesh" <Jayesh.Lalwani@capitalone.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>, #MM - Heartbeat <MM-Heartbeat@capitalone.com>
Subject: Re: Union of 2 streaming data frames

Ah, looks like you are hitting SPARK-20441 (https://issues.apache.org/jira/browse/SPARK-20441). Should be fixed in 2.2.

On Fri, Jul 7, 2017 at 2:37 PM, Lalwani, Jayesh <Jayesh.Lalwani@capitalone.com> wrote:
I created a small sample to verify this. It looks like a union done through Spark SQL doesn’t
work, while calling union on the DataFrames directly does: https://gist.github.com/GaalDornick/8920577ca92842f44d7bfd3a277c7545.
I’m on 2.1.0.
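
For reference, here is a condensed sketch of the shape of that code. The schema, file paths, and the choice of which source is static versus streaming are assumptions made for illustration; the gist is the authoritative version.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructType}

val sparkSession = SparkSession.builder().appName("streaming-union-repro").getOrCreate()

val schema = new StructType()
  .add("acctId", StringType)
  .add("eventId", LongType)
  .add("eventType", StringType)

// Static table of all old events, streaming source of new events (paths are placeholders)
val dfAllOldEvents = sparkSession.read.schema(schema).json("/tmp/allOldEvents")
val dfNewEvents = sparkSession.readStream.schema(schema).json("/tmp/newEvents")
dfAllOldEvents.createOrReplaceTempView("allOldEvents")
dfNewEvents.createOrReplaceTempView("newEvents")

// oldEvents reuses newEvents via a join, so newEvents appears twice in the union below
val dfOldEvents = sparkSession.sql(
  "select allOldEvents.acctId, allOldEvents.eventId, allOldEvents.eventType " +
  "from allOldEvents join newEvents on allOldEvents.acctId = newEvents.acctId")
dfOldEvents.createOrReplaceTempView("oldEvents")

// Analyzes fine, but the streaming query dies at runtime with the exception below (SPARK-20441)
val dfAllEvents = sparkSession.sql("select * from oldEvents union select * from newEvents")

// Replacing the SQL union with the DataFrame API works fine on 2.1.0:
// val dfAllEvents = dfNewEvents.union(dfOldEvents)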

I get the following exception. If I change val dfAllEvents = sparkSession.sql("select * from
oldEvents union select * from newEvents") to val dfAllEvents = dfNewEvents.union(dfOldEvents),
it works fine.

17/07/07 17:33:34 ERROR StreamExecution: Query [id = 3bae26a1-7ee3-45ab-a98d-9346eaf03d08,
runId = 063af01f-9878-452e-aa30-7c21e2ef4c18] terminated with error
org.apache.spark.sql.AnalysisException: resolved attribute(s) acctId#29 missing from eventType#2,acctId#0,eventId#37L,acctId#36,eventType#38,eventId#1L
in operator !Join Inner, (acctId#0 = acctId#29);;
Distinct
+- Union
   :- Project [acctId#0, eventId#1L, eventType#2]
   :  +- SubqueryAlias oldevents, `oldEvents`
   :     +- Project [acctId#0, eventId#1L, eventType#2]
   :        +- !Join Inner, (acctId#0 = acctId#29)
   :           :- SubqueryAlias alloldevents, `allOldEvents`
   :           :  +- Relation[acctId#0,eventId#1L,eventType#2] json
   :           +- SubqueryAlias newevents, `newEvents`
   :              +- Relation[acctId#36,eventId#37L,eventType#38] json
   +- Project [acctId#29, eventId#30L, eventType#31]
      +- SubqueryAlias newevents, `newEvents`
         +- Relation[acctId#29,eventId#30L,eventType#31] json

                at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
                at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:57)
                at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:337)
                at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
                at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
                at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
                at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
                at scala.collection.immutable.List.foreach(List.scala:381)
                at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
                at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
                at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
                at scala.collection.immutable.List.foreach(List.scala:381)
                at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
                at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
                at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
                at scala.collection.immutable.List.foreach(List.scala:381)
                at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
                at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
                at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
                at scala.collection.immutable.List.foreach(List.scala:381)
                at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
                at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
                at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
                at scala.collection.immutable.List.foreach(List.scala:381)
                at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
                at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
                at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:57)
                at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:48)
                at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:68)
                at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:67)
                at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:60)
                at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:60)
                at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
                at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
                at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
                at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
                at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$3.apply(StreamExecution.scala:496)
                at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$3.apply(StreamExecution.scala:488)
                at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
                at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
                at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:488)
                at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:255)
                at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
                at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
                at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
                at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
                at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:244)
                at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
                at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239)
                at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177)




From: Michael Armbrust <michael@databricks.com>
Date: Friday, July 7, 2017 at 2:30 PM
To: "Lalwani, Jayesh" <Jayesh.Lalwani@capitalone.com<mailto:Jayesh.Lalwani@capitalone.com>>
Cc: "user@spark.apache.org<mailto:user@spark.apache.org>" <user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: Union of 2 streaming data frames

df.union(df2) should be supported when both DataFrames are created from a streaming source.
 What error are you seeing?
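
A minimal sketch of that supported case, assuming two file-based streaming sources (the schema and paths are placeholders):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructType}

val spark = SparkSession.builder().appName("streaming-union").getOrCreate()

val schema = new StructType()
  .add("acctId", StringType)
  .add("eventId", LongType)
  .add("eventType", StringType)

// Two streaming DataFrames over JSON directories (paths are placeholders)
val df = spark.readStream.schema(schema).json("/tmp/streamA")
val df2 = spark.readStream.schema(schema).json("/tmp/streamB")

// The union of two streaming DataFrames is itself a streaming DataFrame
val unioned = df.union(df2)

unioned.writeStream
  .format("console")
  .outputMode("append")
  .start()
  .awaitTermination()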

On Fri, Jul 7, 2017 at 11:27 AM, Lalwani, Jayesh <Jayesh.Lalwani@capitalone.com> wrote:
In Structured Streaming, is there a way to union 2 streaming data frames? Are there any plans
to support a union of 2 streaming data frames soon? I can understand the inherent complexity
in joining 2 streaming data frames, but a union is just concatenating 2 microbatches, innit?

The problem we are trying to solve is that we have a Kafka stream that is receiving events,
and each event is associated with an account ID. We have a data store that holds historical
events for hundreds of millions of accounts. For the events coming in on the input stream,
we want to pull in all the historical events for those accounts from the data store and feed
the combined set to a model.

Initially, the way we were planning to do this was as follows (see the sketch after the list):
a) read from Kafka into a streaming dataframe. Call this inputDF.
b) In a mapWithPartition method, get all the unique accounts in the partition. Look up all
the historical events for those unique accounts and return them. Let’s call this historicalDF
c) Union inputDF with historicalDF. Call this allDF
d) Call mapWithPartition on allDF and give the records to the model
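
A rough sketch of what a) through d) could look like, reading mapWithPartition as Dataset.mapPartitions; the Event case class, the Kafka options, and the parseEvent / lookupHistory / scoreWithModel helpers are hypothetical stand-ins, not our actual code:

import org.apache.spark.sql.{Dataset, SparkSession}

case class Event(acctId: String, eventId: Long, eventType: String)

// Hypothetical helpers; the real bodies depend on the data store and the model
def parseEvent(json: String): Event = ???
def lookupHistory(acctIds: Set[String]): Iterator[Event] = ???
def scoreWithModel(events: Iterator[Event]): Iterator[Event] = ???

val spark = SparkSession.builder().appName("enrich-with-history").getOrCreate()
import spark.implicits._

// a) read from Kafka into a streaming Dataset of events (broker and topic are placeholders)
val inputDF: Dataset[Event] = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(value AS STRING) AS json")
  .as[String]
  .map(json => parseEvent(json))

// b) per partition, collect the distinct accounts and pull their history from the data store
val historicalDF: Dataset[Event] = inputDF.mapPartitions { events =>
  lookupHistory(events.map(_.acctId).toSet)
}

// c) union the incoming events with the historical ones
val allDF = inputDF.union(historicalDF)

// d) hand each partition of the combined stream to the model
val scored = allDF.mapPartitions(events => scoreWithModel(events))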

Of course, this doesn’t work because both inputDF and historicalDF are streaming data frames.

What we ended up doing is, in step b), outputting the input records along with the historical
records. That works, but it seems like a hacky way of doing things: the operation that does
the lookup now does the union too. It works for now because the data from the data store doesn’t
require any transformation or aggregation, but if it did, we would want to do that using Spark
SQL, whereas this solution forces us to do any transformation of the historical data in Scala.
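
Roughly, reusing the hypothetical names from the sketch above:

val allDF: Dataset[Event] = inputDF.mapPartitions { events =>
  val batch = events.toSeq                                // materialize the partition so it can be emitted alongside the lookup
  val history = lookupHistory(batch.map(_.acctId).toSet)  // hypothetical data-store lookup
  batch.iterator ++ history                               // emit the input records together with their history
}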

Is there a Sparky way of doing this?
