From: Steven Wu
Date: Sun, 10 Jun 2018 16:07:27 -0700
Subject: how to emit record to downstream operator in snapshotState and/or onProcessingTime
To: user (user@flink.apache.org)

I have a process function defined with these interfaces:

    public class MyProcessFunction<IN> extends ProcessFunction<IN, DataFile>
        implements CheckpointedFunction, ProcessingTimeCallback {...}

In the snapshotState() method, I want to close files and emit the metadata about the closed files to the downstream operator. That doesn't seem possible with the snapshotState(FunctionSnapshotContext context) interface, since it is not given a Collector.
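
Because snapshotState(FunctionSnapshotContext context) receives no Collector, the most that step can do with the closed-file metadata is park it in checkpointed state. The plain-Java sketch below models only that step, under stated assumptions: DataFile, FileWriterModel, write() and snapshot() are hypothetical names, not Flink API, and the list returned by snapshot() merely stands in for what would be written into operator state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the metadata of one closed file (not a Flink type).
final class DataFile {
    final String path;
    final long recordCount;

    DataFile(String path, long recordCount) {
        this.path = path;
        this.recordCount = recordCount;
    }
}

// Hypothetical model of the file-writing logic inside the process function.
class FileWriterModel {
    private String openPath;   // currently open file, or null if none
    private long openCount;    // records written to the open file
    private int nextId;        // used to name new files
    private final List<DataFile> pending = new ArrayList<>();

    // Per-record work: append to the open file, opening a new one if needed.
    void write(String value) {
        if (openPath == null) {
            openPath = "file-" + nextId++;
            openCount = 0;
        }
        openCount++;
    }

    // Plays the role of snapshotState(): there is no Collector here, so the
    // open file is closed and its metadata is kept as pending state.
    // Nothing clears 'pending' in this step; it can only accumulate.
    List<DataFile> snapshot() {
        if (openPath != null) {
            pending.add(new DataFile(openPath, openCount));
            openPath = null;
        }
        // Copy of what would be written into checkpointed operator state.
        return new ArrayList<>(pending);
    }
}
```

The point of the model is the asymmetry: closing files fits naturally into the snapshot hook, but emitting their metadata has to be deferred to some later callback that does have a Collector.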

I can keep the metadata in the snapshot and restore it during recovery. But if no input record arrives for a long time, processElement(IN value, Context ctx, Collector<DataFile> out) won't be called, so I can't forward the restored data to the downstream operator with guaranteed latency.
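
The recovery path can be modeled the same way: restored metadata sits in a pending list until the next call that is handed a Collector. This is a minimal plain-Java sketch under assumptions: Collector is a local stand-in for org.apache.flink.util.Collector, and RestoringFunctionModel, restore() and pendingCount() are hypothetical names used only to illustrate why emission waits for the next input record.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal local stand-in for org.apache.flink.util.Collector
// (the real interface also declares close()).
interface Collector<T> {
    void collect(T value);
}

// Hypothetical metadata type for a closed file.
final class DataFile {
    final String path;

    DataFile(String path) {
        this.path = path;
    }
}

// Hypothetical model of the restore-then-forward behaviour.
class RestoringFunctionModel {
    private final List<DataFile> pending = new ArrayList<>();

    // Plays the role of restoring state on recovery: reload the metadata
    // that was saved at the last checkpoint.
    void restore(List<DataFile> checkpointed) {
        pending.clear();
        pending.addAll(checkpointed);
    }

    // Plays the role of processElement(): a Collector is available here, so
    // restored metadata is forwarded only once a record actually arrives.
    void processElement(String value, Collector<DataFile> out) {
        for (DataFile f : pending) {
            out.collect(f);
        }
        pending.clear();
        // Normal per-record handling of 'value' would follow here.
    }

    int pendingCount() {
        return pending.size();
    }
}
```

If no record arrives, processElement is never invoked and the pending list stays non-empty, which is the latency gap described in the paragraph above.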

I can add a timer, but it doesn't seem that onProcessingTime(long timestamp) allows me to forward output to the downstream operator either, since it receives only the timestamp.
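
For comparison, ProcessFunction also defines onTimer(long timestamp, OnTimerContext ctx, Collector<O> out), which, unlike onProcessingTime(long timestamp), is handed a Collector; as far as I know, registering timers through ctx.timerService() is only supported on keyed streams. The plain-Java sketch below simulates a periodic timer to show how such a callback would bound the delay. TimerFlushModel, addPending(), advanceTo() and the local Collector interface are hypothetical stand-ins, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal local stand-in for org.apache.flink.util.Collector.
interface Collector<T> {
    void collect(T value);
}

// Hypothetical metadata type for a closed file.
final class DataFile {
    final String path;

    DataFile(String path) {
        this.path = path;
    }
}

// Hypothetical simulation of a periodic processing-time timer whose
// callback, like ProcessFunction#onTimer, is handed a Collector.
class TimerFlushModel {
    private final long intervalMs;
    private final List<DataFile> pending = new ArrayList<>();
    private long nextTimer; // time at which the timer fires next

    TimerFlushModel(long intervalMs, long startTime) {
        this.intervalMs = intervalMs;
        this.nextTimer = startTime + intervalMs;
    }

    void addPending(DataFile f) {
        pending.add(f);
    }

    // Simulated clock: fires the timer once 'now' reaches its due time,
    // whether or not any input record has arrived.
    void advanceTo(long now, Collector<DataFile> out) {
        if (now >= nextTimer) {
            onTimer(now, out);
        }
    }

    // Callback with a Collector: flush pending metadata, then re-arm the timer.
    private void onTimer(long timestamp, Collector<DataFile> out) {
        for (DataFile f : pending) {
            out.collect(f);
        }
        pending.clear();
        nextTimer = timestamp + intervalMs;
    }
}
```

In this model, metadata waits at most about one interval before being emitted, provided the clock keeps advancing; the interval is the knob that trades emission latency against timer overhead.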

Thanks,
Steven