spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Tathagata Das (JIRA)" <j...@apache.org>
Subject [jira] [Resolved] (SPARK-24050) StreamingQuery does not calculate input / processing rates in some cases
Date Wed, 25 Apr 2018 19:23:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-24050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Tathagata Das resolved SPARK-24050.
-----------------------------------
       Resolution: Fixed
    Fix Version/s: 3.0.0

Issue resolved by pull request 21126
[https://github.com/apache/spark/pull/21126]

> StreamingQuery does not calculate input / processing rates in some cases
> ------------------------------------------------------------------------
>
>                 Key: SPARK-24050
>                 URL: https://issues.apache.org/jira/browse/SPARK-24050
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.1.0, 2.1.1, 2.1.2, 2.2.0, 2.2.1, 2.3.0
>            Reporter: Tathagata Das
>            Assignee: Tathagata Das
>            Priority: Major
>             Fix For: 3.0.0
>
>
> In some streaming queries, the input and processing rates are not calculated at all (shows
up as zero) because MicroBatchExecution fails to associated metrics from the executed plan
of a trigger with the sources in the logical plan of the trigger. The way this executed-plan-leaf-to-logical-source
attribution works is as follows. With V1 sources, there was no way to identify which execution
plan leaves were generated by a streaming source. So did a best-effort attempt to match logical
and execution plan leaves when the number of leaves were same. In cases where the number of
leaves is different, we just give up and report zero rates. An example where this may happen
is as follows.
> {code}
> val cachedStaticDF = someStaticDF.union(anotherStaticDF).cache()
> val streamingInputDF = ...
> val query = streamingInputDF.join(cachedStaticDF).writeStream....
> {code}
> In this case, the {{cachedStaticDF}} has multiple logical leaves, but in the trigger's
execution plan it only has leaf because a cached subplan is represented as a single InMemoryTableScanExec
leaf. This leads to a mismatch in the number of leaves causing the input rates to be computed
as zero. 
> With DataSourceV2, all inputs are represented in the executed plan using {{DataSourceV2ScanExec}},
each of which has a reference to the associated logical {{DataSource}} and {{DataSourceReader}}.
So its easy to associate the metrics to the original streaming sources. So the solution is
to take advantage of the presence of DataSourceV2 whenever possible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message