flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: FlinkKafkaConsumer and multiple topics
Date Wed, 18 Nov 2015 18:20:37 GMT
The new KafkaConsumer for Kafka 0.9 should be able to handle this, as the
Kafka client code itself supports this.

For 0.8.x, we would need to implement support for recovery inside the
consumer ourselves, which is why we decided to initially let the Job
Recovery take care of that.
If that becomes much of an issue, we can look into this again...

On Thu, Sep 24, 2015 at 10:46 PM, Jakob Ericsson <jakob.ericsson@gmail.com>
wrote:

> What I actually meant was partition reassignment (
> https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-6.ReassignPartitionsTool
> ).
> No topics were deleted.
> We added a bunch of new servers and needed to reassign some partitions to
> spread the load.
>
> No, I haven't set the setNumberOfExecutionRetries().
>
> On Thu, Sep 24, 2015 at 10:06 PM, Robert Metzger <rmetzger@apache.org>
> wrote:
>
>> Hi Jakob,
>>
>> what do you exactly mean by rebalance of topics? Did the leader of the
>> partitions change?
>> Were topics deleted?
>>
>> Flink's KafkaConsumer does not try to recover from these exceptions. We
>> rely on Flink's fault tolerance mechanisms to restart the data consumption
>> (from the last valid offset).
>> Have you set setNumberOfExecutionRetries() on the ExecutionConfig?
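[Editor's note: a minimal sketch of the restart configuration Robert is asking about, against the Flink 0.9/0.10-era API. The topic name, ZooKeeper address, and group id are placeholders, not values from this thread.]

```java
// Sketch: enabling automatic job restarts so Flink's fault tolerance can
// recover the Kafka source after a fetch failure. API as of Flink 0.9/0.10;
// topic name and connection properties below are placeholders.
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer081;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class RetryingKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Restart the job up to 5 times before failing permanently; together
        // with checkpointing, consumption resumes from the last valid offset.
        env.getConfig().setNumberOfExecutionRetries(5);
        env.enableCheckpointing(5000); // checkpoint every 5 seconds

        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "zk:2181"); // placeholder
        props.setProperty("group.id", "example-group");    // placeholder

        env.addSource(new FlinkKafkaConsumer081<>(
                        "example-topic", new SimpleStringSchema(), props))
           .print();

        env.execute("Kafka job with execution retries");
    }
}
```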
>>
>>
>> On Thu, Sep 24, 2015 at 9:57 PM, Jakob Ericsson <jakob.ericsson@gmail.com
>> > wrote:
>>
>>> We did some rebalance of topics in our Kafka cluster today. I had a
>>> flink job running and it crashed when some of the partitions were moved,
>>> other consumers (non flink) continued to work.
>>>
>>> Should I configure it differently or could this be a bug?
>>>
>>> 09/24/2015 15:34:31     Source: Custom Source(3/4) switched to FAILED
>>> java.lang.Exception: Error while fetching from broker:
>>> Exception for partition 6: kafka.common.UnknownTopicOrPartitionException
>>>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>> Method)
>>>         at
>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>>         at
>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>         at
>>> java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>>>         at java.lang.Class.newInstance(Class.java:442)
>>>         at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
>>>         at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
>>>         at
>>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:405)
>>>
>>>         at
>>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>>         at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
>>>         at
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:168)
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
>>>         at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.io.IOException: Error while fetching from broker:
>>> Exception for partition 6: kafka.common.UnknownTopicOrPartitionException
>>>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>> Method)
>>>         at
>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>>         at
>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>         at
>>> java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>>>         at java.lang.Class.newInstance(Class.java:442)
>>>         at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
>>>         at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
>>>         at
>>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:405)
>>>
>>>         at
>>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:421)
>>>
>>>
>>> On Fri, Sep 18, 2015 at 2:30 PM, Robert Metzger <rmetzger@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> did you manually add a Kafka dependency into your project? Maybe you
>>>> are overwriting the Kafka version to a lower version?
>>>>
>>>> I'm sorry that our consumer is crashing when it's supposed to read an
>>>> invalid topic .. but in general, that's good behavior ;)
>>>>
>>>> Maybe you can check whether the topic exists from your user code?
>>>> The getPartitionsForTopic() method is actually a public static method that
>>>> you can call.
>>>> If it's throwing an exception, the topic doesn't exist anymore.
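[Editor's note: a sketch of the pre-flight topic check Robert describes. It assumes the Flink 0.9-era static FlinkKafkaConsumer.getPartitionsForTopic(topic, props) method he mentions; the exact signature and exception type may differ between connector versions.]

```java
// Sketch: check from user code whether a topic still exists before wiring
// up a source for it, instead of letting the consumer constructor crash.
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class TopicCheck {
    /** Returns true if the topic can currently be resolved to partitions. */
    static boolean topicExists(String topic, Properties props) {
        try {
            // Public static helper per Robert's note; an exception here
            // means the topic doesn't exist (anymore).
            FlinkKafkaConsumer.getPartitionsForTopic(topic, props);
            return true;
        } catch (RuntimeException e) {
            return false;
        }
    }
}
```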
>>>>
>>>>
>>>> Robert
>>>>
>>>> On Fri, Sep 18, 2015 at 2:21 PM, Jakob Ericsson <
>>>> jakob.ericsson@gmail.com> wrote:
>>>>
>>>>> Hit another problem. It is probably related to a topic that still
>>>>> exists in zk but is not used anymore (therefore no partitions), or I
>>>>> want to start a listener for a topic that hasn't yet been created. I
>>>>> would like it not to crash.
>>>>>
>>>>> Also, some funny Scala <-> Java
>>>>>
>>>>> Exception in thread "main" java.lang.NoSuchMethodError:
>>>>> kafka.common.ErrorMapping.InvalidTopicCode()S
>>>>> at
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.getPartitionsForTopic(FlinkKafkaConsumer.java:619)
>>>>> at
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:280)
>>>>> at
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer081.<init>(FlinkKafkaConsumer081.java:55)
>>>>>
>>>>> On Fri, Sep 18, 2015 at 11:02 AM, Jakob Ericsson <
>>>>> jakob.ericsson@gmail.com> wrote:
>>>>>
>>>>>> That will work. We have some utility classes for exposing the ZK-info.
>>>>>>
>>>>>> On Fri, Sep 18, 2015 at 10:50 AM, Robert Metzger <rmetzger@apache.org
>>>>>> > wrote:
>>>>>>
>>>>>>> Hi Jakob,
>>>>>>>
>>>>>>> currently, it's not possible to subscribe to multiple topics with
>>>>>>> one FlinkKafkaConsumer.
>>>>>>>
>>>>>>> So for now, you have to create an FKC for each topic .. so you'll
>>>>>>> end up with 50 sources.
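[Editor's note: Robert's one-consumer-per-topic workaround could be wired up roughly as below, then unioned into a single stream. A sketch against the Flink 0.9.x connector API; the topic list, ZooKeeper address, and group id are placeholders.]

```java
// Sketch: one FlinkKafkaConsumer per topic, unioned into a single
// DataStream, as a workaround for the missing multi-topic support.
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer081;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class MultiTopicJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "zk:2181"); // placeholder
        props.setProperty("group.id", "multi-topic-job");  // placeholder

        List<String> topics = Arrays.asList("topic-a", "topic-b", "topic-c");

        // One source per topic; union them into one stream for processing.
        DataStream<String> all = null;
        for (String topic : topics) {
            DataStream<String> s = env.addSource(
                new FlinkKafkaConsumer081<>(topic, new SimpleStringSchema(), props));
            all = (all == null) ? s : all.union(s);
        }

        all.print();
        env.execute("One FlinkKafkaConsumer per topic");
    }
}
```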
>>>>>>>
>>>>>>> As soon as Kafka releases the new consumer, it will support
>>>>>>> subscribing to multiple topics (I think even with pattern support)
>>>>>>> and we can easily expose the APIs also to the FlinkKafkaConsumer.
>>>>>>> As you can see here:
>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan
>>>>>>> Kafka has plans to release the new consumer API in October.
>>>>>>> As soon as the new API is out, we'll support it.
>>>>>>>
>>>>>>> I hope this solution is okay for you. If not, please let me know ;)
>>>>>>>
>>>>>>>
>>>>>>> Robert
>>>>>>>
>>>>>>> On Fri, Sep 18, 2015 at 10:43 AM, Jakob Ericsson <
>>>>>>> jakob.ericsson@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Would it be possible to get the FlinkKafkaConsumer to support
>>>>>>>> multiple topics, like a list?
>>>>>>>>
>>>>>>>> Or would it be better to instantiate one FlinkKafkaConsumer per
>>>>>>>> topic and add each as a source?
>>>>>>>> We have about 40-50 topics to listen to for one job.
>>>>>>>> Or even better, supply a regexp pattern that defines the topics;
>>>>>>>> this means you have to do some queries against ZK to get
>>>>>>>> information about topics.
>>>>>>>>
>>>>>>>> Kind regards,
>>>>>>>> Jakob
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
