flink-dev mailing list archives

From Márton Balassi <balassi.mar...@gmail.com>
Subject Re: Making state in streaming more explicit
Date Fri, 01 May 2015 15:53:53 GMT
The current aim is the first option as you have correctly derived. :)
On May 1, 2015 5:39 PM, "Aljoscha Krettek" <aljoscha@apache.org> wrote:

> From this discussion I derive that we will have a state abstraction that
> everyone who requires state will work with? Or will the state be in object
> fields that will be saved upon invocation of some doBackup() method?
> On Apr 30, 2015 10:31 PM, "Stephan Ewen" <sewen@apache.org> wrote:
>
> > That would be one way of doing it, yes...
> >
> > On Thu, Apr 30, 2015 at 10:23 PM, Gyula Fóra <gyula.fora@gmail.com>
> wrote:
> >
> > > Okay, so the commit would be something like:
> > >
> > > commitState(OperatorState state)
> > >
> > >
> > > On Thu, Apr 30, 2015 at 10:17 PM, Stephan Ewen <sewen@apache.org>
> wrote:
> > >
> > > > I think your assumption (and the current kafka source implementation)
> > > > is that there is one state object that you update/mutate all the time.
> > > >
> > > > If you draw a snapshot of the state object at the time of the
> > > > checkpoint, the source can continue, and that particular offset is
> > > > remembered as the state of this checkpoint and can be committed to
> > > > kafka/zookeeper later.
> > > >
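The snapshot-then-commit handshake described above can be sketched in a few lines. This is only an illustration of the idea, not the actual Flink API; the class and method names (OffsetSource, snapshotState, commitState) are invented for the example:

```java
/** Sketch of the snapshot-then-commit idea: the source draws an immutable
 *  copy of its offset at barrier time, keeps running, and later commits the
 *  copy (not the live, already-advanced offset). Illustrative names only. */
class OffsetSource {
    private long offset = 0;           // mutated continuously while running
    private long committedOffset = -1; // last value committed externally

    void emitNext() { offset++; }

    /** Barrier time: draw an immutable snapshot; the source continues. */
    long snapshotState() { return offset; }

    /** Later, when the checkpoint completes: commit the snapshot value
     *  (e.g. to ZooKeeper), not the current offset. */
    void commitState(long snapshot) { committedOffset = snapshot; }

    long committedOffset() { return committedOffset; }
}
```

Because the snapshot is a value drawn at barrier time, the source never needs to remember "what was the state when the checkpoint happened" itself, which is exactly the local-copy concern raised above.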
> > > > On Thu, Apr 30, 2015 at 10:09 PM, Gyula Fóra <gyula.fora@gmail.com>
> > > wrote:
> > > >
> > > > > Regarding the commits (for instance kafka offset):
> > > > >
> > > > > I don't exactly get how you mean to do this. If the source continues
> > > > > processing after the checkpoint and before the commit, it will not
> > > > > know what state has been committed exactly, so it would need to know
> > > > > the time of the checkpoint and store a local copy.
> > > > >
> > > > > Gyula
> > > > >
> > > > >
> > > > > On Thu, Apr 30, 2015 at 10:04 PM, Stephan Ewen <sewen@apache.org>
> > > wrote:
> > > > >
> > > > > > Thanks for the comments!
> > > > > >
> > > > > > Concerning acknowledging the checkpoint:
> > > > > >
> > > > > >    The sinks definitely need to acknowledge it.
> > > > > >    If we asynchronously write the state of an operator (and emit
> > > > > >    downstream barriers before that is complete), then I think
> > > > > >    those operators also need to acknowledge the checkpoint.
> > > > > >
> > > > > >
> > > > > > For the commit messages:
> > > > > >
> > > > > >    My first thought was to send commit messages simply as actor
> > > > > >    messages from the JobManager to the vertices that require
> > > > > >    them. That way, they are not stuck in the data flow with its
> > > > > >    possible latency. Also, in the data flow, messages get
> > > > > >    duplicated (at all-to-all connections).
> > > > > >
> > > > > >
> > > > > > For iterative flows:
> > > > > >
> > > > > > Does the JobManager need to be aware of this, or can the
> > > > > > IterationHead handle that transparently for the JobManager?
> > > > > > From our last conversation, I recall:
> > > > > >  - Receive barriers, push out barriers
> > > > > >  - snapshot its state
> > > > > >  - wait for the barriers to come back through the backchannel
> > > > > >  - write the state snapshot plus the backchannel buffers
> > > > > >  - then only acknowledge the checkpoint
> > > > > >
> > > > > > My first impression is that this way the JobManager would not
> > > > > > handle the IterationHead any differently from all other stateful
> > > > > > operators.
> > > > > >
> > > > > > Greetings,
> > > > > > Stephan
> > > > > >
> > > > > >
> > > > > >
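The iteration-head protocol recalled above (receive barrier, push out barrier, snapshot, log backchannel records until the barrier returns, then acknowledge) can be modeled as a small state machine. A toy sketch of that sequence, with all names invented for illustration, not Flink API:

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the iteration-head checkpoint protocol: on a barrier it
 *  snapshots its state, logs every backchannel record until the barrier
 *  returns, and only then acknowledges the checkpoint. */
class IterationHead {
    private long state = 0;
    private boolean checkpointing = false;
    private Long snapshot = null;
    private final List<Long> backchannelLog = new ArrayList<>();
    boolean acknowledged = false;

    void process(long record) { state += record; }

    /** Steps 1-2: barrier received and pushed downstream; snapshot state. */
    void onBarrier() {
        checkpointing = true;
        snapshot = state;
    }

    /** Records arriving on the backchannel while the barrier is in flight
     *  must be logged as part of the checkpoint. */
    void onBackchannelRecord(long record) {
        if (checkpointing) backchannelLog.add(record);
        process(record);
    }

    /** Steps 3-5: barrier came back through the backchannel; the snapshot
     *  plus the logged in-flight records would be persisted here, and only
     *  then is the checkpoint acknowledged. */
    void onBackchannelBarrier() {
        checkpointing = false;
        acknowledged = true;
    }

    Long lastSnapshot() { return snapshot; }
}
```

As the mail notes, nothing here requires the JobManager to treat the head specially; the buffering is internal to the operator.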
> > > > > > On Thu, Apr 30, 2015 at 9:27 PM, Paris Carbone <parisc@kth.se>
> > > wrote:
> > > > > >
> > > > > > > I agree with all suggestions, thanks for summing it up Stephan.
> > > > > > >
> > > > > > > A few more points I have in mind at the moment:
> > > > > > >
> > > > > > > - Regarding the acknowledgements, indeed we don’t need to make
> > > > > > > all operators commit back, we just have to make sure that all
> > > > > > > sinks have acknowledged a checkpoint to consider it complete
> > > > > > > back at the coordinator.
> > > > > > >
> > > > > > > - Do you think we should broadcast commit responses to sources
> > > > > > > that need it after every successful checkpoint? The checkpoint
> > > > > > > interval does not always match with the frequency we want to
> > > > > > > initiate a compaction for example on Kafka. One alternative
> > > > > > > would be to make sources request a successful checkpoint id via
> > > > > > > a future on demand.
> > > > > > >
> > > > > > > - We have to update the current checkpointing approach to cover
> > > > > > > iterative streams. We need to make sure we don’t send checkpoint
> > > > > > > requests to iteration heads and handle downstream backup for
> > > > > > > records in transit during checkpoints accordingly.
> > > > > > >
> > > > > > > cheers
> > > > > > > Paris
> > > > > > >
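The "request a successful checkpoint id via a future on demand" alternative could look roughly like the following sketch. The coordinator class and method names are hypothetical, not Flink API; Java's CompletableFuture stands in for whatever future type would actually be used:

```java
import java.util.concurrent.CompletableFuture;

/** Sketch of on-demand checkpoint-id delivery: a source asks for a future
 *  that the coordinator completes whenever the next checkpoint finishes,
 *  instead of being broadcast every completion. Illustrative names only. */
class CheckpointCoordinator {
    private long lastCompleted = -1;
    private CompletableFuture<Long> pending = new CompletableFuture<>();

    /** A source asks, on demand, for the next completed checkpoint id. */
    CompletableFuture<Long> nextCompletedCheckpoint() {
        return pending;
    }

    /** Called once all sinks have acknowledged checkpoint `id`. */
    void checkpointCompleted(long id) {
        lastCompleted = id;
        pending.complete(id);
        pending = new CompletableFuture<>();
    }

    long lastCompleted() { return lastCompleted; }
}
```

This decouples the checkpoint interval from how often a source (e.g. one triggering a Kafka compaction) actually wants to act on a completed checkpoint.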
> > > > > > > > On 30 Apr 2015, at 20:47, Stephan Ewen <sewen@apache.org>
> > wrote:
> > > > > > > >
> > > > > > > > I was looking into the handling of state in streaming
> > > > > > > > operators, and it is a bit hidden from the system.
> > > > > > > >
> > > > > > > > Right now, functions can (if they want) put some state into
> > > > > > > > their context. At runtime, state may or may not occur. Before
> > > > > > > > runtime, the system cannot tell which operators are going to
> > > > > > > > be stateful, and which are going to be stateless.
> > > > > > > >
> > > > > > > > I think it is a good idea to expose that. We can use that for
> > > > > > > > optimizations, and we know which operators need to checkpoint
> > > > > > > > state and acknowledge the asynchronous checkpoint.
> > > > > > > >
> > > > > > > > At this point, we need to assume that all operators need to
> > > > > > > > send a confirmation message, which is unnecessary.
> > > > > > > >
> > > > > > > > Also, I think we should expose which operations want a
> > > > > > > > "commit" notification after the checkpoint completed. Good
> > > > > > > > examples are
> > > > > > > >
> > > > > > > >  - the KafkaConsumer source, which can then commit the offset
> > > > > > > > that is safe to zookeeper
> > > > > > > >
> > > > > > > >  - a transactional KafkaProducer sink, which can commit a
> > > > > > > > batch of messages to the kafka partition once the checkpoint
> > > > > > > > is done (to get exactly-once guarantees that include the sink)
> > > > > > > >
> > > > > > > > Comments welcome!
> > > > > > > >
> > > > > > > > Greetings,
> > > > > > > > Stephan
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
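The "commit" notification for an exactly-once transactional sink, as proposed in the thread, can be sketched as follows: records are buffered per checkpoint and only published when the completion notification arrives. All class and method names here are illustrative, not the actual Flink or Kafka API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch of a transactional sink: records are buffered, sealed into a
 *  batch at each checkpoint barrier, and only made visible once the
 *  checkpoint-complete notification arrives. Illustrative names only. */
class TransactionalSink {
    private final List<String> currentBatch = new ArrayList<>();
    private final Map<Long, List<String>> pendingBatches = new HashMap<>();
    final List<String> published = new ArrayList<>();

    void invoke(String record) { currentBatch.add(record); }

    /** Barrier for checkpoint `id`: seal the batch, start a new one. */
    void snapshotState(long id) {
        pendingBatches.put(id, new ArrayList<>(currentBatch));
        currentBatch.clear();
    }

    /** Commit notification: the checkpoint is globally complete, so the
     *  sealed batch may now be published (exactly-once). */
    void notifyCheckpointComplete(long id) {
        List<String> batch = pendingBatches.remove(id);
        if (batch != null) published.addAll(batch);
    }
}
```

If a failure occurs before the notification, the unpublished batch is simply discarded and re-produced on replay, which is what makes the guarantee cover the sink.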
