flink-issues mailing list archives

From "Roman Khachatryan (Jira)" <j...@apache.org>
Subject [jira] [Updated] (FLINK-23170) Write metadata after materialization
Date Fri, 06 Aug 2021 08:24:00 GMT

     [ https://issues.apache.org/jira/browse/FLINK-23170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Roman Khachatryan updated FLINK-23170:
--------------------------------------
    Description: 
Currently, the changelog state backend writes state metadata on first state access. It is written
to the changelog.
 On materialization, the changelog can be truncated, so the metadata needs to be written again.

 

Below is a proposed solution using the existing metadataWritten flag.

An alternative would be to write metadata at the end of the materialized stream.
 Yet another approach is to write metadata to a separate file (however, it seems less optimal
than writing at the end of the materialized stream and not as easy as writing again).

There are several questions to answer:
 - *When to mark* the metadata as not written (i.e. reset the metadataWritten flag)?
 ** After starting the materialization - so that any subsequent data is preceded by metadata
 - *When to request* the write (i.e. call append)?
     At any point (materialization start / materialization end / checkpoint start). It doesn't
matter for correctness; see the next points.
 Scheduling append earlier means:
 -- including metadata in the changelog twice unnecessarily (won't hurt correctness)
 -- writing for nothing if materialization fails

 Scheduling append later means slowing down the checkpoint.
 So materialization end seems to be the better tradeoff.
 - *What* metadata to write?
      Only for data that was changed after materialization started (so the flag is enough).
 - *Where* in the changelog to write it?
      No choice but the end of the changelog. Because the SQN is updated, the metadata will
appear at the beginning of the state object returned by the persist(sqn) call made after
materialization completes.
 - *How to wait for write completion* (before completing the checkpoint)?
      Once the metadata is appended, the future returned from the persist() call should already
include it.
  

So to achieve this, it's enough to call appendMetadata() for each changed state upon materialization
start, or finish, or the first checkpoint after it.
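
The flag handling described above can be illustrated with a minimal, self-contained sketch. It is not the actual Flink code: all class and method names (SketchChangelog, SketchState, SketchBackend) are hypothetical, the changelog is an in-memory list where an entry's SQN is its index, and persist() is synchronous here rather than returning a future. It also assumes one reading of the proposal, namely that the flag is reset at materialization start and the next change to a state re-appends its metadata.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory changelog; an entry's SQN is simply its index. */
final class SketchChangelog {
    private final List<String> entries = new ArrayList<>();

    /** SQN that the next appended entry will receive. */
    int nextSqn() {
        return entries.size();
    }

    void append(String entry) {
        entries.add(entry);
    }

    /** Entries from the given SQN on; the real call is asynchronous. */
    List<String> persist(int fromSqn) {
        return new ArrayList<>(entries.subList(fromSqn, entries.size()));
    }
}

/** Hypothetical per-state writer that owns the metadataWritten flag. */
final class SketchState {
    private final String name;
    private final SketchChangelog changelog;
    private boolean metadataWritten;

    SketchState(String name, SketchChangelog changelog) {
        this.name = name;
        this.changelog = changelog;
    }

    /** Appends a change, preceded by the metadata if the flag is not set. */
    void update(String value) {
        if (!metadataWritten) {
            changelog.append("META " + name);
            metadataWritten = true;
        }
        changelog.append("DATA " + name + "=" + value);
    }

    void resetMetadataWritten() {
        metadataWritten = false;
    }
}

/** Hypothetical backend tying states, changelog and materialization together. */
final class SketchBackend {
    private final SketchChangelog changelog = new SketchChangelog();
    private final List<SketchState> states = new ArrayList<>();
    private int materializedSqn;

    SketchState register(String name) {
        SketchState state = new SketchState(name, changelog);
        states.add(state);
        return state;
    }

    /** Entries before this SQN may be truncated, so every flag is reset. */
    void startMaterialization() {
        materializedSqn = changelog.nextSqn();
        for (SketchState state : states) {
            state.resetMetadataWritten();
        }
    }

    /** What a checkpoint taken after materialization would reference. */
    List<String> checkpoint() {
        return changelog.persist(materializedSqn);
    }
}

final class MetadataAfterMaterializationDemo {
    public static void main(String[] args) {
        SketchBackend backend = new SketchBackend();
        SketchState a = backend.register("a");
        SketchState b = backend.register("b");
        a.update("1");
        b.update("1");
        backend.startMaterialization();
        a.update("2"); // only "a" changes after materialization started
        System.out.println(backend.checkpoint());
    }
}
```

In this sketch the checkpoint taken after materialization starts with the metadata of "a" followed by its new change, and contains nothing for "b", which was not changed after materialization started.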

—
 Another related change is to skip writing metadata on recovery (only if state was read from
the changelog). 
 This can be achieved by setting the flag when requesting the state from ChangeLogApplier.
 *Please create a separate ticket for that if not implementing in this one.*
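
A rough sketch of the recovery idea, again with hypothetical names only (RecoverySketchState and RecoverySketchApplier are not Flink classes, and the changelog is a plain list): a state handed out while replaying the changelog gets its flag pre-set, so its next change does not append the metadata a second time.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical state writer whose flag can be pre-set on recovery. */
final class RecoverySketchState {
    private final String name;
    private final List<String> changelog;
    private boolean metadataWritten;

    RecoverySketchState(String name, List<String> changelog, boolean readFromChangelog) {
        this.name = name;
        this.changelog = changelog;
        // Metadata read back from the changelog is already there: mark it as written.
        this.metadataWritten = readFromChangelog;
    }

    /** Appends a change, preceded by the metadata if the flag is not set. */
    void update(String value) {
        if (!metadataWritten) {
            changelog.add("META " + name);
            metadataWritten = true;
        }
        changelog.add("DATA " + name + "=" + value);
    }
}

/** Hypothetical stand-in for the component that replays the changelog. */
final class RecoverySketchApplier {
    private final List<String> changelog;

    RecoverySketchApplier(List<String> changelog) {
        this.changelog = changelog;
    }

    /** States requested during replay are created with the flag already set. */
    RecoverySketchState stateFor(String name) {
        return new RecoverySketchState(name, changelog, true);
    }
}

final class SkipMetadataOnRecoveryDemo {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        log.add("META a");
        log.add("DATA a=1");
        RecoverySketchState restored = new RecoverySketchApplier(log).stateFor("a");
        restored.update("2"); // no second "META a" is appended
        System.out.println(log);
    }
}
```

A state that was not read from the changelog would be created with the flag unset and would write its metadata on its first change as before.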

—
 Note: with TM-side state ownership, actual log truncation may be delayed after materialization
(until all the checkpoints using the log are subsumed). This should not affect the above logic.
  
 

  was:
Currently, the changelog state backend writes state metadata on first state access. It is written
to the changelog.
 On materialization, the changelog can be truncated, so the metadata needs to be written again.

 

Below is a proposed solution using the existing metadataWritten flag.

An alternative would be to write metadata at the end of the materialized stream.
Yet another approach is to write metadata to a separate file (however, it seems less optimal
than writing at the end of the materialized stream and not as easy as writing again).

There are several questions to answer:
 - *When to mark* the metadata as not written (i.e. reset the metadataWritten flag)?
 ** After starting the materialization - so that any subsequent data is preceded by metadata
 - *When to request* the write (i.e. call append)?
     At any point (materialization start / materialization end / checkpoint start). It doesn't
matter for correctness; see the next points.
 Scheduling append earlier means:
 -- including metadata in the changelog twice unnecessarily (won't hurt correctness)
 -- writing for nothing if materialization fails

 Scheduling append later means slowing down the checkpoint.
 So materialization end seems to be the better tradeoff.
 - *What* metadata to write?
      Only for data that was changed after materialization started (so the flag is enough).
 - *Where* in the changelog to write it?
      No choice but the end of the changelog. Because the SQN is updated, the metadata will
appear at the beginning of the state object returned by the persist(sqn) call made after
materialization completes.
 - *How to wait for write completion* (before completing the checkpoint)?
      Once the metadata is appended, the future returned from the persist() call should already
include it.
  

So to achieve this, it's enough to call appendMetadata() for each changed state upon materialization
start, or finish, or the first checkpoint after it.

It can be further optimized by storing the SQN at which the metadata was written and only
resetting the flag if materializedSqn >= metadataSqn, but materialization is relatively
rare, so it probably isn't worth it.

—
 Another related change is to skip writing metadata on recovery (only if state was read from
the changelog). 
 This can be achieved by setting the flag when requesting the state from ChangeLogApplier.
 *Please create a separate ticket for that if not implementing in this one.*

—
 Note: with TM-side state ownership, actual log truncation may be delayed after materialization
(until all the checkpoints using the log are subsumed). This should not affect the above logic.
  


> Write metadata after materialization
> ------------------------------------
>
>                 Key: FLINK-23170
>                 URL: https://issues.apache.org/jira/browse/FLINK-23170
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / State Backends
>            Reporter: Roman Khachatryan
>            Priority: Major
>             Fix For: 1.14.0
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
