kafka-dev mailing list archives

From Boyang Chen <bche...@outlook.com>
Subject Stream config in StreamPartitionAssignor
Date Mon, 09 Jul 2018 21:06:57 GMT
Hey there,

Over the weekend I was debugging why Streams configuration was not being passed to the stream
threads. I noticed that one of the code paths in KafkaConsumer (L743) initializes the
StreamPartitionAssignor:

this.assignors = config.getConfiguredInstances(

However, it uses the ConsumerConfig instance (the config that is passed in), so if I want
to change a configuration in the assignor, I need to add the consumer prefix. To make
debugging even harder, the logAll() function in AbstractConfig prints
"StreamsConfig values" at the beginning, since the object being logged is indeed a StreamsConfig:

public void configure(final Map<String, ?> configs) {
    final StreamsConfig streamsConfig = new StreamsConfig(configs);

(L190 in StreamPartitionAssignor)

This further confuses developers, as they see two different sets of StreamsConfig values: one
from the top level and one from this derived level, per thread.
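
To illustrate the prefix behaviour described above, here is a minimal, self-contained sketch
in plain Java. It does not call Kafka; the class ConsumerPrefixSketch, the method consumerView,
the one-entry KNOWN_CONSUMER_CONFIGS set and the key "my.assignor.setting" are all hypothetical
stand-ins. It assumes, as I understand it, that only recognized consumer config names are
copied over as-is and that keys starting with "consumer." are added with the prefix stripped.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class ConsumerPrefixSketch {
    // Same literal as the "consumer." prefix used by Kafka Streams.
    static final String CONSUMER_PREFIX = "consumer.";

    // Hypothetical stand-in for the set of config names the consumer recognizes.
    static final Set<String> KNOWN_CONSUMER_CONFIGS = Set.of("max.poll.records");

    // Assumed behaviour (not Kafka's actual code): copy recognized consumer
    // configs, then overlay every "consumer."-prefixed key with the prefix removed.
    public static Map<String, Object> consumerView(Map<String, Object> streamsProps) {
        Map<String, Object> view = new HashMap<>();
        for (Map.Entry<String, Object> e : streamsProps.entrySet()) {
            if (KNOWN_CONSUMER_CONFIGS.contains(e.getKey())) {
                view.put(e.getKey(), e.getValue());
            }
        }
        for (Map.Entry<String, Object> e : streamsProps.entrySet()) {
            String key = e.getKey();
            if (key.startsWith(CONSUMER_PREFIX) && key.length() > CONSUMER_PREFIX.length()) {
                view.put(key.substring(CONSUMER_PREFIX.length()), e.getValue());
            }
        }
        return view;
    }

    public static void main(String[] args) {
        Map<String, Object> streamsProps = new HashMap<>();
        // Unprefixed custom key: not a recognized consumer config, so it is dropped.
        streamsProps.put("my.assignor.setting", "unprefixed");
        // Prefixed custom key: reaches the consumer-side map under its bare name.
        streamsProps.put(CONSUMER_PREFIX + "my.assignor.setting", "prefixed");
        streamsProps.put("max.poll.records", 100);

        // This is the map an assignor's configure(...) would receive in this sketch.
        System.out.println(consumerView(streamsProps));
    }
}
```

In real code the prefixed key would normally be built with StreamsConfig.consumerPrefix(...)
rather than by concatenating the string by hand.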

My point is that we could either: 1. make developers aware that they need to add the consumer
prefix to pass configs to StreamPartitionAssignor, or 2. find a way to pass in original

I know this is a somewhat lengthy description. Let me know if anything in my proposal is
unclear, or if this is not a concern because most people already know the trick here. Thank you!
