spark-dev mailing list archives

From Fred Reiss <>
Subject Re: Source API requires unbounded distributed storage?
Date Tue, 09 Aug 2016 02:24:07 GMT
Created SPARK-16963 to cover this issue.


On Thu, Aug 4, 2016 at 4:52 PM, Michael Armbrust <> wrote:

> Yeah, this API is in the private execution package because we are planning
> to continue to iterate on it.  Today, we will only ever go back one batch,
> though that might change in the future if we do async checkpointing of
> internal state.
> You are totally right that we should relay this info back to the source.
> Opening a JIRA sounds like a good first step.
> On Thu, Aug 4, 2016 at 4:38 PM, Fred Reiss <> wrote:
>> Hi,
>> I've been looking over the Source API in org.apache.spark.sql.execution.streaming,
>> and I'm at a loss for how the current API can be implemented in a practical
>> way. The API defines a single getBatch() method for fetching records from
>> the source, with the following Scaladoc comments defining the semantics:
>>   /**
>>    * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None`
>>    * then the batch should begin with the first available record. This method must always
>>    * return the same data for a particular `start` and `end` pair.
>>    */
>>   def getBatch(start: Option[Offset], end: Offset): DataFrame
>> If I read the semantics described here correctly, a Source is required to
>> retain all past history for the stream that it backs. Further, a Source
>> is also required to retain this data across restarts of the process where
>> the Source is instantiated, even when the Source is restarted on a
>> different machine.
>> The current implementation of FileStreamSource follows my reading of the
>> requirements above. FileStreamSource never deletes a file.
>> I feel like this requirement for unbounded state retention must be a
>> mistake or misunderstanding of some kind. The scheduler is internally
>> maintaining a high water mark (StreamExecution.committedOffsets in
>> StreamExecution.scala) of data that has been successfully processed. There
>> must have been an intent to communicate that high water mark back to the
>> Source so that the Source can clean up its state. Indeed, the Databricks
>> blog post from last week (
>> 016/07/28/structured-streaming-in-apache-spark.html) says that "Only a
>> few minutes’ worth of data needs to be retained; Structured Streaming will
>> maintain its own internal state after that."
>> But the code checked into git and shipped with Spark 2.0 does not have an
>> API call for the scheduler to tell a Source where the boundary of "only a
>> few minutes' worth of data" lies.
>> Is there a JIRA that I'm not aware of to change the Source API? If not,
>> should we maybe open one?
>> Fred
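
The retention problem raised in this thread can be illustrated with a small, self-contained Scala sketch. It is not the Spark API: offsets are plain `Long` values and a batch is a `Seq[String]` rather than a `DataFrame`, and the names `SimpleSource`, `BufferedSource`, `append`, `retained`, and especially `commit` are hypothetical, introduced only for illustration. The Source API described above has no such callback, which is exactly the gap being discussed.

```scala
// Simplified stand-in for the Source trait discussed above.
// Assumption: offsets are Longs and records are Strings (the real API uses Offset and DataFrame).
trait SimpleSource {
  // Returns the records whose offsets fall in (start, end].
  // When start is None, the batch begins with the first available record.
  def getBatch(start: Option[Long], end: Long): Seq[String]

  // HYPOTHETICAL callback, not part of the API quoted in this thread:
  // the scheduler reports that everything up to and including `end`
  // has been processed, so the source may discard it.
  def commit(end: Long): Unit
}

// An in-memory source that keeps records only until they are committed.
class BufferedSource extends SimpleSource {
  // Records in arrival order, each paired with its offset.
  private var buffer = Vector.empty[(Long, String)]
  private var nextOffset = 0L

  // Stores a record and returns the offset assigned to it.
  def append(record: String): Long = {
    val offset = nextOffset
    buffer = buffer :+ (offset -> record)
    nextOffset += 1
    offset
  }

  def getBatch(start: Option[Long], end: Long): Seq[String] = {
    val lower = start.getOrElse(-1L) // exclusive lower bound
    buffer.collect { case (off, rec) if off > lower && off <= end => rec }
  }

  // Drops every record at or below the committed offset.
  // Without this call the buffer grows without bound.
  def commit(end: Long): Unit = {
    buffer = buffer.dropWhile { case (off, _) => off <= end }
  }

  // Number of records still held in memory.
  def retained: Int = buffer.size
}
```

With five appended records (offsets 0 through 4), calling `commit(1)` leaves three records in the buffer, and `getBatch(Some(1), 3)` still returns the same data as before the commit. A batch that reaches below the committed offset can no longer be replayed in full, so a scheduler using a callback like this would have to commit only offsets it is certain it will never request again.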
