spark-user mailing list archives

From Michael Armbrust <mich...@databricks.com>
Subject Re: Union of 2 streaming data frames
Date Mon, 10 Jul 2017 18:18:24 GMT
As I said in the voting thread:

This vote passes! I'll follow up with the release on Monday.



On Mon, Jul 10, 2017 at 10:55 AM, Lalwani, Jayesh <
Jayesh.Lalwani@capitalone.com> wrote:

> Michael,
>
>
>
> I see that 2.2 RC6 has passed a vote on Friday. Does this mean 2.2 is
> going to be out soon? Do you have some sort of ETA?
>
>
>
> *From: *"Lalwani, Jayesh" <Jayesh.Lalwani@capitalone.com>
> *Date: *Friday, July 7, 2017 at 5:46 PM
> *To: *Michael Armbrust <michael@databricks.com>
>
> *Cc: *"user@spark.apache.org" <user@spark.apache.org>, #MM - Heartbeat <
> MM-Heartbeat@capitalone.com>
> *Subject: *Re: Union of 2 streaming data frames
>
>
>
> Great! Even val dfAllEvents =
> sparkSession.table("oldEvents").union(sparkSession.table("newEvents"))
> doesn’t work. Will this be addressed in 2.2?
>
>
>
>
>
> *From: *Michael Armbrust <michael@databricks.com>
> *Date: *Friday, July 7, 2017 at 5:42 PM
> *To: *"Lalwani, Jayesh" <Jayesh.Lalwani@capitalone.com>
> *Cc: *"user@spark.apache.org" <user@spark.apache.org>, #MM - Heartbeat <
> MM-Heartbeat@capitalone.com>
> *Subject: *Re: Union of 2 streaming data frames
>
>
>
> Ah, looks like you are hitting SPARK-20441
> <https://issues.apache.org/jira/browse/SPARK-20441>.  Should be fixed in
> 2.2.
>
>
>
> On Fri, Jul 7, 2017 at 2:37 PM, Lalwani, Jayesh <
> Jayesh.Lalwani@capitalone.com> wrote:
>
> I wrote a small code sample to verify this:
> https://gist.github.com/GaalDornick/8920577ca92842f44d7bfd3a277c7545
> It looks like union using Spark SQL doesn’t work, while calling union on
> the DataFrame works. I’m on 2.1.0.
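
One thing to keep in mind when swapping between the two forms: SQL union removes duplicate rows (it plans as a Distinct over a Union), whereas Dataset.union keeps them, like UNION ALL. The sketch below models only that semantic difference with plain Scala collections, not Spark. The EventRow type and the helper names are hypothetical, and it does not reproduce the exception discussed in this thread.

```scala
// Hypothetical row type mirroring the three columns seen in the thread.
final case class EventRow(acctId: String, eventId: Long, eventType: String)

object UnionSemantics {
  // Models SQL `UNION`: concatenate both inputs, then drop duplicate rows.
  def sqlUnion(a: Seq[EventRow], b: Seq[EventRow]): Seq[EventRow] =
    (a ++ b).distinct

  // Models Dataset.union (UNION ALL): concatenate and keep duplicates.
  def unionAll(a: Seq[EventRow], b: Seq[EventRow]): Seq[EventRow] =
    a ++ b
}
```

If the same event can appear in both inputs, the two forms return different row counts, so the DataFrame version is not a drop-in replacement unless duplicates are acceptable or handled separately.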
>
>
>
> If I change val dfAllEvents =
> sparkSession.sql("select * from oldEvents union select * from newEvents")
> to val dfAllEvents = dfNewEvents.union(dfOldEvents), it works fine. With
> the SQL version, I get the following exception:
>
>
>
> 17/07/07 17:33:34 ERROR StreamExecution: Query [id = 3bae26a1-7ee3-45ab-a98d-9346eaf03d08, runId = 063af01f-9878-452e-aa30-7c21e2ef4c18] terminated with error
> org.apache.spark.sql.AnalysisException: resolved attribute(s) acctId#29 missing from eventType#2,acctId#0,eventId#37L,acctId#36,eventType#38,eventId#1L in operator !Join Inner, (acctId#0 = acctId#29);;
> Distinct
> +- Union
>    :- Project [acctId#0, eventId#1L, eventType#2]
>    :  +- SubqueryAlias oldevents, `oldEvents`
>    :     +- Project [acctId#0, eventId#1L, eventType#2]
>    :        +- !Join Inner, (acctId#0 = acctId#29)
>    :           :- SubqueryAlias alloldevents, `allOldEvents`
>    :           :  +- Relation[acctId#0,eventId#1L,eventType#2] json
>    :           +- SubqueryAlias newevents, `newEvents`
>    :              +- Relation[acctId#36,eventId#37L,eventType#38] json
>    +- Project [acctId#29, eventId#30L, eventType#31]
>       +- SubqueryAlias newevents, `newEvents`
>          +- Relation[acctId#29,eventId#30L,eventType#31] json
>
>
>
>     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
>     at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:57)
>     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:337)
>     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
>     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
>     at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:57)
>     at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:48)
>     at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:68)
>     at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:67)
>     at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:60)
>     at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:60)
>     at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
>     at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
>     at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
>     at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$3.apply(StreamExecution.scala:496)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$3.apply(StreamExecution.scala:488)
>     at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:488)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:255)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
>     at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:244)
>     at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177)
>
>
>
>
>
>
>
>
>
> *From: *Michael Armbrust <michael@databricks.com>
> *Date: *Friday, July 7, 2017 at 2:30 PM
> *To: *"Lalwani, Jayesh" <Jayesh.Lalwani@capitalone.com>
> *Cc: *"user@spark.apache.org" <user@spark.apache.org>
> *Subject: *Re: Union of 2 streaming data frames
>
>
>
> df.union(df2) should be supported when both DataFrames are created from a
> streaming source.  What error are you seeing?
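
For readers following along, a minimal sketch of what a union of two streaming DataFrames can look like. The JSON file source, the directory arguments, and the column types are assumptions made for illustration (the thread only shows column names and a json relation), and the object and method names are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

object StreamingUnionSketch {
  // Assumed schema: column names come from the thread, types are a guess.
  val eventSchema: StructType = new StructType()
    .add("acctId", StringType)
    .add("eventId", LongType)
    .add("eventType", StringType)

  // Builds two streaming DataFrames from JSON directories and unions them.
  // Both directories must already exist when this is called.
  def unionEventStreams(
      spark: SparkSession,
      oldEventsDir: String,
      newEventsDir: String): DataFrame = {
    val dfOldEvents = spark.readStream.schema(eventSchema).json(oldEventsDir)
    val dfNewEvents = spark.readStream.schema(eventSchema).json(newEventsDir)
    // Dataset.union matches columns by position, so both sides share one schema.
    dfNewEvents.union(dfOldEvents)
  }
}
```

The result is still a streaming DataFrame, so it would be started with something like writeStream.format("console").outputMode("append").start() rather than a batch action.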
>
>
>
> On Fri, Jul 7, 2017 at 11:27 AM, Lalwani, Jayesh <
> Jayesh.Lalwani@capitalone.com> wrote:
>
> In Structured Streaming, is there a way to union 2 streaming data frames?
> Are there any plans to support union of 2 streaming dataframes soon? I can
> understand the inherent complexity in joining 2 streaming data frames. But
> union is just concatenating 2 microbatches, innit?
>
>
>
> The problem that we are trying to solve is that we have a Kafka stream
> that is receiving events. Each event is associated with an account ID. We
> have a data store that stores historical events for hundreds of millions
> of accounts. For the events coming in on the input stream, we want to add
> in all the historical events for their accounts from the data store and
> give the combined set to a model.
>
>
>
> Initially, the way we were planning to do this was:
> a) Read from Kafka into a streaming dataframe. Call this inputDF.
> b) In a mapPartitions call, get all the unique accounts in the
> partition. Look up all the historical events for those unique accounts and
> return them. Call this historicalDF.
> c) Union inputDF with historicalDF. Call this allDF.
> d) Call mapPartitions on allDF and give the records to the model.
>
>
>
> Of course, this doesn’t work because both inputDF and historicalDF are
> streaming data frames.
>
>
>
> What we ended up doing is that in step b) we output the input records
> together with the historical records, so the operation that does the lookup
> also does the union. This works, but seems like a hacky way of doing things.
> It works for now because the data from the data store doesn’t require any
> transformation or aggregation. But if it did, we would like to do that
> using Spark SQL, whereas this solution forces us to do any transformation
> of historical data in Scala.
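
The workaround described above, where the per-partition lookup also emits the input records, could be sketched roughly as follows. This is plain Scala with no Spark dependency. The AccountEvent type, the enrichPartition name, and the lookupHistory function (standing in for the data-store client) are all hypothetical.

```scala
// Hypothetical record type; the thread only names acctId, eventId, eventType.
final case class AccountEvent(acctId: String, eventId: Long, eventType: String)

object PartitionEnrichment {
  // Returns the incoming events of one partition followed by the historical
  // events for every account seen in that partition.
  def enrichPartition(
      incoming: Iterator[AccountEvent],
      lookupHistory: Set[String] => Seq[AccountEvent]): Iterator[AccountEvent] = {
    // Materialise the partition once: the iterator can only be traversed a
    // single time, and the events are needed twice.
    val batch = incoming.toVector
    val accounts = batch.map(_.acctId).toSet
    val history = lookupHistory(accounts)
    batch.iterator ++ history.iterator
  }
}
```

A function of this shape could be passed to Dataset.mapPartitions on a typed Dataset of events, given a suitable Encoder. As the message notes, any transformation of the historical records would then have to happen inside this function rather than in Spark SQL.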
>
>
>
> Is there a Sparky way of doing this?
>
>
> ------------------------------
>
> The information contained in this e-mail is confidential and/or
> proprietary to Capital One and/or its affiliates and may only be used
> solely in performance of work or services for Capital One. The information
> transmitted herewith is intended only for use by the individual or entity
> to which it is addressed. If the reader of this message is not the intended
> recipient, you are hereby notified that any review, retransmission,
> dissemination, distribution, copying or other use of, or taking of any
> action in reliance upon this information is strictly prohibited. If you
> have received this communication in error, please contact the sender and
> delete the material from your computer.
>
>
