apex-dev mailing list archives

From "Chandni Singh (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (APEXMALHAR-2130) implement scalable windowed storage
Date Tue, 30 Aug 2016 03:04:20 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15447840#comment-15447840
] 

Chandni Singh edited comment on APEXMALHAR-2130 at 8/30/16 3:04 AM:
--------------------------------------------------------------------

Note: The main change required in ManagedState is that timeBuckets (the Window time in your
example) are now computed outside ManagedState. Until now, timeBuckets were computed by
TimeBucketAssigner within ManagedState; with this change they will be provided to it.

>>>>
Since event time is arbitrary, unlike processing time, the actual key representing the timebucket
cannot be assumed to be a natural sequence. However, TimeBucketAssigner.getTimeBucketAndAdjustBoundaries
seems to return a long that is sequential starting from 0. We want to make the actual timebucket
key based on the actual event window timestamp. Chandni Singh, will this break anything?

Answer: No, it will not break anything. The time here is event time, and this does NOT assume
that events are received in order. This method creates the timebucket based on event time. In
your use case, the time bucket is computed outside ManagedState, so there are 2 ways to approach
it:
 - create a special TimeBucketAssigner which just returns the input Window for the event.
It will not compute a timebucket any further.
 - make TimeBucketAssigner an optional property in AbstractManagedStateImpl. If it is null,
then the time argument is used as the timebucket saved in Bucket.
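
To illustrate the two options, here is a minimal, self-contained sketch. It does not use the real Malhar classes: BucketAssigner, PassThroughBucketAssigner and SketchManagedState are hypothetical stand-ins, and the actual TimeBucketAssigner and AbstractManagedStateImpl signatures may well differ.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the assigner contract; NOT the real Malhar API.
interface BucketAssigner {
  long getTimeBucket(long time);
}

// Option 1: a special assigner that does no bucketing of its own and
// simply returns the window time it is given.
class PassThroughBucketAssigner implements BucketAssigner {
  @Override
  public long getTimeBucket(long windowTime) {
    return windowTime;
  }
}

// Hypothetical stand-in for the managed state (assumption, for illustration only).
class SketchManagedState {
  // Option 2: the assigner is optional and may be null.
  private final BucketAssigner assigner;
  // time bucket -> key/value pairs stored in that time bucket
  private final Map<Long, Map<String, String>> timeBuckets = new HashMap<>();

  SketchManagedState(BucketAssigner assigner) {
    this.assigner = assigner;
  }

  // With no assigner configured, the time argument itself is the time bucket.
  long resolveTimeBucket(long time) {
    return assigner == null ? time : assigner.getTimeBucket(time);
  }

  void put(long time, String key, String value) {
    timeBuckets.computeIfAbsent(resolveTimeBucket(time), tb -> new HashMap<>()).put(key, value);
  }

  String get(long time, String key) {
    Map<String, String> bucket = timeBuckets.get(resolveTimeBucket(time));
    return bucket == null ? null : bucket.get(key);
  }
}
```

In this sketch both options resolve an event window timestamp to the same time bucket key, so the choice is mostly about where the special case lives: in a dedicated assigner class or in a null check inside the state implementation.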

>>>
Expiring and purging are done very differently and should be based on 3. Managed State should
determine whether to purge a timebucket based on whether an Apex window is committed and whether
all event windows that belong to that timebucket are marked "deleted" for that Apex window.

Answer: This is again handled by TimeBucketAssigner, so I don't think much change is needed here.
TimeBucketAssigner computes a timeBucket (in your case, this corresponds to the Window time) and
checks whether the oldest buckets need to be purged (lines 132-133). It figures out the lowest
purgeable timebucket. In endWindow, it informs IncrementalCheckpointManager that it can
delete all timebuckets <= lowestPurgeableTimeBucket. However, IncrementalCheckpointManager
deletes the data up to that timebucket only when the window in which the purge was requested
gets committed. So this will remain the same for you as well.

I think this can also be achieved by creating a special TimeBucketAssigner and overriding
a few methods.
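
The purge flow described in the answer above can be sketched roughly as follows. This is a simplified model, not the IncrementalCheckpointManager code: the class PurgeOnCommitSketch and its methods requestPurge and committed are hypothetical names, and it assumes a purge requested in an Apex window is applied once that window, or a later one, is committed.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of "request a purge in endWindow, delete on commit".
class PurgeOnCommitSketch {
  // time bucket -> data stored for that time bucket
  private final TreeMap<Long, String> timeBuckets = new TreeMap<>();
  // Apex window id -> lowest purgeable time bucket requested in that window
  private final TreeMap<Long, Long> pendingPurges = new TreeMap<>();

  void put(long timeBucket, String data) {
    timeBuckets.put(timeBucket, data);
  }

  // Would be called from endWindow: only records the request, deletes nothing.
  void requestPurge(long windowId, long lowestPurgeableTimeBucket) {
    pendingPurges.merge(windowId, lowestPurgeableTimeBucket, Math::max);
  }

  // Would be called when the platform reports that a window is committed.
  void committed(long committedWindowId) {
    // All requests made in windows up to and including the committed one.
    Map<Long, Long> ready = pendingPurges.headMap(committedWindowId, true);
    for (long upTo : ready.values()) {
      // Delete every time bucket <= the requested boundary.
      timeBuckets.headMap(upTo, true).clear();
    }
    // Clearing the view also removes the applied requests from pendingPurges.
    ready.clear();
  }

  boolean contains(long timeBucket) {
    return timeBuckets.containsKey(timeBucket);
  }
}
```

Keeping the request separate from the deletion means that a recovery to an earlier, uncommitted window never finds its time buckets already removed.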




> implement scalable windowed storage
> -----------------------------------
>
>                 Key: APEXMALHAR-2130
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2130
>             Project: Apache Apex Malhar
>          Issue Type: Task
>            Reporter: bright chen
>            Assignee: David Yan
>
> This feature is used for supporting windowing.
> The storage needs to have the following features:
> 1. Spillable key value storage (integrate with APEXMALHAR-2026)
> 2. Upon checkpoint, it saves a snapshot for the entire data set with the checkpointing
> window id.  This should be done incrementally (ManagedState) to avoid wasting space with unchanged
> data
> 3. When recovering, it takes the recovery window id and restores to that snapshot
> 4. When a window is committed, all windows with a lower ID should be purged from the
> store.
> 5. It should implement the WindowedStorage and WindowedKeyedStorage interfaces, and because
> of 2 and 3, we may want to add methods to the WindowedStorage interface so that the implementation
> of WindowedOperator can notify the storage of checkpointing, recovering and committing of
> a window.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
