kafka-jira mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-5989) disableLogging() causes partitions to not be consumed
Date Mon, 02 Oct 2017 17:40:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-5989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16188486#comment-16188486 ]

ASF GitHub Bot commented on KAFKA-5989:

GitHub user dguy opened a pull request:


    KAFKA-5989: resume consumption of tasks that have state stores but no changelogging

    Stores where logging is disabled were never consumed, as their partitions were paused but
    never resumed.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dguy/kafka restore

Alternatively you can review and apply these changes as the patch at:


To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4002

commit 1e1dec0db90f6d0a28a4d6b497593e22523f4380
Author: Damian Guy <damian.guy@gmail.com>
Date:   2017-10-02T17:37:44Z

    resume consumption of tasks that have state stores but no changelogging


> disableLogging() causes partitions to not be consumed
> -----------------------------------------------------
>                 Key: KAFKA-5989
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5989
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions:
>            Reporter: Tuan Nguyen
>            Assignee: Damian Guy
>            Priority: Blocker
>             Fix For: 1.0.0
>         Attachments: App2.java, App.java
> Using {{disableLogging()}} for either of the built-in state store types causes an initialization loop in the StreamThread.
> Case A - this works just fine:
> {code}
> 		final StateStoreSupplier testStore = Stores.create(topic)
> 				.withStringKeys()
> 				.withStringValues()
> 				.inMemory()
> //				.disableLogging() 
> 				.maxEntries(10)
> 				.build();
> {code}
> Case B - this does not:
> {code}
> 		final StateStoreSupplier testStore = Stores.create(topic)
> 				.withStringKeys()
> 				.withStringValues()
> 				.inMemory()
> 				.disableLogging() 
> 				.maxEntries(10)
> 				.build();
> {code}
> A brief debugging dive shows that in Case B, {{AssignedTasks.allTasksRunning()}} never returns true, because of a remnant entry in {{AssignedTasks#restoring}} that never gets properly
> See [^App.java] for a working test (requires ZK + Kafka ensemble, and at least one keyed message produced to the "test" topic).
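
The failure mode described above can be pictured with a small, self-contained model. This is only a sketch under stated assumptions, not the Kafka Streams implementation: the class TaskTrackerSketch, its fields, and the resumeTasksWithoutChangelog flag are hypothetical names invented for illustration. It assumes that a new task starts paused, that it is parked in a "restoring" collection until a restore-completion callback arrives, and that a store with logging disabled has no changelog partitions, so no such callback ever arrives.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the bookkeeping; NOT the real AssignedTasks class.
class TaskTrackerSketch {
    // task id -> changelog partitions that still have to be restored
    private final Map<String, Set<String>> restoring = new HashMap<>();
    private final Set<String> running = new HashSet<>();
    private final Set<String> pausedTasks = new HashSet<>();
    // Assumption: toggles between the reported behaviour (false) and the described fix (true).
    private final boolean resumeTasksWithoutChangelog;

    TaskTrackerSketch(boolean resumeTasksWithoutChangelog) {
        this.resumeTasksWithoutChangelog = resumeTasksWithoutChangelog;
    }

    // Every new task starts paused until its state is considered restored.
    void addTask(String taskId, Set<String> changelogPartitions) {
        pausedTasks.add(taskId);
        if (changelogPartitions.isEmpty() && resumeTasksWithoutChangelog) {
            // Nothing to restore: move straight to running and resume consumption.
            transitionToRunning(taskId);
        } else {
            // With an empty set and the flag off, this entry is never cleared,
            // because no restore-completion callback will ever arrive for it.
            restoring.put(taskId, new HashSet<>(changelogPartitions));
        }
    }

    // Called once a changelog partition has been fully restored.
    void onPartitionRestored(String taskId, String partition) {
        Set<String> remaining = restoring.get(taskId);
        if (remaining == null) {
            return;
        }
        remaining.remove(partition);
        if (remaining.isEmpty()) {
            restoring.remove(taskId);
            transitionToRunning(taskId);
        }
    }

    private void transitionToRunning(String taskId) {
        running.add(taskId);
        pausedTasks.remove(taskId);
    }

    boolean allTasksRunning() {
        return restoring.isEmpty();
    }

    boolean isPaused(String taskId) {
        return pausedTasks.contains(taskId);
    }
}
```

In this model, adding a task with an empty changelog set while the flag is off leaves allTasksRunning() false and the task paused indefinitely, which mirrors Case B; with the flag on, the same task is running and resumed immediately.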

This message was sent by Atlassian JIRA