Subject: Re: Enriching data from external source with cache
From: Timo Walther
To: user@flink.apache.org
Date: Mon, 2 Oct 2017 12:07:56 +0200

Hi Derek,

maybe the following talk can give you some ideas on how to do this with joins and async I/O: https://www.youtube.com/watch?v=Do7C4UJyWCM
(around the 17th minute). Basically, you split the stream and wait for an async I/O result in a downstream operator.

But I think a transient Guava cache is not a bad idea: since it is only a cache, it does not need to be checkpointed and can be rebuilt at any time. Implementing your own logic in a ProcessFunction is always an option, but might require more implementation effort.

By the way, if you feel brave enough, you could also think about contributing a stateful async I/O. It should not be too much effort to make this work.

Regards,
Timo

On 9/29/17 at 8:39 PM, Derek VerLee wrote:
> My basic problem will sound familiar, I think: I need to enrich
> incoming data using a REST call to an external system for slowly
> evolving metadata. Some cache-based lag is acceptable, so to
> reduce load on the external system and to process more efficiently, I
> would like to implement a cache. The cache would be by key, and I am
> already doing a keyBy for the same key in the job.
>
> Please correct me if I'm wrong:
> * Keyed state would be great to store my metadata "cache", and Async I/O
>   is ideal for pulling from the external system,
>   but AsyncFunction cannot access keyed state ("Exception: State is
>   not supported in rich async functions.") and operators cannot share
>   state between them.
>
> This leaves me wondering, since side inputs are not here yet, what is the
> best (and perhaps most idiomatic) way to approach my problem?
>
> I'd rather keep changes to existing systems minimal for this iteration
> and just minimize impact on them during peaks as best I can... systemic
> refactoring and re-architecture will be coming soon (so I'm happy to
> hear thoughts on that as well).
>
> Approaches considered:
>
> 1. AsyncFunction with a transient Guava cache. Not ideal ... but
>    maybe good enough to get by.
> 2. Using compound message types (oh, if only Java had real algebraic
>    data types...)
>    and sending cache-miss messages from some
>    CacheEnrichmentMapper (keyed) to some AsyncCacheLoader (not keyed),
>    which then feeds cache updates back to the former via iteration ... I
>    don't know why this couldn't work, but it feels like a hot mess unless
>    there is some way I am not thinking of to do it cleanly.
> 3. One user mentioned on a similar thread loading the data in as
>    another DataStream and then using joins, but I'm confused about how
>    this would work: it seems to me that joins happen on windows, and windows
>    pertain to (some notion of) time, so what would be my notion of time for
>    the slow (maybe years old in some cases) metadata?
> 4. Forget about async I/O.
> 5. Implement my own "async I/O" using a process function or
>    similar ... is this a valid pattern?
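
A minimal sketch of approach 1, the transient cache in front of an asynchronous lookup, in plain Java. It deliberately uses neither Flink nor Guava classes: `TransientAsyncCache`, its `loader` function and the `ttlMillis` parameter are hypothetical names introduced for illustration, and a `ConcurrentHashMap` stands in for the Guava cache. In a job, something like this would presumably live in a transient field of the async function and be created in `open()`, so it is never checkpointed and simply starts empty after a restart.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical helper: a non-checkpointed, per-instance cache in front of an async lookup. */
final class TransientAsyncCache<K, V> {

    /** A cached (possibly still in-flight) lookup plus the time it was started. */
    private static final class Entry<T> {
        final CompletableFuture<T> future;
        final long loadedAtMillis;

        Entry(CompletableFuture<T> future, long loadedAtMillis) {
            this.future = future;
            this.loadedAtMillis = loadedAtMillis;
        }
    }

    private final ConcurrentHashMap<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final Function<K, CompletableFuture<V>> loader; // assumed: starts the REST call and returns at once
    private final long ttlMillis;                            // assumed: acceptable staleness of the metadata

    TransientAsyncCache(Function<K, CompletableFuture<V>> loader, long ttlMillis) {
        this.loader = loader;
        this.ttlMillis = ttlMillis;
    }

    /** Returns the cached future for the key, or starts a new lookup on a miss. */
    CompletableFuture<V> get(K key) {
        return get(key, System.currentTimeMillis());
    }

    /** Same as get(key), with an explicit clock value so the expiry logic can be tested. */
    CompletableFuture<V> get(K key, long nowMillis) {
        Entry<V> entry = entries.compute(key, (k, old) -> {
            // Keep the old entry only if it is young enough and did not fail.
            if (old != null
                    && nowMillis - old.loadedAtMillis < ttlMillis
                    && !old.future.isCompletedExceptionally()) {
                return old;
            }
            // Miss, expired, or failed: start a fresh lookup. The loader must not block
            // and must not touch this cache, because it runs inside compute().
            return new Entry<V>(loader.apply(k), nowMillis);
        });
        return entry.future;
    }
}
```

Because the future itself is cached, concurrent requests for the same key share one in-flight lookup, so a burst of records with the same key triggers a single external call. The sketch has no size bound; a real Guava cache would add one.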