flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Flink flick cancel vs stop
Date Fri, 15 Sep 2017 16:05:45 GMT
Also relevant for this discussion: several people (including me) have by now floated the idea
of reworking the source interface to take the responsibility for stopping/canceling/continuing
away from the specific source implementation and give that power to the system instead. Currently,
each source basically does this:

class Source<T> {
  public void run(Context ctx, Lock lock) {
    while ("forever long I want and I don't care") {
      synchronized (lock) {
        T output = ReadFrom.externalSystem();
        updateReadPositionState();
        ctx.collect(output);
      }
    }
  }
}

Meaning that any stopping/canceling behaviour requires cooperation from the source implementation.
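
To make the cooperation requirement concrete, here is a minimal sketch of the usual pattern: the source owns a flag, and the loop only ends if the implementation remembers to check it. The class and method names (CooperativeSource, getPosition) are hypothetical stand-ins, not Flink's actual interfaces, and the "external system" is just a counter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a source that has to cooperate in order to be cancelled.
class CooperativeSource {
    // volatile so that a write from a cancelling thread becomes visible to the run loop
    private volatile boolean running = true;
    private long position = 0;

    // Emits consecutive numbers until cancel() has been called.
    void run(Consumer<Long> out, Object lock) {
        while (running) {
            synchronized (lock) {
                long output = position; // stands in for reading from the external system
                position++;             // stands in for updating the read position state
                out.accept(output);
            }
        }
    }

    // The system can only ask; the loop above decides when it actually stops.
    void cancel() {
        running = false;
    }

    long getPosition() {
        return position;
    }
}
```

If the loop in run() never looked at the flag, cancel() would have no effect, which is exactly the coupling described above.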

This would be a different idea for a source interface:

abstract class NewSource<T> {
  public abstract boolean start();
  public abstract boolean advance();
  public abstract void close();

  public abstract T getCurrent();
  public abstract Instant getCurrentTimestamp();
  public abstract Instant getWatermark();

  public abstract CheckpointMark getCheckpointMark();
}

Here the driver would sit outside and call the source whenever data should be provided. Stop/cancel
would not be a feature of the source function but of the code that calls it.
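
A rough sketch of what such an outside driver could look like, assuming that start() and advance() return true when a record is available. Everything here is hypothetical and reduced for illustration: PullSource is a cut-down version of the interface above (no timestamps, watermarks or checkpoint marks), ListSource is a toy in-memory source, and Driver.drive with its stopRequested parameter is an invented name, not an existing API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

// Hypothetical, reduced version of the pull-based source interface.
abstract class PullSource<T> {
    abstract boolean start();   // assumed: true if a first record is available
    abstract boolean advance(); // assumed: true if a next record is available
    abstract T getCurrent();
    abstract void close();
}

// Toy source backed by an in-memory list, for illustration only.
class ListSource<T> extends PullSource<T> {
    private final Iterator<T> it;
    private T current;

    ListSource(List<T> items) {
        this.it = items.iterator();
    }

    boolean start() {
        return advance();
    }

    boolean advance() {
        if (!it.hasNext()) {
            return false;
        }
        current = it.next();
        return true;
    }

    T getCurrent() {
        return current;
    }

    void close() {
        // nothing to release for an in-memory list
    }
}

class Driver {
    // Pulls records until the source is exhausted or a stop was requested.
    // Returns the number of records emitted.
    static <T> int drive(PullSource<T> source, Consumer<T> out, BooleanSupplier stopRequested) {
        int emitted = 0;
        try {
            boolean available = source.start();
            // The stop check lives here, in the driver, not in the source.
            while (available && !stopRequested.getAsBoolean()) {
                out.accept(source.getCurrent());
                emitted++;
                available = source.advance();
            }
        } finally {
            source.close();
        }
        return emitted;
    }
}
```

For example, calling Driver.drive(new ListSource<String>(Arrays.asList("a", "b", "c", "d")), seen::add, () -> seen.size() >= 2) with seen being an ArrayList<String> emits "a" and "b" and then stops, without ListSource containing any stop logic of its own.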

Best,
Aljoscha

> On 14. Sep 2017, at 20:03, Eron Wright <eronwright@gmail.com> wrote:
> 
> I too am curious about stop vs cancel.  I'm trying to understand the motivations a bit
> more.
> 
> The current behavior of stop is basically that the sources become bounded, leading to
> the job winding down.
> 
> The interesting question is how best to support 'planned' maintenance procedures such
> as app upgrades and scale changes.  I think a good enhancement could be to stop precisely
> at checkpoint time to prevent emission of spurious records.  Today the behavior of 'cancel
> w/ savepoint' is at-least-once because the two operations aren't atomic.  Earlier I had assumed
> that 'stop' would evolve in this direction, but I suppose we could improve the atomicity of
> 'cancel w/ savepoint' rather than involving 'stop'.
> 
> A different direction for 'stop' might be to improve the determinism of bounding a streaming
> job such that the stop point is well-understood in terms of the source.  For example, stopping
> at an offset provided as a stop parameter.  Today I suppose one would rely on external state
> to remember the stop point, e.g. FlinkKafkaConsumer010::setStartFromGroupOffsets.
> 
> On Thu, Sep 14, 2017 at 1:03 AM, Ufuk Celebi <uce@apache.org> wrote:
> Hey Elias,
> 
> sorry for the delay here. No, stop is not deprecated, but it is not fully
> implemented yet. One missing part is migration of the existing source
> functions as you say.
> 
> Let me pull in Till for more details on this. @Till: Is there more
> missing than migrating the sources?
> 
> Here is the PR and discussion for reference:
> https://github.com/apache/flink/pull/750
> 
> I would also really love to see this fully implemented in Flink. I
> don't expect this to happen for the upcoming 1.4 release though.
> 
> – Ufuk
> 
> 
> On Wed, Sep 13, 2017 at 7:07 PM, Elias Levy <fearsome.lucidity@gmail.com> wrote:
> > Anyone?
> >
> > On Mon, Sep 11, 2017 at 6:17 PM, Elias Levy <fearsome.lucidity@gmail.com>
> > wrote:
> >>
> >> I was wondering about the status of the flink stop command.  At first
> >> blush it would seem to be the preferable way to shut down a Flink job, but it
> >> depends on StoppableFunction being implemented by sources, and I notice that
> >> the Kafka source does not seem to implement it.  In addition, the command
> >> does not support -s / --withSavepoint like cancel does.
> >>
> >> Is stop deprecated?
> >
> >
> 

