kafka-dev mailing list archives

From Boyang Chen <bche...@outlook.com>
Subject Stream config in StreamPartitionAssignor
Date Mon, 09 Jul 2018 21:06:57 GMT
Hey there,


Over the weekend I was debugging why the Streams configuration was not being passed down to the
stream threads. I noticed that one of the code paths in KafkaConsumer (L743) initializes the
StreamPartitionAssignor:

this.assignors = config.getConfiguredInstances(
        ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        PartitionAssignor.class);

However, it uses the ConsumerConfig instance (the config that is passed in), so if I want
to change a configuration in the assignor, I need to add the consumer prefix. To make
debugging even harder, the logAll() function in AbstractConfig prints "StreamsConfig values"
at the beginning, since the assignor does indeed build a StreamsConfig:

@Override
public void configure(final Map<String, ?> configs) {
    final StreamsConfig streamsConfig = new StreamsConfig(configs);

(L190 in StreamPartitionAssignor)


This further confuses developers, as they see two different sets of StreamsConfig values: one
from the top level and one from this derived level, logged per thread.


My point is that we could either: 1. make developers aware that they need to add the consumer
prefix to pass configs to the StreamPartitionAssignor, or 2. find a way to pass in the original
StreamsConfig.
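
To make option 1 concrete, here is a rough sketch in plain Java of why the prefix matters. It
is not Kafka's actual code: the class ConsumerPrefixSketch, the helper prefixedConsumerConfigs
and the key "my.assignor.setting" are hypothetical, and the helper only imitates the
prefix-stripping step as I understand it. The "consumer." literal is the same one exposed as
StreamsConfig.CONSUMER_PREFIX, and StreamsConfig.consumerPrefix(...) builds such keys in real code.

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerPrefixSketch {
    // Same literal as StreamsConfig.CONSUMER_PREFIX in Kafka Streams.
    static final String CONSUMER_PREFIX = "consumer.";

    // Mirrors what StreamsConfig.consumerPrefix(key) returns.
    static String consumerPrefix(String key) {
        return CONSUMER_PREFIX + key;
    }

    // Hypothetical stand-in (assumption, not Kafka code): keep only the
    // entries that carry the consumer prefix, with the prefix stripped.
    static Map<String, Object> prefixedConsumerConfigs(Map<String, Object> streamsProps) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, Object> entry : streamsProps.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(CONSUMER_PREFIX)) {
                result.put(key.substring(CONSUMER_PREFIX.length()), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        // Custom key without the prefix: dropped by the sketch.
        props.put("my.assignor.setting", "not-forwarded");
        // Same key with the prefix: forwarded with the prefix removed.
        props.put(consumerPrefix("my.assignor.setting"), "forwarded");
        System.out.println(prefixedConsumerConfigs(props));
    }
}
```

In this sketch only the prefixed entry survives, which is the behavior I ran into: a custom
key set directly on the Streams properties never reached configure() in the assignor.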

I know this is a somewhat lengthy description. Let me know if anything in my proposal is
unclear, or if this is not a concern because most people already know the trick. Thank you!

