flink-user mailing list archives

From 陈Darling <chendonglin...@gmail.com>
Subject Why is the size of each checkpoint increasing?
Date Mon, 29 Jul 2019 09:09:14 GMT

The Flink version is 1.8.1.
The example is adapted from TopSpeedWindowing:
// Key by car id (f0), use a sliding count window, and fire on a distance delta.
DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData
        .assignTimestampsAndWatermarks(new CarTimestamp()).setParallelism(parallelism)
        .keyBy(0)                            // key: car id
        .countWindow(countSize, slideSize)   // sliding count window
        .trigger(DeltaTrigger.of(triggerMeters,
                new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public double getDelta(
                            Tuple4<Integer, Integer, Double, Long> oldDataPoint,
                            Tuple4<Integer, Integer, Double, Long> newDataPoint) {
                        // fire once the covered distance (f2) grows by triggerMeters
                        return newDataPoint.f2 - oldDataPoint.f2;
                    }
                }, carData.getType().createSerializer(env.getConfig())))
        .maxBy(1).setParallelism(parallelism); // emit the record with the highest speed (f1)

The size of each checkpoint keeps growing, from about 100 KB up to about 100 MB.

Why is the size of each checkpoint increasing?
In DeltaTrigger.java I found the clear() method. In my understanding, since the trigger's state is cleared whenever a window is purged, the size of every checkpoint should stay roughly constant:
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
    ctx.getPartitionedState(stateDesc).clear();
}
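
For reference, this is the state that clear() removes. The following is a simplified paraphrase of DeltaTrigger.onElement() from the Flink 1.8 sources (stateDesc, deltaFunction, and threshold are DeltaTrigger's own fields; the local variable is mine):

// Simplified paraphrase of DeltaTrigger.onElement(): the trigger keeps only the
// last seen data point per key and window in the partitioned ValueState that
// clear() above removes.
@Override
public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
    ValueState<T> lastElementState = ctx.getPartitionedState(stateDesc);
    if (lastElementState.value() == null) {
        lastElementState.update(element); // first element for this key/window
        return TriggerResult.CONTINUE;
    }
    if (deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {
        lastElementState.update(element); // delta exceeded: remember new point, fire
        return TriggerResult.FIRE;
    }
    return TriggerResult.CONTINUE;
}

So the trigger itself holds only one data point per key and window; the elements buffered by countWindow() live in separate window state.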


Has anyone encountered a similar problem?

Darling 
Andrew D.Lin
