From: Andrey Zagrebin
To: Ning Shi
Cc: vino yang, user
Date: Mon, 10 Sep 2018 15:40:47 +0200
Subject: Re: Aggregator State in Keyed Windowed Stream

Hi Ning,

> Back to my first question, is the accumulator state backed by RocksDB state backend? If so, I don’t need to use rich function for the aggregate function.

The answer is yes: it is backed by the state backend (RocksDB, if you configure it).
You can trace it through these method calls:

sourceStream.keyBy(…)
    .timeWindow(Time.seconds(…))
    .trigger(CountTrigger.of(…))

gives you a WindowedStream,

WindowedStream.aggregate(new MyAggFunc())

creates:

new WindowOperator(windowStateDescriptor = new AggregatingStateDescriptor())

Inside WindowOperator:
WindowOperator.open() uses the configured backend to create windowState,
WindowOperator.processElement() uses windowState, which is an AggregatingState.

Cheers,
Andrey

> On 10 Sep 2018, at 13:39, Ning Shi wrote:
>
> Hi Vino,
>
>> If you need access to the state API, you can consider using ProcessWindowFunction[1], which allows you to use ProcessWindowFunction.
>
> I was hoping that I could use the aggregate function to do incremental aggregation. My understanding is that ProcessWindowFunction either has to loop through all records or be combined with an aggregate function to do incremental aggregation.
>
> Back to my first question, is the accumulator state backed by RocksDB state backend? If so, I don’t need to use rich function for the aggregate function.
>
> Thanks,
>
> Ning
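
For readers following the thread, here is a rough plain-Java sketch of the idea behind the call chain in the reply: the operator keeps one accumulator per key in its state and folds each element into it, so a plain (non-rich) aggregate function is enough. This is not Flink code. The local AggregateFunction interface only mirrors the method shapes of Flink's interface of the same name, and KeyedAggregatingState, AverageAgg, and the HashMap standing in for the state backend are hypothetical names made up for this illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class IncrementalAggregationSketch {

    /** Hypothetical local interface mirroring the method shapes of Flink's AggregateFunction. */
    interface AggregateFunction<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    /** Example aggregate: running average, with the accumulator held as {sum, count}. */
    static final class AverageAgg implements AggregateFunction<Long, long[], Double> {
        public long[] createAccumulator() { return new long[] {0L, 0L}; }
        public long[] add(Long value, long[] acc) { return new long[] {acc[0] + value, acc[1] + 1}; }
        public Double getResult(long[] acc) { return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1]; }
        public long[] merge(long[] a, long[] b) { return new long[] {a[0] + b[0], a[1] + b[1]}; }
    }

    /**
     * Stand-in for the operator's window state (assumption: a HashMap plays the
     * role of the configured state backend). Only the accumulator is stored per
     * key, never the individual elements.
     */
    static final class KeyedAggregatingState<K, IN, ACC, OUT> {
        private final Map<K, ACC> backend = new HashMap<>();
        private final AggregateFunction<IN, ACC, OUT> fn;

        KeyedAggregatingState(AggregateFunction<IN, ACC, OUT> fn) { this.fn = fn; }

        /** Folds one element into the accumulator stored for its key. */
        void add(K key, IN value) {
            ACC acc = backend.get(key);
            if (acc == null) {
                acc = fn.createAccumulator();
            }
            backend.put(key, fn.add(value, acc));
        }

        /** Returns the current result for a key, or null if nothing was added. */
        OUT get(K key) {
            ACC acc = backend.get(key);
            return acc == null ? null : fn.getResult(acc);
        }

        /** Number of accumulators held, one per key seen so far. */
        int storedEntries() { return backend.size(); }
    }

    public static void main(String[] args) {
        KeyedAggregatingState<String, Long, long[], Double> state =
                new KeyedAggregatingState<>(new AverageAgg());
        state.add("a", 10L);
        state.add("a", 20L);
        state.add("b", 5L);
        System.out.println(state.get("a"));        // prints 15.0
        System.out.println(state.storedEntries()); // prints 2
    }
}
```

The aggregate function itself holds no state here; the accumulator lives entirely in the surrounding state object, which is the role the reply attributes to windowState inside WindowOperator.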