kafka-jira mailing list archives

From "Seweryn Habdank-Wojewodzki (JIRA)" <j...@apache.org>
Subject [jira] [Resolved] (KAFKA-5530) Balancer is dancing with KStream all the time, and due to that Kafka cannot work :-)
Date Mon, 03 Jul 2017 13:15:00 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-5530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Seweryn Habdank-Wojewodzki resolved KAFKA-5530.
-----------------------------------------------
    Resolution: Not A Bug

The main problem, at least as far as we could observe in the end, was that our max.poll.interval.ms
was simply *_too_* small.

We have now set max.poll.interval.ms=1000000, and the Kafka Streams application (the consuming
one) starts properly.

Perhaps it would be good to add a hint to the documentation that max.poll.interval.ms should
not be too small, as that will cause endless rebalancing.

The implicit explanation is here:
If poll() is not called before expiration of this timeout, then the consumer is considered
failed and the group will rebalance in order to reassign the partitions to another member.


But it is not stated explicitly that max.poll.interval.ms should be reasonably large :-).
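
As a rough illustration of the setting discussed above, here is a minimal sketch in plain Java. It uses only java.util.Properties with string keys, so it runs without the Kafka jars; in a real application the same properties would be handed to the Kafka Streams configuration. The class name, the helper methods, the application id and the broker address are all hypothetical and not taken from the ticket; only max.poll.interval.ms=1000000 and num.stream.threads=4 come from it. The check is a simplification of the rule quoted above, not the actual group coordinator logic.

```java
import java.util.Properties;

public class StreamsPollConfig {

    // Builds the properties for a streams application.
    // "my-streams-app" and "localhost:9092" are placeholders (assumptions).
    public static Properties build(long maxPollIntervalMs) {
        Properties props = new Properties();
        props.setProperty("application.id", "my-streams-app");
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("num.stream.threads", "4");
        // Upper bound on the time allowed between two poll() calls.
        props.setProperty("max.poll.interval.ms", Long.toString(maxPollIntervalMs));
        return props;
    }

    // Simplified model of the documented rule: if the gap between two poll()
    // calls exceeds max.poll.interval.ms, the consumer is considered failed
    // and the group rebalances.
    public static boolean wouldTriggerRebalance(Properties props, long msBetweenPolls) {
        long limit = Long.parseLong(props.getProperty("max.poll.interval.ms"));
        return msBetweenPolls > limit;
    }

    public static void main(String[] args) {
        Properties props = build(1_000_000L);
        System.out.println(props.getProperty("max.poll.interval.ms"));
        System.out.println(wouldTriggerRebalance(props, 5_000L));
    }
}
```

With a value that is too small relative to the time spent between polls, wouldTriggerRebalance returns true on every cycle, which corresponds to the endless rebalancing described in this ticket.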

> Balancer is dancing with KStream all the time, and due to that Kafka cannot work :-)
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5530
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5530
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.2.0, 0.10.2.1, 0.11.0.0
>         Environment: Linux, Windows
>            Reporter: Seweryn Habdank-Wojewodzki
>         Attachments: streamer_20-topics_1-thread-K-0.11.0.0.log.zip, streamer_20-topics_4-threads-K-0.11.0.0.log.zip,
streamer_2-topics_1-thread-K-0.11.0.0.log.zip, streamer_2-topics_4_threads-K-0.11.0.0.log.zip
>
>
> Dears,
> There are problems with the balancer in KStreams (v. 0.10.2.x) when _num.stream.threads_ is greater than 1 and the number of input topics is greater than 1.
> My setup in the code is more or less the following:
> {code:java}
> // loop over the inTopicName(s) {
> KStream<String, String> stringInput = kBuilder.stream( STRING_SERDE, STRING_SERDE, inTopicName );
> stringInput.filter( streamFilter::passOrFilterMessages ).map( ndmNormalizer ).to( outTopicName );
> // } end of loop
> streams = new KafkaStreams( kBuilder, streamsConfig );
> streams.cleanUp();
> streams.start();
> {code}
> And if *_num.stream.threads=4_* and there are 2 or more inTopicNames, but fewer than num.stream.threads, then the application startup blocks itself completely, endlessly writing strange things to the log and never starting.
> Even more problematic is the case where the number of topics is higher than _num.stream.threads_, which I commented on in *KAFKA-5167 streams task gets stuck after re-balance due to LockException*.
> I am attaching logs for two scenarios:
>  * when: 1 < num.stream.threads < number of topics (KAFKA-5167)
>  * when: 1 < number of topics < num.stream.threads (this ticket).
> I can fully reproduce the behaviour. I even found a workaround for it, but it is not a desirable one. When _num.stream.threads=1_, everything works fine :-( (for K v. 0.10.2.x; v. 0.11.0.0 does not work at all).
> {code:bash}
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:466 - stream-thread [StreamThread-3]
Assigned tasks to clients as {de0ead97-89d8-49b0-be84-876ca5b41cd8=[activeTasks: ([]) assignedTasks:
([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 2.0 cost: 0.0]}.
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group stream with
generation 2701
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group stream with
generation 2701
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned partitions
[] for group stream
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned partitions
[] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-3] New partitions
[[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-1] New partitions
[[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:393 - Revoking previously assigned partitions
[] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:254 - stream-thread [StreamThread-1] partitions
[[]] revoked at the beginning of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:1012 - stream-thread [StreamThread-1] Updating
suspended tasks to contain active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1019 - stream-thread [StreamThread-1] Removing
all active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1034 - stream-thread [StreamThread-1] Removing
all standby tasks [[]]
> 2017-06-27 19:45:00 INFO AbstractCoordinator:407 - (Re-)joining group stream
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:290 - stream-thread [StreamThread-1]
Constructed client metadata {de0ead97-89d8-49b0-be84-876ca5b41cd8=ClientMetadata{hostInfo=null,
consumers=[stream-de0ead97-89d8-49b0-be84-876ca5b41cd8-StreamThread-1-consumer-ab798efe-16a6-4686-bdee-ccd50c937cd7],
state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([])
capacity: 1.0 cost: 0.0]}} from the member subscriptions.
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:630 - stream-thread [StreamThread-1]
Completed validating internal topics in partition assignor
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:630 - stream-thread [StreamThread-1]
Completed validating internal topics in partition assignor
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:466 - stream-thread [StreamThread-1]
Assigned tasks to clients as {de0ead97-89d8-49b0-be84-876ca5b41cd8=[activeTasks: ([]) assignedTasks:
([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}.
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group stream with
generation 2702
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned partitions
[] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-1] New partitions
[[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:393 - Revoking previously assigned partitions
[] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:254 - stream-thread [StreamThread-3] partitions
[[]] revoked at the beginning of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:1012 - stream-thread [StreamThread-3] Updating
suspended tasks to contain active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1019 - stream-thread [StreamThread-3] Removing
all active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1034 - stream-thread [StreamThread-3] Removing
all standby tasks [[]]
> 2017-06-27 19:45:00 INFO AbstractCoordinator:407 - (Re-)joining group stream
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:393 - Revoking previously assigned partitions
[] for group stream
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:393 - Revoking previously assigned partitions
[] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:254 - stream-thread [StreamThread-2] partitions
[[]] revoked at the beginning of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:254 - stream-thread [StreamThread-4] partitions
[[]] revoked at the beginning of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:1012 - stream-thread [StreamThread-2] Updating
suspended tasks to contain active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1012 - stream-thread [StreamThread-4] Updating
suspended tasks to contain active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1019 - stream-thread [StreamThread-2] Removing
all active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1019 - stream-thread [StreamThread-4] Removing
all active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1034 - stream-thread [StreamThread-4] Removing
all standby tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1034 - stream-thread [StreamThread-2] Removing
all standby tasks [[]]
> 2017-06-27 19:45:00 INFO AbstractCoordinator:407 - (Re-)joining group stream
> 2017-06-27 19:45:00 INFO AbstractCoordinator:407 - (Re-)joining group stream
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:290 - stream-thread [StreamThread-2]
Constructed client metadata {de0ead97-89d8-49b0-be84-876ca5b41cd8=ClientMetadata{hostInfo=null,
consumers=[stream-de0ead97-89d8-49b0-be84-876ca5b41cd8-StreamThread-3-consumer-16274860-9a0f-4df9-8af3-10f4c3c23d50,
stream-de0ead97-89d8-49b0-be84-876ca5b41cd8-StreamThread-4-consumer-be7bc520-7174-4d6e-9258-9761b6c45bd9,
stream-de0ead97-89d8-49b0-be84-876ca5b41cd8-StreamThread-2-consumer-401f1542-c311-4b1f-8f4e-72d6ade12583],
state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([])
capacity: 3.0 cost: 0.0]}} from the member subscriptions.
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:630 - stream-thread [StreamThread-2]
Completed validating internal topics in partition assignor
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:630 - stream-thread [StreamThread-2]
Completed validating internal topics in partition assignor
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:466 - stream-thread [StreamThread-2]
Assigned tasks to clients as {de0ead97-89d8-49b0-be84-876ca5b41cd8=[activeTasks: ([]) assignedTasks:
([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 3.0 cost: 0.0]}.
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group stream with
generation 2703
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group stream with
generation 2703
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group stream with
generation 2703
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned partitions
[] for group stream
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned partitions
[] for group stream
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned partitions
[] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-4] New partitions
[[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-2] New partitions
[[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-3] New partitions
[[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:393 - Revoking previously assigned partitions
[] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:254 - stream-thread [StreamThread-1] partitions
[[]] revoked at the beginning of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:1012 - stream-thread [StreamThread-1] Updating
suspended tasks to contain active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1019 - stream-thread [StreamThread-1] Removing
all active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1034 - stream-thread [StreamThread-1] Removing
all standby tasks [[]]
> 2017-06-27 19:45:00 INFO AbstractCoordinator:407 - (Re-)joining group stream
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:290 - stream-thread [StreamThread-1]
Constructed client metadata {de0ead97-89d8-49b0-be84-876ca5b41cd8=ClientMetadata{hostInfo=null,
consumers=[stream-de0ead97-89d8-49b0-be84-876ca5b41cd8-StreamThread-1-consumer-b35886f7-0525-458b-9b3e-8856554d0afb],
state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([])
capacity: 1.0 cost: 0.0]}} from the member subscriptions.
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:630 - stream-thread [StreamThread-1]
Completed validating internal topics in partition assignor
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:630 - stream-thread [StreamThread-1]
Completed validating internal topics in partition assignor
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:466 - stream-thread [StreamThread-1]
Assigned tasks to clients as {de0ead97-89d8-49b0-be84-876ca5b41cd8=[activeTasks: ([]) assignedTasks:
([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}.
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group stream with
generation 2704
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned partitions
[] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-1] New partitions
[[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:393 - Revoking previously assigned partitions
[] for group stream
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:393 - Revoking previously assigned partitions
[] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:254 - stream-thread [StreamThread-2] partitions
[[]] revoked at the beginning of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:254 - stream-thread [StreamThread-4] partitions
[[]] revoked at the beginning of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:1012 - stream-thread [StreamThread-2] Updating
suspended tasks to contain active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1012 - stream-thread [StreamThread-4] Updating
suspended tasks to contain active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1019 - stream-thread [StreamThread-2] Removing
all active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1019 - stream-thread [StreamThread-4] Removing
all active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1034 - stream-thread [StreamThread-2] Removing
all standby tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1034 - stream-thread [StreamThread-4] Removing
all standby tasks [[]]
> 2017-06-27 19:45:00 INFO AbstractCoordinator:407 - (Re-)joining group stream
> 2017-06-27 19:45:00 INFO AbstractCoordinator:407 - (Re-)joining group stream
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:393 - Revoking previously assigned partitions
[] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:254 - stream-thread [StreamThread-3] partitions
[[]] revoked at the beginning of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:1012 - stream-thread [StreamThread-3] Updating
suspended tasks to contain active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1019 - stream-thread [StreamThread-3] Removing
all active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1034 - stream-thread [StreamThread-3] Removing
all standby tasks [[]]
> 2017-06-27 19:45:00 INFO AbstractCoordinator:407 - (Re-)joining group stream
> 2017-06-27 19:45:01 INFO StreamPartitionAssignor:290 - stream-thread [StreamThread-2]
Constructed client metadata {de0ead97-89d8-49b0-be84-876ca5b41cd8=ClientMetadata{hostInfo=null,
consumers=[stream-de0ead97-89d8-49b0-be84-876ca5b41cd8-StreamThread-3-consumer-142cd5c5-a52d-494a-a8be-ee1f9ae831e2,
stream-de0ead97-89d8-49b0-be84-876ca5b41cd8-StreamThread-2-consumer-f8a93346-c322-4e9e-ab38-c9a9eb4a9fa4,
stream-de0ead97-89d8-49b0-be84-876ca5b41cd8-StreamThread-4-consumer-0726705d-c88f-4ad2-81c0-9ab02175b53e],
state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([])
capacity: 3.0 cost: 0.0]}} from the member subscriptions.
> 2017-06-27 19:45:01 INFO StreamPartitionAssignor:630 - stream-thread [StreamThread-2]
Completed validating internal topics in partition assignor
> 2017-06-27 19:45:01 INFO StreamPartitionAssignor:630 - stream-thread [StreamThread-2]
Completed validating internal topics in partition assignor
> 2017-06-27 19:45:01 INFO StreamPartitionAssignor:466 - stream-thread [StreamThread-2]
Assigned tasks to clients as {de0ead97-89d8-49b0-be84-876ca5b41cd8=[activeTasks: ([]) assignedTasks:
([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 3.0 cost: 0.0]}.
> 2017-06-27 19:45:01 INFO AbstractCoordinator:375 - Successfully joined group stream with
generation 2705
> 2017-06-27 19:45:01 INFO AbstractCoordinator:375 - Successfully joined group stream with
generation 2705
> 2017-06-27 19:45:01 INFO AbstractCoordinator:375 - Successfully joined group stream with
generation 2705
> 2017-06-27 19:45:01 INFO ConsumerCoordinator:252 - Setting newly assigned partitions
[] for group stream
> 2017-06-27 19:45:01 INFO ConsumerCoordinator:252 - Setting newly assigned partitions
[] for group stream
> 2017-06-27 19:45:01 INFO StreamThread:228 - stream-thread [StreamThread-4] New partitions
[[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:01 INFO ConsumerCoordinator:252 - Setting newly assigned partitions
[] for group stream
> 2017-06-27 19:45:01 INFO StreamThread:228 - stream-thread [StreamThread-3] New partitions
[[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:01 INFO StreamThread:228 - stream-thread [StreamThread-2] New partitions
[[]] assigned at the end of consumer rebalance. 
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
