flink-issues mailing list archives

From "Robert Metzger (JIRA)" <j...@apache.org>
Subject [jira] [Resolved] (FLINK-2325) PersistentKafkaSource throws ArrayIndexOutOfBoundsException if reading from a topic that is created after starting the Source
Date Tue, 22 Sep 2015 10:04:04 GMT

     [ https://issues.apache.org/jira/browse/FLINK-2325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Robert Metzger resolved FLINK-2325.
-----------------------------------
       Resolution: Fixed
    Fix Version/s: 0.9.1
                   0.10

I just verified the behavior of the new {{FlinkKafkaConsumer}} when consuming from a topic that does not exist yet.
It works. The log looks like this:

{code}
12:01:02,233 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer  - Trying to get topic metadata from broker localhost:9092 in try 0/3
12:01:02,566 WARN  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer  - Error while getting metadata from broker localhost:9092 to find partitions for thatDoesNotExist151
kafka.common.LeaderNotAvailableException
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
	at java.lang.Class.newInstance(Class.java:442)
	at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
	at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.getPartitionsForTopic(FlinkKafkaConsumer.java:625)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:280)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082.<init>(FlinkKafkaConsumer082.java:49)
	at com.dataartisans.KafkaStringReader.main(KafkaStringReader.java:38)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
12:01:02,569 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer  - Trying to get topic metadata from broker localhost:9092 in try 1/3
12:01:02,574 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer  - Topic thatDoesNotExist151 has 1 partitions
12:01:02,680 INFO  org.apache.flink.streaming.util.ClusterUtil                   - Running on mini cluster
{code}

There is retry logic for getting the topic metadata (a sketch of the pattern follows below). The first call to get the metadata fails because the topic does not yet exist. On the second call, we see that Kafka has created the topic with one partition.
The consumer is then able to consume messages from the topic.
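For illustration, here is a minimal sketch of such a retry loop against Kafka 0.8's {{SimpleConsumer}} metadata API. This is not the actual {{FlinkKafkaConsumer}} code; the broker address, retry count, and backoff are assumptions made for the example.

{code}
// Illustrative sketch only -- not the actual FlinkKafkaConsumer implementation.
// The broker address, retry count, and backoff below are assumptions.
import java.util.Collections;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class MetadataRetrySketch {

    public static TopicMetadata getMetadataWithRetry(String topic) {
        final int numRetries = 3;
        SimpleConsumer consumer =
                new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "metadata-lookup");
        try {
            for (int attempt = 0; attempt < numRetries; attempt++) {
                TopicMetadataResponse response =
                        consumer.send(new TopicMetadataRequest(Collections.singletonList(topic)));
                for (TopicMetadata metadata : response.topicsMetadata()) {
                    // On the first try Kafka may still be auto-creating the topic and
                    // report an error code (e.g. LeaderNotAvailableException); 0 means success.
                    if (metadata.errorCode() == 0) {
                        return metadata;
                    }
                }
                // Back off briefly; topic auto-creation usually completes between tries.
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            throw new RuntimeException("Unable to get metadata for topic " + topic);
        } finally {
            consumer.close();
        }
    }
}
{code}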

> PersistentKafkaSource throws ArrayIndexOutOfBoundsException if reading from a topic that is created after starting the Source
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-2325
>                 URL: https://issues.apache.org/jira/browse/FLINK-2325
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 0.9
>            Reporter: Rico Bergmann
>            Assignee: Robert Metzger
>             Fix For: 0.10, 0.9.1
>
>
> I'm creating a PersistentKafkaSource that reads from a Kafka topic which is not yet present at the time the PersistentKafkaSource is started (via open(.)). As a result, the number of partitions read in the open(.) function is 0, which leads to arrays of length 0 (lastOffsets and committedOffsets).
> Maybe it would be better to check whether numberOfPartitions returns 0 and, if so, take the default number of partitions from the Kafka config?
> Stacktrace:
> java.lang.ArrayIndexOutOfBoundsException: 0
> 	at org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:180)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:49)
> 	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.invoke(SourceStreamTask.java:55)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> 	at java.lang.Thread.run(Thread.java:745)
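For context, here is a minimal sketch of the failure mode described in the report above. The array names mirror the ones mentioned in the description ({{lastOffsets}}, {{committedOffsets}}); everything else is an assumption made for the example, not the actual {{PersistentKafkaSource}} code.

{code}
// Sketch of the failure mode: the topic does not exist yet, so the metadata
// lookup reports 0 partitions and the offset arrays end up with length 0.
int numberOfPartitions = 0;                              // topic not yet created
long[] lastOffsets = new long[numberOfPartitions];       // length 0
long[] committedOffsets = new long[numberOfPartitions];  // length 0

int partition = 0;                                       // partition of the first record
lastOffsets[partition] = 42L;                            // ArrayIndexOutOfBoundsException: 0
{code}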



