flink-user mailing list archives

From Robert Metzger <rmetz...@apache.org>
Subject Re: FlinkKafkaConsumer and multiple topics
Date Thu, 24 Sep 2015 20:06:27 GMT
Hi Jakob,

What exactly do you mean by rebalancing the topics? Did the leaders of the
partitions change?
Were topics deleted?

Flink's KafkaConsumer does not try to recover from these exceptions. We
rely on Flink's fault tolerance mechanisms to restart the data consumption
(from the last valid offset).
Have you set setNumberOfExecutionRetries() on the ExecutionConfig?
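As a rough illustration of the restart semantics described above (not Flink's actual code), the plain-Java sketch below simulates a source that fails on a fetch error and is restarted from its last committed offset, up to a bounded number of retries. The bound plays the role that the retry count on the ExecutionConfig plays for a real job. The names `RestartFromOffsetSketch` and `consumeWithRetries` are hypothetical, and committing after every record is a simplifying assumption; real Flink checkpoints are periodic, so some records may be replayed after a restart.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation, NOT Flink code: a source that fails on a fetch error
// and is restarted from its last committed offset, up to maxRetries times.
final class RestartFromOffsetSketch {

    // Stand-in for a fetch error such as UnknownTopicOrPartitionException.
    static final class FetchException extends RuntimeException {
        FetchException(String msg) { super(msg); }
    }

    // Reads every record of one partition into sink. Each offset in failOnceAt
    // fails the first time it is fetched (e.g. while the partition is being moved).
    // Returns how many restarts were needed; rethrows once retries are exhausted.
    static int consumeWithRetries(List<String> partition, Set<Integer> failOnceAt,
                                  int maxRetries, List<String> sink) {
        Set<Integer> pending = new HashSet<>(failOnceAt);
        int committed = 0;  // last valid offset; simplification: committed after every record
        int restarts = 0;
        while (true) {
            try {
                for (int offset = committed; offset < partition.size(); offset++) {
                    if (pending.remove(offset)) {
                        throw new FetchException("fetch failed at offset " + offset);
                    }
                    sink.add(partition.get(offset));
                    committed = offset + 1;
                }
                return restarts;
            } catch (FetchException e) {
                if (restarts >= maxRetries) {
                    throw e;  // no retries left: the job fails
                }
                restarts++;  // restart consumption from `committed`
            }
        }
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        int restarts = consumeWithRetries(List.of("a", "b", "c", "d"), Set.of(2), 3, sink);
        System.out.println(restarts + " " + sink);  // prints: 1 [a, b, c, d]
    }
}
```

With `maxRetries` set to 0, the first fetch error propagates immediately, which corresponds to a job without execution retries crashing on the first broker error.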


On Thu, Sep 24, 2015 at 9:57 PM, Jakob Ericsson <jakob.ericsson@gmail.com> wrote:

> We rebalanced some topics in our Kafka cluster today. I had a Flink job
> running, and it crashed when some of the partitions were moved, while other
> (non-Flink) consumers continued to work.
>
> Should I configure it differently or could this be a bug?
>
> 09/24/2015 15:34:31     Source: Custom Source(3/4) switched to FAILED
> java.lang.Exception: Error while fetching from broker:
> Exception for partition 6: kafka.common.UnknownTopicOrPartitionException
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>         at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>         at java.lang.Class.newInstance(Class.java:442)
>         at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
>         at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
>         at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:405)
>         at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:168)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Error while fetching from broker:
> Exception for partition 6: kafka.common.UnknownTopicOrPartitionException
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>         at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>         at java.lang.Class.newInstance(Class.java:442)
>         at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
>         at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
>         at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:405)
>         at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:421)
>
>
> On Fri, Sep 18, 2015 at 2:30 PM, Robert Metzger <rmetzger@apache.org> wrote:
>
>> Hi,
>>
>> Did you manually add a Kafka dependency to your project? Maybe you are
>> overriding the Kafka version with a lower one?
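One way to check whether a different Kafka jar is shadowing the expected one is to ask the JVM where a class was actually loaded from. The sketch below uses only standard JDK calls (`Class.forName`, `getProtectionDomain().getCodeSource()`); the class name `ClassOriginSketch` and method `originOf` are hypothetical. If `kafka.common.ErrorMapping` resolves to an unexpected jar, that jar is a likely cause of a `NoSuchMethodError` like the one reported in this thread.

```java
import java.net.URL;
import java.security.CodeSource;
import java.util.Optional;

// Diagnostic sketch: report which jar (or directory) a class was loaded from.
final class ClassOriginSketch {

    // Returns the location the class was loaded from, "bootstrap" for JDK classes
    // that have no code source, or empty if the class is not on the classpath.
    static Optional<String> originOf(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class<?> cls = Class.forName(className, false, ClassOriginSketch.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                return Optional.of("bootstrap");
            }
            URL location = source.getLocation();
            return Optional.of(location.toString());
        } catch (ClassNotFoundException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // kafka.common.ErrorMapping is the class named in the NoSuchMethodError in this thread.
        System.out.println(originOf("kafka.common.ErrorMapping").orElse("not on classpath"));
    }
}
```

Running this inside the job's own classpath (for example, from the job's main method) shows which Kafka jar actually wins at runtime.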
>>
>> I'm sorry that our consumer crashes when it's supposed to read an
>> invalid topic... but in general, that's good behavior ;)
>>
>> Maybe you can check from your user code whether the topic exists?
>> getPartitionsForTopic() is actually a public static method that you can
>> call. If it throws an exception, the topic doesn't exist anymore.
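A minimal sketch of that check, assuming a lookup that either throws or returns the topic's partitions. `partitionLookup` is a hypothetical stand-in for `getPartitionsForTopic()`, whose real parameters and return type are not reproduced here. Treating an empty partition list as "missing" is an added assumption, meant to cover topics that still exist in ZK but have no partitions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Sketch: decide per topic whether it is safe to create a consumer for it.
final class TopicCheckSketch {

    // True if the lookup returns at least one partition; an exception or an
    // empty partition list counts as "topic does not exist (yet)".
    static boolean topicExists(String topic, Function<String, List<Integer>> partitionLookup) {
        try {
            List<Integer> partitions = partitionLookup.apply(topic);
            return partitions != null && !partitions.isEmpty();
        } catch (RuntimeException e) {
            return false;
        }
    }

    // Keeps only the topics that can get a consumer right now.
    static List<String> existingTopics(List<String> topics,
                                       Function<String, List<Integer>> partitionLookup) {
        List<String> result = new ArrayList<>();
        for (String topic : topics) {
            if (topicExists(topic, partitionLookup)) {
                result.add(topic);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Fake metadata: "stale" exists without partitions, "not-yet-created" is unknown.
        Map<String, List<Integer>> metadata = Map.of("orders", List.of(0, 1), "stale", List.of());
        Function<String, List<Integer>> lookup = topic -> {
            List<Integer> partitions = metadata.get(topic);
            if (partitions == null) {
                throw new IllegalArgumentException("unknown topic: " + topic);
            }
            return partitions;
        };
        System.out.println(existingTopics(List.of("orders", "stale", "not-yet-created"), lookup));
        // prints: [orders]
    }
}
```

Filtering before the consumers are constructed keeps the job from failing at startup on topics that are stale or not created yet; such topics would need to be picked up by a later restart.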
>>
>>
>> Robert
>>
>> On Fri, Sep 18, 2015 at 2:21 PM, Jakob Ericsson <jakob.ericsson@gmail.com> wrote:
>>
>>> Hit another problem. It is probably related to a topic that still exists
>>> in ZK but is not used anymore (and therefore has no partitions), or to
>>> starting a listener for a topic that hasn't been created yet. I would like
>>> it not to crash.
>>>
>>> Also, something funny in the Scala <-> Java interop:
>>>
>>> Exception in thread "main" java.lang.NoSuchMethodError: kafka.common.ErrorMapping.InvalidTopicCode()S
>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.getPartitionsForTopic(FlinkKafkaConsumer.java:619)
>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:280)
>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer081.<init>(FlinkKafkaConsumer081.java:55)
>>>
>>> On Fri, Sep 18, 2015 at 11:02 AM, Jakob Ericsson <jakob.ericsson@gmail.com> wrote:
>>>
>>>> That will work. We have some utility classes for exposing the ZK info.
>>>>
>>>> On Fri, Sep 18, 2015 at 10:50 AM, Robert Metzger <rmetzger@apache.org> wrote:
>>>>
>>>>> Hi Jakob,
>>>>>
>>>>> Currently, it's not possible to subscribe to multiple topics with one
>>>>> FlinkKafkaConsumer.
>>>>>
>>>>> So for now, you have to create one FlinkKafkaConsumer (FKC) per topic,
>>>>> so you'll end up with 50 sources.
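In a job, each per-topic consumer would be added as its own source, and the resulting streams would then be merged into one. The plain-Java sketch below mimics only that shape: `sourceFactory` is a hypothetical stand-in for creating one consumer per topic, and list concatenation stands in for a stream union (a real union interleaves records as they arrive rather than appending them topic by topic).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java sketch of "one source per topic, then merge them". Not Flink code.
final class PerTopicSourcesSketch {

    // Creates one source per topic and merges their output into a single list,
    // tagging each record with the topic it came from.
    static List<String> unionOfPerTopicSources(List<String> topics,
                                               Function<String, List<String>> sourceFactory) {
        List<String> merged = new ArrayList<>();
        for (String topic : topics) {
            List<String> records = sourceFactory.apply(topic);  // one "source" per topic
            for (String record : records) {
                merged.add(topic + ":" + record);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<String> out = unionOfPerTopicSources(List.of("t1", "t2"),
                topic -> topic.equals("t1") ? List.of("a", "b") : List.of("c"));
        System.out.println(out);  // prints: [t1:a, t1:b, t2:c]
    }
}
```

Tagging records with their topic keeps the origin visible after the merge, which matters once 40-50 topics feed a single downstream pipeline.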
>>>>>
>>>>> As soon as Kafka releases the new consumer, it will support
>>>>> subscribing to multiple topics (I think even with pattern support), and we
>>>>> can easily expose those APIs in the FlinkKafkaConsumer as well.
>>>>> As you can see here:
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan
>>>>> Kafka plans to release the new consumer API in October.
>>>>> As soon as the new API is out, we'll support it.
>>>>>
>>>>> I hope this solution is okay for you. If not, please let me know ;)
>>>>>
>>>>>
>>>>> Robert
>>>>>
>>>>> On Fri, Sep 18, 2015 at 10:43 AM, Jakob Ericsson <jakob.ericsson@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Would it be possible to get the FlinkKafkaConsumer to support
>>>>>> multiple topics, like a list?
>>>>>>
>>>>>> Or would it be better to instantiate one FlinkKafkaConsumer per
>>>>>> topic and add each as a source?
>>>>>> We have about 40-50 topics to listen to in one job.
>>>>>> Or, even better, supply a regexp pattern that defines the queues; this
>>>>>> means you would have to query ZK to get information about the
>>>>>> topics.
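Selecting topics by a regular expression can be done on the client side once the topic names are known. In this sketch, `allTopics` is assumed to have already been read from ZooKeeper (Kafka 0.8 keeps topic names as the children of `/brokers/topics`); reading ZK itself is left to existing utility code. `TopicPatternSketch` and `matchingTopics` are hypothetical names.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Sketch: pick the topics whose names fully match a regular expression.
final class TopicPatternSketch {

    // Returns the topics whose whole name matches the pattern, sorted for stable output.
    static List<String> matchingTopics(List<String> allTopics, String regex) {
        Pattern pattern = Pattern.compile(regex);
        return allTopics.stream()
                .filter(topic -> pattern.matcher(topic).matches())  // full match, not substring
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> allTopics = List.of("events.orders", "events.clicks", "audit", "events-old");
        System.out.println(matchingTopics(allTopics, "events\\..*"));
        // prints: [events.clicks, events.orders]
    }
}
```

Because the match happens once, when the job is built, topics created later that match the pattern are not picked up until the job is restarted.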
>>>>>>
>>>>>> Kind regards,
>>>>>> Jakob
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
