Date: Tue, 23 Jun 2020 08:52:43 -0500 (CDT)
From: KristoffSC
To: user@flink.apache.org
Subject: Session Window with Custom Trigger

Hi all,

I'm using Flink 1.9.2 and I would like to ask about my use case and the approach I've taken to meet it.

The use case: I have a keyed stream in which I have to buffer messages according to the following rules:
1. Buffering should start only when a message arrives.
2. The maximum buffer time should not exceed 3 seconds.
3. Each new message should NOT prolong the buffer time.
4. If a particular business condition is met, buffering should stop and all messages should be released for further processing.

The business logic in point 4 takes into account data from the messages previously buffered in the same buffering session.

My setup for this is:
1. A keyedStream with ProcessingTimeSessionWindow (I don't need EventTime for this).
2. A custom Trigger.

The custom trigger:
1. Keeps some data in its state under an AggregatingStateDescriptor, which allows me to override the "merge" method from the Trigger class.
2. In the onElement method, on the first call I execute ctx.registerEventTimeTimer(window.maxTimestamp()); In this method I also added the business logic, which returns TriggerResult.FIRE or TriggerResult.CONTINUE.
3. The onProcessingTime method returns TriggerResult.FIRE.
4. All other methods return TriggerResult.CONTINUE.

As a result, I can observe that my window is fired twice: once from the onElement method, when the business condition is met, and a second time from the onProcessingTime method.

What is the best way to prevent this?

Regards,
Krzysztof
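
To make the four buffering rules from the post concrete, here is a minimal plain-Java model of the intended behaviour. It is only a sketch and does not use the Flink API: the class name TimedBufferSketch, the methods onElement and onTime, and the "END" business condition are all hypothetical stand-ins. The point it illustrates is that an early flush also disarms the deadline, so the later timeout has nothing left to emit.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Plain-Java model of the buffering rules; hypothetical names, not Flink API. */
class TimedBufferSketch {
    static final long MAX_BUFFER_MS = 3_000; // rule 2: at most 3 seconds

    private final List<String> buffer = new ArrayList<>();
    private long deadline = -1; // -1 means no timer is armed

    /** Hypothetical business condition: flush once "END" has been buffered. */
    private boolean conditionMet() {
        return buffer.contains("END");
    }

    /** Handles one message; returns the flushed batch, or an empty list. */
    List<String> onElement(String msg, long now) {
        if (buffer.isEmpty()) {
            deadline = now + MAX_BUFFER_MS; // rule 1: armed only by the first message
        }
        // rule 3: later messages leave the deadline untouched
        buffer.add(msg);
        return conditionMet() ? flush() : Collections.<String>emptyList(); // rule 4
    }

    /** Called as time advances; emits only if a deadline is armed and has passed. */
    List<String> onTime(long now) {
        return (deadline >= 0 && now >= deadline) ? flush() : Collections.<String>emptyList();
    }

    private List<String> flush() {
        List<String> out = new ArrayList<>(buffer);
        buffer.clear();
        deadline = -1; // disarm, so the timeout cannot emit the same batch again
        return out;
    }

    public static void main(String[] args) {
        TimedBufferSketch b = new TimedBufferSketch();
        System.out.println(b.onElement("a", 0));      // []
        System.out.println(b.onElement("END", 1000)); // [a, END]
        System.out.println(b.onTime(3000));           // []
    }
}
```

In a real Trigger, the analogous step would presumably be to delete the pending timer or clear the trigger state when the business condition fires. That mapping is an assumption and has not been verified against Flink 1.9.2.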