flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Empty state restore seems to be broken for Kafka source (1.3.2)
Date Wed, 06 Sep 2017 12:45:50 GMT
Yes, and that's essentially what's happening in the 1.4-SNAPSHOT consumer, which also has discovery
of new partitions. Starting from 1.4-SNAPSHOT we store state in a union state, i.e. all sources
get all partitions on restore, and if they didn't get any they know that they are new. There
is no specific logic for detecting this situation; it's just that the partition discoverer
will be seeded with this information, so when it discovers a new partition it knows whether
it can take ownership of that partition.
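
To make the union-state idea a bit more concrete, here is a minimal plain-Java sketch with no
Flink dependencies. It is not the actual consumer code: the class name, the modulo ownership
rule, and the NO_RESTORED_OFFSET sentinel are all assumptions made up for illustration. It only
shows that when every subtask sees the full restored map, an empty map means "nobody had state",
and a partition missing from a non-empty map can be recognized as new.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of union-state restore; not Flink's FlinkKafkaConsumerBase.
class UnionStateSketch {

    // Assumed sentinel: the partition had no offset in the restored state.
    static final long NO_RESTORED_OFFSET = -1L;

    // Assumed ownership rule: partition p belongs to subtask (p mod parallelism).
    static boolean ownedBy(int partition, int subtask, int parallelism) {
        return partition % parallelism == subtask;
    }

    // With union redistribution every subtask receives the complete map, so an
    // empty map means no parallel instance had any state at all.
    static boolean noInstanceHadState(Map<Integer, Long> unionState) {
        return unionState.isEmpty();
    }

    // Returns the partitions this subtask takes, with the restored offset or
    // NO_RESTORED_OFFSET for partitions that are not in the restored state.
    static Map<Integer, Long> assign(Map<Integer, Long> unionState,
                                     List<Integer> discovered,
                                     int subtask,
                                     int parallelism) {
        Map<Integer, Long> assigned = new TreeMap<>();
        for (int partition : discovered) {
            if (!ownedBy(partition, subtask, parallelism)) {
                continue; // another subtask is responsible for this partition
            }
            assigned.put(partition,
                    unionState.getOrDefault(partition, NO_RESTORED_OFFSET));
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Restored state knows partitions 0 and 1; partition 2 is newly discovered.
        Map<Integer, Long> unionState = new HashMap<>();
        unionState.put(0, 100L);
        unionState.put(1, 200L);
        List<Integer> discovered = Arrays.asList(0, 1, 2);

        for (int subtask = 0; subtask < 2; subtask++) {
            System.out.println("subtask " + subtask + " -> "
                    + assign(unionState, discovered, subtask, 2));
        }
        System.out.println("nobody had state: "
                + noInstanceHadState(new HashMap<>()));
    }
}
```

Because each subtask filters the same full map with the same rule, no extra coordination is
needed to decide who owns a partition, whether it was restored or just discovered.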

I'm sure Gordon (cc'ed) could explain it better than I did.

> On 6. Sep 2017, at 14:36, Gyula Fóra <gyfora@apache.org> wrote:
> 
> Wouldn't it be enough for Kafka sources to store some empty container for their state if it is empty, as opposed to null when it should be bootstrapped again?
> 
> Gyula
> 
> Aljoscha Krettek <aljoscha@apache.org> wrote (Wed, 6 Sep 2017, 14:31):
> The problem here is that context.isRestored() is a global flag and not local to each operator. It says "yes, this job was restored", but the source would need to know that it is actually brand new and never had any state. This is quite tricky to do, since there is currently no way (if I'm correct) to differentiate between "I got empty state but others maybe got state" and "this source never had state and neither had other parallel instances".
> 
> Best,
> Aljoscha
> 
>> On 6. Sep 2017, at 13:56, Stefan Richter <s.richter@data-artisans.com> wrote:
>> 
>> Thanks for the report, I will take a look.
>> 
>>> On 06.09.2017, at 11:48, Gyula Fóra <gyfora@apache.org> wrote:
>>> 
>>> Hi all,
>>> 
>>> We are running into some problems with the Kafka source after changing the uid and restoring from the savepoint.
>>> What we expect is that the partition state is cleared and set up all over again, but what seems to happen is that the consumer thinks that it doesn't have any partitions assigned.
>>> 
>>> This was supposed to be fixed in https://github.com/apache/flink/commit/0ecb5d0050b84ba48105836288d43ce4c4749459#diff-06bf4a7f73d98ef91309154654563475
>>> but appears to have been reworked/reverted in the latest release: https://github.com/apache/flink/blob/0399beed1ea3e04d332b42cc506041d75a6148b4/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L547
>>> 
>>> What is the expected behaviour here?
>>> 
>>> Thanks!
>>> Gyula
>> 
> 

