flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stefan Richter <s.rich...@data-artisans.com>
Subject Re: File System State Backend
Date Fri, 08 Sep 2017 07:56:53 GMT
Hi,

I just tried out checkpointing with the FsStateBackend in Flink 1.3.2 and everything works as
expected for me. Can you give a bit more detail on what you mean by "checkpoint data is not
cleaning"? For example, is it not cleaned up while the job is running, so that "chk-[ID]"
directories accumulate, or is something left over across multiple restarts? Which filesystem
are you using for the checkpoints (e.g. local, HDFS, S3, …)? Does this also happen for other jobs?

Best,
Stefan

> Am 08.09.2017 um 03:17 schrieb rnosworthy <r_nosworthy@hotmail.com>:
> 
> Flink 1.3.2
> 
> I have one VM for the job manager and another for the task manager.
> 
> I have a custom windowing trigger shown below.
> 
> My checkpoint data is not clearing.
> 
> I have tried to inject a fileStateThresholdSize when instantiating the
> FsStateBackend object, but that didn't work.
> 
> I have tried explicitly setting state.checkpoints.num-retained: 1 in the
> flink.yaml file but that didn't work either.
> 
> Not sure what else to try. Can someone suggest anything?
> 
> Thanks in advance.
> 
> Ryan
> 
> ======================================================
> 
> 
> /**
> *
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#built-in-and-custom-triggers
> *
> */
> public class ThresholdTrigger extends Trigger<MonitorProbe, TimeWindow> {
> 
>  private static final Logger LOG =
> LoggerFactory.getLogger(ThresholdTrigger.class);
>  private static final long serialVersionUID = 1L;
>  private static final SimpleDateFormat sdf = new SimpleDateFormat("dd
> HH:mm:ss a");
> 
>  private final ValueStateDescriptor<Integer> maxCountDesc =
>      new ValueStateDescriptor<>(
>          "max",
>          TypeInformation.of(new TypeHint<Integer>() {}));
> 
>  private final ReducingStateDescriptor<Integer> currentCountDesc =
>      new ReducingStateDescriptor<>(
>          "count",
>          new Sum(),
>          IntSerializer.INSTANCE);
> 
>  @Override
>  public TriggerResult onElement(MonitorProbe probe, long timestamp,
> TimeWindow window, TriggerContext ctx)
>      throws Exception {
> 
>    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
>      // if the watermark is already past the window fire immediately
>      clear(window, ctx);
>      return TriggerResult.FIRE_AND_PURGE;
>    }
> 
>    ValueState<Integer> maxCount = ctx.getPartitionedState(maxCountDesc);
>    ReducingState<Integer> currentCount =
> ctx.getPartitionedState(currentCountDesc);
>    currentCount.add(1);
> 
>    if (maxCount.value() == null) {
>      maxCount.update(probe.getThresholdConfig().getSampleSize());
>    }
> 
>    LOG.info("{} Window: {} - {} ({} - {}), Total Sample Size: [{}/{}]",
>        probe.getLoggingKey(),
>        window.getStart(), window.getEnd(),
>        sdf.format(new Date(window.getStart())),
>        sdf.format(new Date(window.getEnd())),
>        currentCount.get(), maxCount.value());
> 
>    if (currentCount.get().equals(maxCount.value())){
>      clear(window, ctx);
>      return TriggerResult.FIRE_AND_PURGE;
>    }else{
>      ctx.registerEventTimeTimer(window.maxTimestamp());
>      return TriggerResult.CONTINUE;
>    }
> 
>  }
> 
>  @Override
>  public TriggerResult onProcessingTime(long time, TimeWindow window,
> TriggerContext ctx)
>      throws Exception {
>    throw new UnsupportedOperationException("This is not processing time
> trigger");
>  }
> 
>  @Override
>  public TriggerResult onEventTime(long time, TimeWindow window,
> TriggerContext ctx) throws Exception {
> 
>    ReducingState<Integer> currentCount =
> ctx.getPartitionedState(currentCountDesc);
>    ValueState<Integer> maxCount = ctx.getPartitionedState(maxCountDesc);
> 
>    if (currentCount.get().equals(maxCount.value())){
>      clear(window, ctx);
>      return TriggerResult.FIRE_AND_PURGE;
>    }else{
>      clear(window, ctx);
>      return TriggerResult.PURGE;
>    }
>  }
> 
>  @Override
>  public void clear(TimeWindow window, TriggerContext ctx) throws Exception
> {
>    ctx.getPartitionedState(currentCountDesc).clear();
>    ctx.getPartitionedState(maxCountDesc).clear();
>  }
> 
>  @Override
>  public String toString() {
>    return "ThresholdTrigger(" + maxCountDesc + ")";
>  }
> 
>  private static class Sum implements ReduceFunction<Integer> {
> 
>    private static final long serialVersionUID = 1L;
> 
>    @Override
>    public Integer reduce(Integer value1, Integer value2) throws Exception {
>      return value1 + value2;
>    }
> 
>  }
> }
> 
> 
> 
> 
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Mime
View raw message