kafka-dev mailing list archives

From "jmhostalet (JIRA)" <j...@apache.org>
Subject [jira] [Resolved] (KAFKA-8646) Materialized.withLoggingDisabled() does not disable changelog topics creation
Date Fri, 19 Jul 2019 11:31:00 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-8646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jmhostalet resolved KAFKA-8646.
-------------------------------
    Resolution: Not A Bug

> Materialized.withLoggingDisabled() does not disable changelog topics creation
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-8646
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8646
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.0
>            Reporter: jmhostalet
>            Assignee: Bill Bejeck
>            Priority: Minor
>
> I have a cluster with 3 brokers running version 0.11.
> My kafka-streams app was using kafka-client 0.11.0.1, but I recently migrated it to 2.3.0.
> I have not executed any migration, as my data is disposable; instead, I deleted all intermediate topics, keeping only the input and output topics.
> My streams config is:
> {code:java}
> application.id = consumer-id-v1.00
> application.server =
> bootstrap.servers = [foo1:9092, foo2:9092, foo3:9092]
> buffered.records.per.partition = 1000
> cache.max.bytes.buffering = 524288000
> client.id =
> commit.interval.ms = 30000
> connections.max.idle.ms = 540000
> default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
> default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
> default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
> default.timestamp.extractor = class com.acme.stream.TimeExtractor
> default.value.serde = class com.acme.serde.MyDtoSerde
> max.task.idle.ms = 0
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> num.standby.replicas = 0
> num.stream.threads = 25
> partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
> poll.ms = 100
> processing.guarantee = at_least_once
> receive.buffer.bytes = 32768
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> replication.factor = 1
> request.timeout.ms = 40000
> retries = 0
> retry.backoff.ms = 100
> rocksdb.config.setter = null
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> state.cleanup.delay.ms = 600000
> state.dir = /tmp/kafka-streams
> topology.optimization = none
> upgrade.from = null
> windowstore.changelog.additional.retention.ms = 86400000
> {code}
> In my stream I am using withLoggingDisabled():
> {code:java}
> stream.filter((key, val) -> val!=null)
>     .selectKey((key, val) -> getId(val))
>     .groupByKey(Grouped.as("key-grouper").with(Serdes.String(), new MyDtoSerde()))
>     .windowedBy(TimeWindows.of(aggregationWindowSizeDuration)
>                            .grace(windowRetentionPeriodDuration))
>     .aggregate(MyDto::new,
>                new MyUpdater(),
>                Materialized.as("aggregation-updater")
>                            .withLoggingDisabled()
>                            .with(Serdes.String(), new MyDtoSerde()))
>     .toStream((k, v) -> k.key())
>     .mapValues(val -> { ...
> {code}
> However, changelog topics are still created (KSTREAM-AGGREGATE-STATE-STORE), regardless of whether I delete them before running the app again or change the application.id.
> With a new application.id, the topics are recreated with the new prefix.
>  
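Note on the resolution: this message records the ticket as "Not A Bug" without stating why. One explanation consistent with the report, offered here as an assumption and not as the ticket's recorded reasoning, is that Materialized.with(keySerde, valueSerde) is a static factory method in the Kafka Streams API. Called at the end of a chain, it would return a fresh Materialized that carries neither the store name nor the logging setting, which would match the generated KSTREAM-AGGREGATE-STATE-STORE name in the report. The instance methods withKeySerde(...) and withValueSerde(...) keep the earlier settings. The sketch below reproduces the pitfall with a hypothetical stand-in class, StoreOptions, so that it runs without Kafka on the classpath; StoreOptions, its fields, and the serde strings are illustrative names only and are not part of Kafka.

```java
// Hypothetical, simplified stand-in for a fluent options class.
// This is NOT org.apache.kafka.streams.kstream.Materialized; it only mirrors its shape.
final class StoreOptions {
    final String name;             // null: the framework would generate a store name
    final boolean loggingEnabled;  // changelog logging defaults to on
    final String keySerde;
    final String valueSerde;

    private StoreOptions(String name, boolean loggingEnabled, String keySerde, String valueSerde) {
        this.name = name;
        this.loggingEnabled = loggingEnabled;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    // Static factory: starts a new configuration that only has a name.
    static StoreOptions as(String name) {
        return new StoreOptions(name, true, null, null);
    }

    // Static factory: starts a new configuration that only has serdes.
    static StoreOptions with(String keySerde, String valueSerde) {
        return new StoreOptions(null, true, keySerde, valueSerde);
    }

    // Instance methods: each returns a copy that keeps the earlier settings.
    StoreOptions withLoggingDisabled() {
        return new StoreOptions(name, false, keySerde, valueSerde);
    }

    StoreOptions withKeySerde(String serde) {
        return new StoreOptions(name, loggingEnabled, serde, valueSerde);
    }

    StoreOptions withValueSerde(String serde) {
        return new StoreOptions(name, loggingEnabled, keySerde, serde);
    }

    // Same call order as the report: `with` is static, so the receiver is discarded.
    static StoreOptions chainedThroughStaticFactory() {
        return StoreOptions.as("aggregation-updater")
                           .withLoggingDisabled()
                           .with("string-serde", "my-dto-serde");
    }

    // Alternative that uses instance methods only, so every setting survives.
    static StoreOptions chainedThroughInstanceMethods() {
        return StoreOptions.as("aggregation-updater")
                           .withLoggingDisabled()
                           .withKeySerde("string-serde")
                           .withValueSerde("my-dto-serde");
    }

    @Override
    public String toString() {
        return "name=" + name + ", loggingEnabled=" + loggingEnabled;
    }

    public static void main(String[] args) {
        System.out.println(chainedThroughStaticFactory());
        System.out.println(chainedThroughInstanceMethods());
    }
}
```

Java accepts a static method call through an instance expression (javac only warns when -Xlint:static is enabled), which is why a chain of this shape compiles cleanly. If the assumption above holds, the equivalent change in the reported topology would be to replace the trailing .with(Serdes.String(), new MyDtoSerde()) with .withKeySerde(Serdes.String()).withValueSerde(new MyDtoSerde()).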



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
