kafka-jira mailing list archives

From "Guozhang Wang (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (KAFKA-6108) Synchronizing on commits and StandbyTasks can be improved
Date Mon, 23 Oct 2017 21:07:00 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-6108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-6108:
---------------------------------
    Description: 
In Kafka Streams, we use an optimization that allows us to reuse a source topic as the changelog
topic (and thus avoid unnecessary data duplication) if we read a topic directly as a {{KTable}}.
To guarantee that {{StandbyTasks}} provide a correct state, we need to synchronize the read
progress of the {{StandbyTasks}} with the processing progress of the main {{StreamTask}};
otherwise, the {{StandbyTasks}} might restore state too far into the future. For this, we
limit the allowed restore offsets of the {{StandbyTasks}} to be no larger than the committed
offsets of the {{StreamTask}}.
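The sketch below shows the restore limit. It assumes Kafka's convention that a committed offset is the offset of the next record to consume. Under that convention, a standby can apply only the changelog records whose offset is strictly below the main task's committed offset. {{ChangelogRecord}} and {{StandbyRestoreLimit}} are hypothetical names for illustration, not Kafka Streams classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a record read from the changelog (here: the source topic).
final class ChangelogRecord {
    final long offset;
    final String key;
    final String value;

    ChangelogRecord(long offset, String key, String value) {
        this.offset = offset;
        this.key = key;
        this.value = value;
    }
}

// Hypothetical helper that caps standby restoration at the main task's committed offset.
final class StandbyRestoreLimit {
    // Exclusive bound: a committed offset names the next record to consume, so every
    // record with a smaller offset has already been processed by the main StreamTask.
    private long committedOffset;

    StandbyRestoreLimit(long committedOffset) {
        this.committedOffset = committedOffset;
    }

    // Called after the main task commits; the limit never moves backwards.
    void onCommit(long newCommittedOffset) {
        committedOffset = Math.max(committedOffset, newCommittedOffset);
    }

    boolean isRestorable(ChangelogRecord record) {
        return record.offset < committedOffset;
    }

    // Returns the prefix of an offset-ordered batch that may be applied to the standby store.
    List<ChangelogRecord> restorablePrefix(List<ChangelogRecord> batch) {
        List<ChangelogRecord> out = new ArrayList<>();
        for (ChangelogRecord record : batch) {
            if (!isRestorable(record)) {
                break; // offsets are ordered, so no later record is restorable either
            }
            out.add(record);
        }
        return out;
    }
}
```

Example: with a committed offset of 3, a batch at offsets 1, 2 and 3 yields only the first two records. After a commit at offset 4, all three qualify.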

Furthermore, we buffer in memory all data returned by the restore consumer that lies beyond the
allowed restore offsets.
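The buffering step could look roughly like the following sketch. It assumes that each poll returns offset-ordered records for a single partition. Records at or beyond the limit stay in an in-memory queue and are released once the limit advances. All class and method names here are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for a changelog record (offset plus payload).
final class ChangelogRecord {
    final long offset;
    final String value;

    ChangelogRecord(long offset, String value) {
        this.offset = offset;
        this.value = value;
    }
}

// Hypothetical per-partition buffer: records beyond the allowed restore offset are kept
// in memory instead of being discarded and fetched again later.
final class StandbyRestoreBuffer {
    private final Deque<ChangelogRecord> pending = new ArrayDeque<>();
    private long limit; // exclusive bound, e.g. the main task's committed offset

    StandbyRestoreBuffer(long limit) {
        this.limit = limit;
    }

    // Appends an offset-ordered polled batch and returns whatever is restorable right now.
    List<ChangelogRecord> addAndDrain(List<ChangelogRecord> polled) {
        pending.addAll(polled);
        return drain();
    }

    // Raises the limit after a commit and returns the records it unlocks.
    List<ChangelogRecord> advanceLimit(long newLimit) {
        limit = Math.max(limit, newLimit);
        return drain();
    }

    int bufferedCount() {
        return pending.size();
    }

    // Removes and returns the restorable prefix; everything left is at or beyond the limit.
    private List<ChangelogRecord> drain() {
        List<ChangelogRecord> ready = new ArrayList<>();
        while (!pending.isEmpty() && pending.peekFirst().offset < limit) {
            ready.add(pending.pollFirst());
        }
        return ready;
    }
}
```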

To achieve both goals, we regularly update the maximum allowed restore offsets (this is done
internally within the task), and we also use a flag {{processStandbyRecords}} within
{{StreamThread}} to avoid calling {{poll()}} on the restore consumer if our in-memory buffer
already holds data beyond the maximum allowed restore offsets.
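The following sketch shows the poll gating. It assumes records are represented only by their offsets and that a {{Supplier}} stands in for the restore consumer's {{poll()}}. The loop skips polling while the buffer already holds data at or beyond the limit, which is the role the issue describes for {{processStandbyRecords}}. {{StandbyPollGate}} and its methods are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of thread-side poll gating; offsets stand in for full records and
// the Supplier stands in for a call to the restore consumer's poll().
final class StandbyPollGate {
    private final Deque<Long> buffered = new ArrayDeque<>(); // fetched, not yet restorable
    private long maxAllowedOffset; // exclusive bound
    private int pollCalls = 0;

    StandbyPollGate(long maxAllowedOffset) {
        this.maxAllowedOffset = maxAllowedOffset;
    }

    void updateLimit(long newLimit) {
        maxAllowedOffset = Math.max(maxAllowedOffset, newLimit);
    }

    // Plays the role of the processStandbyRecords flag: polling is pointless while the
    // buffer already holds data at or beyond the limit, since it would only grow the buffer.
    boolean shouldPoll() {
        return buffered.isEmpty() || buffered.peekLast() < maxAllowedOffset;
    }

    // One iteration of a standby restore loop; returns the offsets restored in this step.
    List<Long> runOnce(Supplier<List<Long>> poll) {
        if (shouldPoll()) {
            pollCalls++;
            buffered.addAll(poll.get());
        }
        List<Long> restored = new ArrayList<>();
        while (!buffered.isEmpty() && buffered.peekFirst() < maxAllowedOffset) {
            restored.add(buffered.pollFirst());
        }
        return restored;
    }

    int pollCalls() {
        return pollCalls;
    }
}
```

Checking only the last buffered offset is enough here because the sketch assumes offsets arrive in order.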

We should consider:
 - unifying both places in the code and putting the whole logic into a single place (the
suggestion is to use the {{StreamThread}}; a task does not need to know about this optimization)
 - feeding the task only the data it is allowed to restore (instead of everything)
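One possible shape for the proposal is sketched below. It assumes the thread tracks one exclusive limit and one buffer per changelog partition. The task exposes only a plain {{restore}} call, while a thread-side coordinator owns the limits and buffers and hands the task nothing beyond what it may restore. {{RestoringTask}} and {{StandbyRestoreCoordinator}} are hypothetical names. Treating a partition with no commit yet as having limit 0 is also an assumption of this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical task-side view: the task applies whatever it receives and knows
// nothing about reusing the source topic as a changelog.
interface RestoringTask {
    void restore(String partition, List<Long> offsets);
}

// Hypothetical thread-side coordinator: limits and buffering live in one place.
final class StandbyRestoreCoordinator {
    private final Map<String, Long> limits = new HashMap<>();         // exclusive bound per partition
    private final Map<String, Deque<Long>> buffers = new HashMap<>(); // fetched, not yet restorable

    // Records returned by the restore consumer for one partition (offset-ordered).
    void onPolled(String partition, List<Long> offsets, RestoringTask task) {
        buffers.computeIfAbsent(partition, p -> new ArrayDeque<>()).addAll(offsets);
        feed(partition, task);
    }

    // The main task committed; raise the limit and release newly allowed records.
    void onActiveCommit(String partition, long committedOffset, RestoringTask task) {
        limits.merge(partition, committedOffset, Long::max);
        feed(partition, task);
    }

    int bufferedCount(String partition) {
        Deque<Long> buffer = buffers.get(partition);
        return buffer == null ? 0 : buffer.size();
    }

    // Hands the task only the restorable prefix; no commit yet means nothing is allowed.
    private void feed(String partition, RestoringTask task) {
        Deque<Long> buffer = buffers.get(partition);
        if (buffer == null) {
            return;
        }
        long limit = limits.getOrDefault(partition, 0L);
        List<Long> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peekFirst() < limit) {
            ready.add(buffer.pollFirst());
        }
        if (!ready.isEmpty()) {
            task.restore(partition, ready);
        }
    }
}
```

In this sketch the task never sees a record beyond its limit, so it needs no buffer or limit of its own.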

  was:
In Kafka Streams, we use an optimization that allows us to reuse a source topic as the changelog
topic (and thus avoid unnecessary data duplication) if we read a topic directly as a {{KTable}}.
To guarantee that {{StandbyTasks}} provide a correct state, we need to synchronize the read
progress of the {{StandbyTasks}} with the processing progress of the main {{StreamTask}};
otherwise, the {{StandbyTasks}} might restore state too far into the future. For this, we
limit the allowed restore offsets of the {{StandbyTasks}} to be no larger than the committed
offsets of the {{StreamTask}}.

Furthermore, we buffer in memory all data returned by the restore consumer that lies beyond the
allowed restore offsets.

To achieve both goals, we regularly update the maximum allowed restore offsets (this is done
task-internally), and we also use a flag {{processStandbyRecords}} within {{StreamThread}} to
avoid calling {{poll()}} on the restore consumer if our in-memory buffer already holds data
beyond the maximum allowed restore offsets.

We should consider:
 - unifying both places in the code and putting the whole logic into a single place (the
suggestion is to use the {{StreamThread}}; a task does not need to know about this optimization)
 - feeding the task only the data it is allowed to restore (instead of everything)


> Synchronizing on commits and StandbyTasks can be improved
> ---------------------------------------------------------
>
>                 Key: KAFKA-6108
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6108
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Matthias J. Sax



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
