kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "G G (Jira)" <j...@apache.org>
Subject [jira] [Created] (KAFKA-9266) KafkaConsumer manual assignment does not reset group assignment
Date Wed, 04 Dec 2019 14:23:00 GMT
G G created KAFKA-9266:
--------------------------

             Summary: KafkaConsumer manual assignment does not reset group assignment
                 Key: KAFKA-9266
                 URL: https://issues.apache.org/jira/browse/KAFKA-9266
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 2.3.0
            Reporter: G G


When using the manual assignment API, SubscriptionState still remembers group subscriptions
in its groupSubscription member of topics to which it is no longer subscribed.

See the following code which shows the unexpected behavior:
{code:java}
    TopicPartition tp1 = new TopicPartition("a", 0);
    TopicPartition tp2 = new TopicPartition("b", 0);
    LogContext logContext = new LogContext();
    SubscriptionState state = new SubscriptionState(logContext, OffsetResetStrategy.NONE);
    state.assignFromUser(ImmutableSet.of(tp1, tp2));
    state.unsubscribe();
    state.assignFromUser(ImmutableSet.of(tp1));
    assertEquals(ImmutableSet.of("a"), state.groupSubscription()); // Succeeds
    
    state.assignFromUser(ImmutableSet.of(tp1, tp2));
    state.assignFromUser(ImmutableSet.of(tp1));
    assertEquals(ImmutableSet.of("a"), state.groupSubscription()); // Fails: Expected [a]
but was [a, b]
{code}

The problem seems to be that within SubscriptionState.changeSubscription() the groupSubscription
only grows and is never trimmed if the assignment is manual:
{code}
    private boolean changeSubscription(Set<String> topicsToSubscribe) {
        ...
        groupSubscription = new HashSet<>(groupSubscription);
        groupSubscription.addAll(topicsToSubscribe);
        ....
    }
{code}

This behavior in turn leads to METADATA requests by the client with partitions which are actually
no longer assigned:
{code}
KafkaConsumer consumer;
consumer.assign(ImmutableList.of(topicPartition1, topicPartition2));
consumer.poll(); // This will cause a MetadataRequest to be sent to the broker with topic1
and topic2
consumer.assign(ImmutableList.of(topicPartition1));
consumer.poll(); // This will AGAIN cause a MetadataRequest for topic1 and topic2 instead
of only topic1
{code}
And this in turn causes the deletion of the topicPartion2 to fail. The workaround is to do
a consumer.unassign(); before the second consumer.assign();




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Mime
View raw message