spark-issues mailing list archives

From "Alex Vayda (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-24647) Sink Should Return OffsetSeqs For ProgressReporting
Date Mon, 25 Jun 2018 12:57:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-24647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alex Vayda updated SPARK-24647:
-------------------------------
    Affects Version/s:     (was: 2.4.0)
                       2.3.1

> Sink Should Return OffsetSeqs For ProgressReporting
> ---------------------------------------------------
>
>                 Key: SPARK-24647
>                 URL: https://issues.apache.org/jira/browse/SPARK-24647
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 2.3.1
>            Reporter: Vaclav Kosar
>            Priority: Major
>             Fix For: 2.4.0
>
>
> To track data lineage for Structured Streaming (I intend to implement this in the
open-source project Spline), monitoring needs to track not only where the data was read
from but also where the results were written to. To my knowledge, this is best implemented
by monitoring {{StreamingQueryProgress}}. However, batch data offsets are currently not
available on the {{Sink}} interface. Implementing the change as proposed would also bring
symmetry to the sources and sink fields of {{StreamingQueryProgress}}.
>  
> *Similar Proposals*
> Similar proposals were made in the following JIRAs, but they would not be sufficient for lineage tracking.
>  * https://issues.apache.org/jira/browse/SPARK-18258
>  * https://issues.apache.org/jira/browse/SPARK-21313
>  
> *Current State*
>  * Method {{Sink#addBatch}} returns {{Unit}}.
>  * {{StreamingQueryProgress}} reports the start and end {{offsetSeq}} of each source through {{sourceProgress}},
but {{sinkProgress}} only carries the result of the sink's {{toString}} method.
> {code:java}
>   "sources" : [ {
>     "description" : "KafkaSource[Subscribe[test-topic]]",
>     "startOffset" : null,
>     "endOffset" : { "test-topic" : { "0" : 5000 }},
>     "numInputRows" : 5000,
>     "processedRowsPerSecond" : 645.3278265358803
>   } ],
>   "sink" : {
>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@9da556f"
>   }
> {code}
>  
>  
> *Proposed State*
>  * {{Sink#addBatch}} should return an {{OffsetSeq}} or {{StreamProgress}} specifying the offsets
of the written batch; for example, Kafka does this by returning a {{RecordMetadata}} object from its {{send}}
method.
>  * {{StreamingQueryProgress}} should incorporate {{sinkProgress}} in a similar fashion to {{sourceProgress}}.
>  
>  
> {code:java}
>   "sources" : [ {
>     "description" : "KafkaSource[Subscribe[test-topic]]",
>     "startOffset" : null,
>     "endOffset" : { "test-topic" : { "0" : 5000 }},
>     "numInputRows" : 5000,
>     "processedRowsPerSecond" : 645.3278265358803
>   } ],
>   "sink" : {
>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@9da556f",
>     "startOffset" : null,
>     "endOffset" : { "sinkTopic" : { "0" : 333 }}
>   }
> {code}
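
To illustrate how a lineage tool might consume the proposed report, here is a minimal sketch in Python that works on the parsed JSON form of a {{StreamingQueryProgress}}. The sink offsets exist only in this proposal, and the function name `lineage_record` and the shape of the record it returns are assumptions made for illustration; they are not part of Spark or Spline.

```python
import json

# Progress report in the shape proposed above. The sink "startOffset" and
# "endOffset" fields are part of the proposal, not something Spark 2.3.1 emits.
PROPOSED_PROGRESS = json.loads("""
{
  "sources": [{
    "description": "KafkaSource[Subscribe[test-topic]]",
    "startOffset": null,
    "endOffset": {"test-topic": {"0": 5000}},
    "numInputRows": 5000,
    "processedRowsPerSecond": 645.3278265358803
  }],
  "sink": {
    "description": "org.apache.spark.sql.execution.streaming.ConsoleSink@9da556f",
    "startOffset": null,
    "endOffset": {"sinkTopic": {"0": 333}}
  }
}
""")


def lineage_record(progress):
    """Build a lineage record (hypothetical shape) from a parsed progress report."""
    def endpoint(entry):
        # .get() yields None for fields that the report does not contain,
        # which is what happens for sink offsets in the current state.
        return {
            "description": entry.get("description"),
            "startOffset": entry.get("startOffset"),
            "endOffset": entry.get("endOffset"),
        }

    return {
        "reads": [endpoint(source) for source in progress.get("sources", [])],
        "writes": endpoint(progress.get("sink", {})),
    }


record = lineage_record(PROPOSED_PROGRESS)
print(json.dumps(record, indent=2))
```

With a report in the current format, where the sink carries only a description, the same function returns `None` for both sink offsets, which is the gap this issue describes.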
>  
> *Implementation*
>  * PR submitters: Most likely me and [~wajda], as soon as the discussion concludes positively.
>  * {{Sinks}}: Modify all sinks to conform to the new interface or to return dummy values.
>  * {{ProgressReporter}}: Merge offsets from different batches properly, similarly to
how it is done for sources.
>  
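
One possible reading of the {{ProgressReporter}} item, sketched in Python: if each written batch reported a (start, end) pair of offset maps, a report covering several batches could keep the start of the first batch and the latest end offset seen per partition. The function name `merge_batch_offsets`, the tuple layout, and the offsets 100 and 20 are assumptions made for illustration; this is not how Spark merges offsets today.

```python
def merge_batch_offsets(batches):
    """Collapse ordered per-batch (start, end) offset maps into one pair.

    Each side of a pair is None or a dict of topic -> partition -> offset.
    Assumes batches are in write order and offsets only grow, so the value
    from the later batch replaces the earlier one for the same partition.
    """
    if not batches:
        return None, None

    start = batches[0][0]  # start offsets of the earliest batch
    merged_end = {}
    for _, end in batches:
        for topic, partitions in (end or {}).items():
            # A fresh inner dict is created per topic, so inputs are not mutated.
            merged_end.setdefault(topic, {}).update(partitions)
    return start, (merged_end or None)


# Two hypothetical batches written to the sink topic from the example above.
batches = [
    (None, {"sinkTopic": {"0": 100}}),
    ({"sinkTopic": {"0": 100}}, {"sinkTopic": {"0": 333, "1": 20}}),
]
start, end = merge_batch_offsets(batches)
print(start, end)
```

A partition that first appears in a later batch has no start offset in this sketch; a real implementation would need to decide how to report that case.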



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


