apex-dev mailing list archives

From "Pramod Immaneni (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXCORE-714) Reusable instance operator recovery
Date Thu, 14 Sep 2017 15:37:00 GMT

    [ https://issues.apache.org/jira/browse/APEXCORE-714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16166465#comment-16166465 ]

Pramod Immaneni commented on APEXCORE-714:

@tweise I was thinking about this, and restoring the stream to the last fully processed window
will not work correctly with the way the bufferserver and queues work today; it will require
more extensive changes. The reason is as follows. Say that, in the event of an upstream operator
failure, an operator is recovered to a window that is min(fully processed window of itself and
its downstream operators). When the stream is re-opened to the bufferserver, the old data for
that stream will be cleaned up, so the data from the checkpoint to the recovered window is
purged. If there is a subsequent failure in an operator downstream of this operator before the
next committed window, the older data will not be available. To do it correctly we would need to
pause and continue the output streams at the same time as the input streams are being restored.
I would presume the same would be true of the local queues if the stream were container local.
It's possible, but I would like to take it up in a subsequent task.
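
To make the failure sequence above concrete, here is a minimal sketch in Java. It is a toy
model and not Apex engine code: the class ToyBufferServer, its methods, and the window ids
are all hypothetical. The only behavior it models is the one described above, namely that
re-opening a stream at a window discards the older data for that stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model only: hypothetical names, not the Apex buffer server API.
final class ToyBufferServer {
    // windowId -> tuples published in that window
    private final TreeMap<Long, List<String>> windows = new TreeMap<>();

    void publish(long windowId, String tuple) {
        windows.computeIfAbsent(windowId, k -> new ArrayList<>()).add(tuple);
    }

    // Assumed behavior, taken from the comment above: re-opening the stream
    // at a window discards every window older than it.
    void reopenAt(long windowId) {
        windows.headMap(windowId, false).clear();
    }

    // True if every window in [from, to] is still held and can be replayed.
    boolean canReplay(long from, long to) {
        for (long w = from; w <= to; w++) {
            if (!windows.containsKey(w)) {
                return false;
            }
        }
        return true;
    }

    // min(fully processed window of the operator and its downstream operators)
    static long recoveryWindow(long ownFullyProcessed, long... downstreamFullyProcessed) {
        long min = ownFullyProcessed;
        for (long w : downstreamFullyProcessed) {
            min = Math.min(min, w);
        }
        return min;
    }
}
```

As an illustration, suppose the checkpoint is at window 10 and windows 11 to 15 have been
published. With the operator at window 15 and its downstream operators at 13 and 14, the
recovery window is 13. After reopenAt(13), canReplay(13, 15) still holds, but
canReplay(11, 15) does not, and that second range is what a downstream operator restarting
from the checkpoint would ask for.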

What I would like to do now is leave the window restoration as it is today and only reuse
the instance, so it will not be fully optimized for at-least-once, as it will still reprocess
from the checkpoint window. This also means it will be applicable to both processing modes. In
the future, I will make further optimizations for the at-least-once case that do not affect the
at-most-once case.
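
A rough sketch of the decision this implies is given below in Java. Every name in it
(RecoveryPlanSketch, StateAction, Plan, plan) is hypothetical and nothing here is existing
engine code. It assumes that an operator marked as reusable keeps its instance only when
the failure is upstream and its own container is still running, and that the stream is
reset to the checkpoint window in either case, as it is today.

```java
// Hypothetical sketch; these types do not exist in Apex.
final class RecoveryPlanSketch {

    enum StateAction { REUSE_INSTANCE, RESTORE_FROM_CHECKPOINT }

    static final class Plan {
        final StateAction stateAction;
        final long streamResetWindowId;

        Plan(StateAction stateAction, long streamResetWindowId) {
            this.stateAction = stateAction;
            this.streamResetWindowId = streamResetWindowId;
        }
    }

    // markedReusable: the user has identified the operator as reusable.
    // ownContainerFailed: the operator's own container went down, so there
    // is no live instance left to reuse.
    static Plan plan(boolean markedReusable, boolean ownContainerFailed, long checkpointWindowId) {
        StateAction action = (markedReusable && !ownContainerFailed)
                ? StateAction.REUSE_INSTANCE
                : StateAction.RESTORE_FROM_CHECKPOINT;
        // Window restoration is left unchanged: always the checkpoint window.
        return new Plan(action, checkpointWindowId);
    }
}
```

Keeping the stream reset window independent of the state action is what lets the same path
serve both processing modes for now; a later optimization for at-least-once would only
change how streamResetWindowId is chosen.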

Also, [~sandesh] brought up the point that in some cases the operator may need to know that
this is happening, so I will provide an optional notification as well.
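
One possible shape for such an optional notification is sketched below in Java. The
interface InstanceReuseListener, its method, and the helper ReuseNotifier are hypothetical
names made up for illustration; the actual API has not been defined in this thread. The
idea is simply that only operators that implement the interface get the callback.

```java
// Hypothetical names throughout; none of these types exist in Apex.
interface InstanceReuseListener {
    // Called when the engine keeps this instance instead of restoring it.
    void instanceReused(long streamResetWindowId);
}

// Example operator that opts in to the notification.
final class CountingOperator implements InstanceReuseListener {
    long tuplesSeen;
    long lastResetWindowId = -1;

    void process(String tuple) {
        tuplesSeen++;
    }

    @Override
    public void instanceReused(long streamResetWindowId) {
        // In-memory state (tuplesSeen) is kept; only the window is recorded.
        lastResetWindowId = streamResetWindowId;
    }
}

final class ReuseNotifier {
    // Notifies the operator only if it implements the listener.
    // Returns true if a notification was delivered.
    static boolean notifyIfInterested(Object operator, long streamResetWindowId) {
        if (operator instanceof InstanceReuseListener) {
            ((InstanceReuseListener) operator).instanceReused(streamResetWindowId);
            return true;
        }
        return false;
    }
}
```

Operators that do not care about reuse need no changes under this approach, since the
check is made on the engine side.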

> Reusable instance operator recovery
> -----------------------------------
>                 Key: APEXCORE-714
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-714
>             Project: Apache Apex Core
>          Issue Type: Improvement
>            Reporter: Pramod Immaneni
>            Assignee: Pramod Immaneni
> In a failure scenario, when a container fails, it is redeployed along with all the operators
in it. The operators downstream of these operators are also redeployed within their containers.
The operators are restored from their checkpoint and connect to the appropriate point in the
stream according to the processing mode. In at-least-once mode, for example, the data is replayed
from the same checkpoint.
> Restoring an operator's state from checkpoint could turn out to be a costly operation, depending
on the size of the state. In some use cases, depending on the operator logic, when there is an
upstream failure, reusing the current instance without restoring the operator from checkpoint
will still produce the same results with the data replayed from the last fully processed window.
The operator state can remain the same as it was before the upstream failure by reusing the
same operator instance, with only the streams and window reset to the window after
the last fully processed window to guarantee at-least-once processing of tuples. If the
container where the operator itself is running goes down, it would of course need to be restored
from the checkpoint. This scenario occurs in some batch use cases with operators that
have a large state.
> I would like to propose adding the ability for a user to explicitly identify operators
to be of this type and the corresponding functionality in the engine to handle their recovery
in the way described above by not restoring their state from checkpoint, reusing the instance
and restoring the stream to the window after the last fully processed window for the operator.
When operators are not identified to be of this type, the default behavior is what it is today
and nothing changes.
> I have done some prototyping on the engine side to ensure that this is possible with
our current code base without requiring a massive overhaul, especially the restoration of
the operator instance within the Node in the streaming container and the re-establishment of
the subscriber stream to a window in the buffer server that the publisher (upstream) hasn't
yet reached, as it would be restarting from checkpoint. I have been able to get it all working.

This message was sent by Atlassian JIRA
