From: Andrey Zagrebin <andrey@data-artisans.com>
Subject: Re: Data loss when restoring from savepoint
Date: Fri, 24 Aug 2018 18:49:10 +0200
To: Juho Autio <juho.autio@rovio.com>
Cc: user <user@flink.apache.org>

Ok, I think before further debugging the window reduced state, could you try the new 'StreamingFileSink' [1] introduced in Flink 1.6.0 instead of the previous 'BucketingSink'?
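
For reference, a rough sketch of what the swap could look like, picking up after the reduce in the pipeline (illustrative only; it assumes the windowed output is first serialized to JSON strings, e.g. with the logic currently inside IdJsonWriter, and the usual imports for MapFunction, Path, SimpleStringEncoder and StreamingFileSink):

                .reduce(new DistinctFunction())
                .map(new MapFunction<Map<String, String>, String>() {
                    @Override
                    public String map(Map<String, String> event) {
                        // placeholder serialization; reuse the existing JSON writer logic here
                        return event.toString();
                    }
                })
                .addSink(StreamingFileSink
                        .forRowFormat(new Path("s3://bucket/output"), new SimpleStringEncoder<String>("UTF-8"))
                        .build());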

Cheers,
Andrey

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html


On 24 Aug 2018, at 18:03, Juho Autio <juho.autio@rovio.com> wrote:

Yes, sorry for my confusing comment. I just meant that it seems like there's a bug somewhere now that the output is missing some data.

> I would wait and check the actual output in s3 because it is the main result of the job

Yes, and that's what I have already done. There always seems to be some data loss with the production data volumes, if the job has been restarted on that day.

Would you have any suggestions for how to debug this further?

Many thanks for stepping in.

On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <andrey@data-artisans.com> wrote:
Hi Juho,

So it is a per-key deduplication job.

Yes, I would wait and check the actual output in s3 because it is the main result of the job, and

> The late data around the time of taking the savepoint might not be included in the savepoint, but it should be behind the snapshotted offset in Kafka.

is not a bug; it is a possible behaviour.

The savepoint is a snapshot of the in-flight data which has already been consumed from Kafka.
Basically, the full contents of the window result are split between the savepoint and what can come after the savepointed offset in Kafka but before the window result is written into s3.

Allowed lateness should not affect it; I am just saying that the final result in s3 should include all records after it.
This is what should be guaranteed, but not the contents of the intermediate savepoint.

Cheers,
Andrey

On 24 Aug 2018, at 16:52, Juho Autio <juho.autio@rovio.com> wrote:

Thanks for your answer!

I check for the missed data from the final output on s3. So I wait until the next day, then run the same thing re-implemented in batch, and compare the output.

> The late data around the time of taking the savepoint might not be included in the savepoint, but it should be behind the snapshotted offset in Kafka.

Yes, I would definitely expect that. It seems like there's a bug somewhere.

> Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.

Well, as far as I know, allowed lateness doesn't play any role here, because I started running the job with allowedLateness=0, and still get the data loss, while my late data output doesn't receive anything.

> Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of the records inside the 24h window in s3? Then what is missing there?

Yes, it's the actual implementation. Note that there's a keyBy before the DistinctFunction. So there's one record for each key (which is the combination of a couple of fields). In practice I've seen that we're missing ~2000-4000 elements on each restore, and the total output is obviously much more than that.

Here's the full code for the key selector:

public class MapKeySelector implements KeySelector<Map<String, String>, Object> {

    private final String[] fields;

    public MapKeySelector(String... fields) {
        this.fields = fields;
    }

    @Override
    public Object getKey(Map<String, String> event) throws Exception {
        // Build a Tuple key from the configured fields, defaulting missing fields to "".
        Tuple key = Tuple.getTupleClass(fields.length).newInstance();
        for (int i = 0; i < fields.length; i++) {
            key.setField(event.getOrDefault(fields[i], ""), i);
        }
        return key;
    }
}

And a more exact example on how it's used:

        .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
        .timeWindow(Time.days(1))
        .reduce(new DistinctFunction())

On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <andrey@data-artisans.com> wrote:
Hi Juho,

Where exactly does the data go missing? When do you notice that?
Do you check it:
- by debugging `DistinctFunction.reduce` right after resume in the middle of the day,
or
- by noticing that some distinct records are missing from the final output of BucketingSink in s3, after the window result is actually triggered and saved into s3 at the end of the day? Is this the main output?

The late data around the time of taking the savepoint might not be included in the savepoint, but it should be behind the snapshotted offset in Kafka. Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.

Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of the records inside the 24h window in s3? Then what is missing there?

Cheers,
Andrey

On 23 Aug 2018, at 15:42, Juho Autio <juho.autio@rovio.com> wrote:

I changed to allowedLateness=0; no change, still missing data when restoring from savepoint.

On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <juho.autio@rovio.com> wrote:
I realized that BucketingSink must not play any role in this problem. This is because BucketingSink only gets a burst of input when the 24-hour window triggers. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).

I will next try removing the allowedLateness entirely from the equation.

In the meantime, please let me know if you have any suggestions for debugging the lost data, for example what logs to enable.

We use FlinkKafkaConsumer010, btw. Are there any known issues with it that could contribute to lost data when restoring a savepoint?

On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <juho.autio@rovio.com> wrote:
Some data is silently lost in my Flink streaming job when state is restored from a savepoint.

Do you have any debugging hints to find out where exactly the data gets dropped?

My job gathers distinct values using a 24-hour window. It doesn't have any custom state management.

When I cancel the job with a savepoint and restore from that savepoint, some data is missed. It seems to lose just a small amount of data. The event time of the lost data is probably around the time of the savepoint. In other words, the rest of the time window is not entirely missed – collection works correctly also for (most of the) events that come in after restoring.

When the job processes a full 24-hour window without interruptions it doesn't miss anything.

Usually the problem doesn't happen in test environments that have smaller parallelism and smaller data volumes. But with production volumes the job seems to be consistently missing at least something on every restore.

This issue has happened consistently since the job was initially created. It was at first run on an older version of Flink 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 & 1.6.0.

I'm wondering if this could be, for example, some synchronization issue between the Kafka consumer offsets and what's been written by BucketingSink?

1. Job content, simplified

        kafkaStream
                .flatMap(new ExtractFieldsFunction())
                .keyBy(new MapKeySelector(1, 2, 3, 4))
                .timeWindow(Time.days(1))
                .allowedLateness(allowedLateness)
                .sideOutputLateData(lateDataTag)
                .reduce(new DistinctFunction())
                .addSink(sink)
                // use a fixed number of output partitions
                .setParallelism(8);

/**
 * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
 */
public class DistinctFunction implements ReduceFunction<java.util.Map<String, String>> {
    @Override
    public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
        // Keep the first record seen for the key; later duplicates are dropped.
        return value1;
    }
}

2. State configuration

boolean enableIncrementalCheckpointing = true;
String statePath = "s3n://bucket/savepoints";
new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);

Checkpointing Mode: Exactly Once
Interval: 1m 0s
Timeout: 10m 0s
Minimum Pause Between Checkpoints: 1m 0s
Maximum Concurrent Checkpoints: 1
Persist Checkpoints Externally: Enabled (retain on cancellation)
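
In code, that corresponds roughly to the following CheckpointConfig calls (an illustrative sketch, not the job's actual configuration code; the usual Flink imports are assumed):

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1 minute interval, exactly-once mode
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointTimeout(10 * 60_000);        // 10m 0s timeout
        checkpointConfig.setMinPauseBetweenCheckpoints(60_000);    // 1m 0s minimum pause
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        // retain externalized checkpoints on cancellation
        checkpointConfig.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);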

3. BucketingSink configuration

We use BucketingSink; I don't think there's anything special here, other than the fact that we're writing to S3.

        String outputPath = "s3://bucket/output";
        BucketingSink<Map<String, String>> sink = new BucketingSink<Map<String, String>>(outputPath)
                .setBucketer(new ProcessdateBucketer())
                .setBatchSize(batchSize)
                .setInactiveBucketThreshold(inactiveBucketThreshold)
                .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
        sink.setWriter(new IdJsonWriter());

4. Kafka & event time

My Flink job reads the data from Kafka, using a BoundedOutOfOrdernessTimestampExtractor on the Kafka consumer to synchronize watermarks across all Kafka partitions. We also write late data to a side output, but nothing is written there – if it were, it could explain missed data in the main output (I'm also sure that our late data writing works, because we previously had some actual late data which ended up there).
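
Roughly, the watermark and late-data wiring looks like this (an illustrative sketch; the topic/schema/properties arguments, the timestamp field name and the out-of-orderness bound are placeholders, not our exact values):

        OutputTag<Map<String, String>> lateDataTag = new OutputTag<Map<String, String>>("late-data") {};

        FlinkKafkaConsumer010<Map<String, String>> consumer =
                new FlinkKafkaConsumer010<>(topic, deserializationSchema, kafkaProperties);
        consumer.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Map<String, String>>(Time.minutes(5)) {
                    @Override
                    public long extractTimestamp(Map<String, String> event) {
                        // placeholder field name for the event-time timestamp
                        return Long.parseLong(event.get("eventTime"));
                    }
                });

        // Late records are then read from the windowed result via getSideOutput(lateDataTag).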

5. allowedLateness

It may or may not be relevant that I have also enabled allowedLateness with 1 minute lateness on the 24-hour window:
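
Roughly (reconstructed from the pipeline in section 1, so treat it as illustrative rather than the exact snippet):

        .timeWindow(Time.days(1))
        .allowedLateness(Time.minutes(1))
        .sideOutputLateData(lateDataTag)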

If that makes sense, I could try removing allowedLateness entirely? That would just be to rule out a Flink bug related to restoring state in combination with the allowedLateness feature. After all, all of our data should arrive in a good enough order not to be late, given the max out-of-orderness used in the Kafka consumer's timestamp extractor.

Thank you in advance!






