flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8896) Kafka08Fetcher trying to look up topic "n/a" on partiton "-1"
Date Thu, 08 Mar 2018 11:33:01 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391111#comment-16391111

ASF GitHub Bot commented on FLINK-8896:

GitHub user NicoK opened a pull request:


     [FLINK-8896][kafka08] fix Kafka08Fetcher trying to look up topic "n/a" on partiton "-1"

    ## What is the purpose of the change
    `Kafka08Fetcher` uses a `MARKER` to make sure the main thread wakes up when cancelling.
While looking up partition leaders, this marker is removed only once from the list of partitions
to look up and there is a code path that leads to two markers being present in which case
the lookup will throw an exception:
    - `FlinkKafkaConsumerBase#cancel()` is called in one thread, stopped right after setting
`running` to false, and then
    -  `FlinkKafkaConsumerBase`'s partition discovery loop thread drops out before the first
thread was able to call `Kafka08Fetcher#cancel`.
    ## Brief change log
    - remove all markers in the list, not just one
    - make `FlinkKafkaConsumerBase`'s partition discovery loop not call `cancel()` if already
    ## Verifying this change
    This change is a trivial rework / code cleanup without any test coverage.
    ## Does this pull request potentially affect one of the following parts:
      - Dependencies (does it add or upgrade a dependency): **no**
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
      - The serializers: **no**
      - The runtime per-record code paths (performance sensitive): **no**
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing,
Yarn/Mesos, ZooKeeper: **no**
      - The S3 file system connector: **no**
    ## Documentation
      - Does this pull request introduce a new feature? **no**
      - If yes, how is the feature documented? **JavaDocs**

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-8896

Alternatively you can review and apply these changes as the patch at:


To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5661
commit e9711a61be0c260629c9f2f34c03af6e1aa9ac61
Author: Nico Kruber <nico@...>
Date:   2018-03-08T11:22:32Z

    [FLINK-8896][kafka08] remove all cancel MARKERs before trying to find partition leaders
    This guards us against #cancel() being called multiple times and then trying to
    look up an invalid topic/partition pair.

commit bffe96f5e3522824656ed55074ac09591a36e2ae
Author: Nico Kruber <nico@...>
Date:   2018-03-08T11:23:47Z

    [hotfix][kafka] do not run cancel() in the discovery loop if already cancelled


> Kafka08Fetcher trying to look up topic "n/a" on partiton "-1"
> -------------------------------------------------------------
>                 Key: FLINK-8896
>                 URL: https://issues.apache.org/jira/browse/FLINK-8896
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.3.0, 1.3.1, 1.4.0, 1.3.2, 1.5.0, 1.4.1, 1.6.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>            Priority: Critical
>             Fix For: 1.5.0, 1.6.0
> A user on the [mailing list|https://lists.apache.org/thread.html/fa96b09fc1d3a7efdb1bf7946489edafed8cdf138e933e9d0d8948a1@%3Cuser.flink.apache.org%3E]
reported this error:
> {code}
> java.lang.RuntimeException: Unable to find a leader for partitions: [Partition: KafkaTopicPartition{topic='n/a',
partition=-1}, KafkaPartitionHandle=[n/a,-1], offset=(not set)]
>         at org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.findLeaderForPartitions(Kafka08Fetcher.java:495)
>         at org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.runFetchLoop(Kafka08Fetcher.java:205)
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>         at java.lang.Thread.run(Thread.java:748)
> {code}
> The root cause seems to be that {{Kafka08Fetcher#MARKER}} is in the {{unassignedPartitionsQueue}}
more than once which could come from multiple calls to {{Kafka08Fetcher#cancel()}}. One code
path leading to this is {{FlinkKafkaConsumerBase#cancel()}} being called in one thread and
{{FlinkKafkaConsumerBase}}'s partition discovery loop thread dropping out before the first
thread was able to call {{Kafka08Fetcher#cancel}}.

This message was sent by Atlassian JIRA

View raw message