spark-issues mailing list archives

From "Ofir Manor (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-17812) More granular control of starting offsets
Date Sun, 09 Oct 2016 22:00:22 GMT

    [ https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15560696#comment-15560696 ]

Ofir Manor edited comment on SPARK-17812 at 10/9/16 10:00 PM:
--------------------------------------------------------------

Cody, you are absolutely right that the option naming is silly and leads to a dead end. Maybe
it could be fixed now, as this code hasn't been released yet.
In general, I see just four useful options for a starting position (a rough sketch of a single
option covering all four follows this list):
1. Give me all messages - read all messages in the topics.
2. Ignore all current messages - read only new messages from now on.
3. Give me all messages starting from timestamp t - this could be a filter on (1) or, in
Kafka 0.10.1+, pushed down to Kafka (in that version, a Kafka topic can carry either
broker-generated timestamps or user-provided event timestamps).
4. Give me all messages from a custom offset - for "advanced" cases
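
To make this concrete, here is a minimal, hypothetical sketch of what a single starting-position
option could look like in the Structured Streaming Kafka source. The option name and values
(especially the timestamp form), the broker address and the topic name "events" are all
illustrative assumptions, not a committed API:

{code:scala}
import org.apache.spark.sql.SparkSession

object StartingOffsetsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("starting-offsets-sketch").getOrCreate()

    // One hypothetical option, four values -- one per starting position:
    //   1. all messages:       "earliest"
    //   2. only new messages:  "latest"
    //   3. from timestamp t:   "timestamp:1475971200000"        (hypothetical form)
    //   4. custom offsets:     """{"events":{"0":23,"1":-1}}""" (per partition)
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092") // hypothetical address
      .option("subscribe", "events")                    // hypothetical topic
      .option("startingOffsets", "earliest")
      .load()

    df.printSchema()
  }
}
{code}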

I can suggest something specific (though it will be a matter of taste, stringy or not), but
generally I think there should be a single way to specify where to start, and it should cover
these four alternatives. Having a bunch of mutually exclusive options seems confusing, and
giving them the wrong names is even more so.

Regarding "last x offsets" - I don't really get it. It seems to assume that Kafka has a single
offset space, which is quite alien to Kafka (a topic is a collection of independent, ordered
partitions).
For example, a simple topic with four partitions. What is 1000 offsets back?
1. Last 1000 messages per partition? (4000 in total)
2. Last 250 messages per partition? (definitely NOT the last 1000 messages)
3. Read last 1000 messages per partition, then merge and keep the last 1000 messages by timestamp?
(provide a somewhat meaningful semantics, but is still a bit nonsense)
For me, none of these makes sense, because the user is effectively saying: I want some arbitrary
recent data and I don't care what it is... It is as if, for a database source, we would start
with a random 1000 rows, followed by careful work to capture every change without missing any.
I believe "last hour" would make a lot more sense, and if someone really wants some variation
of "last 1000 messages", they could just construct a custom offset per partition (see the
sketch below).
(UPDATE) BTW Cody, I now get why you have been insisting, since May(!), on consuming from
Kafka based on timestamps. It is the only option that isn't "start at a random point" but
"start at a well-defined logical point".
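
For reference, a minimal sketch of that timestamp lookup, using the offsetsForTimes API added
in Kafka 0.10.1 (again, the broker address and topic name are illustrative); it maps one
wall-clock instant, e.g. "one hour ago", to a concrete starting offset in every partition:

{code:scala}
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object StartFromTimestamp {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker:9092") // hypothetical address
    props.put("key.deserializer",
      "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer",
      "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)

    val parts = consumer.partitionsFor("events").asScala
      .map(p => new TopicPartition(p.topic, p.partition))

    // offsetsForTimes returns, per partition, the earliest offset whose
    // timestamp is at or after the query time -- a well-defined logical
    // starting point rather than a random one.
    val oneHourAgo = System.currentTimeMillis() - 60L * 60L * 1000L
    val query = parts.map(tp => tp -> java.lang.Long.valueOf(oneHourAgo)).toMap.asJava
    val found = consumer.offsetsForTimes(query).asScala
    found.foreach {
      case (tp, oat) if oat != null =>
        println(s"$tp -> start at offset ${oat.offset} (ts ${oat.timestamp})")
      case (tp, _) =>
        println(s"$tp -> no messages at or after $oneHourAgo")
    }
    consumer.close()
  }
}
{code}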



> More granular control of starting offsets
> -----------------------------------------
>
>                 Key: SPARK-17812
>                 URL: https://issues.apache.org/jira/browse/SPARK-17812
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>            Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the earliest or latest
offsets available at the moment the query is started.  Sometimes this is a lot of data.  It
would be nice to be able to do the following:
>  - seek back {{X}} offsets in the stream from the moment the query starts
>  - seek to user specified offsets



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


