Date: Thu, 25 Feb 2016 19:00:09 +0100
Subject: Re: schedule tasks `inside` Flink
From: Stephan Ewen <ewenstephan@gmail.com>
To: user@flink.apache.org

Fabian's suggestion with the co-map is good. You can use a "broadcast()" connect to make sure the dictionary gets to all nodes.
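
In the Scala DataStream API the wiring could look roughly like this (an untested sketch; `events` and `dictUpdates` are placeholder streams that would come from your own sources):

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// assumed to exist:
// val events: DataStream[String] = ...
// val dictUpdates: DataStream[Map[String, String]] = ...

// broadcast() replicates every dictionary snapshot to all parallel
// instances of the co-map, so each instance holds a full local copy
val enriched: DataStream[String] = events
  .connect(dictUpdates.broadcast())
  .flatMap(new CoFlatMapFunction[String, Map[String, String], String] {
    private var cache: Map[String, String] = Map()

    // first input: the regular stream, looked up against the cache
    override def flatMap1(event: String, out: Collector[String]): Unit =
      out.collect(cache.getOrElse(event, event))

    // second input: dictionary snapshots; just swap the cache
    override def flatMap2(snapshot: Map[String, String], out: Collector[String]): Unit =
      cache = snapshot
  })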

If you want full control over how and when to read the data, a scheduled task is not a bad solution either. Make sure you implement it as a "RichFunction", so you can use "open()" to read the first set of data and "close()" to stop your threads.
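
A minimal sketch of that pattern (untested; `fetchDictionary` stands in for your own loading code, and a map function is used only as an example):

import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class DictionaryMapper extends RichMapFunction[String, String] {

  @transient private var executor: ScheduledExecutorService = _
  @volatile private var cache: Map[String, String] = Map()

  override def open(parameters: Configuration): Unit = {
    cache = fetchDictionary()  // read the first set of data before processing starts
    executor = Executors.newSingleThreadScheduledExecutor()
    executor.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = cache = fetchDictionary()
    }, 1, 1, TimeUnit.DAYS)
  }

  override def close(): Unit = {
    if (executor != null) executor.shutdownNow()  // the refresh thread dies with the job
  }

  override def map(value: String): String = cache.getOrElse(value, value)

  private def fetchDictionary(): Map[String, String] = ???  // placeholder
}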

As a related issue: We are looking into extensions to the API to explicitly support such "slow changing inputs", similar to the way "broadcast variables" work in the DataSet API.
This is the JIRA issue; if you post your use case there, you can make it part of the discussion: https://issues.apache.org/jira/browse/FLINK-3514

Greetings,
Stephan

On Mon, Feb 15, 2016 at 12:33 PM, Fabian Hueske <fhueske@gmail.com> wrote:
Hi Michal,

If I got your requirements right, you could try to solve this issue by serving the updates through a regular DataStream.
You could add a SourceFunction which periodically emits a new version of the cache, and a CoFlatMap operator which receives the regular streamed input on its first input and the cache updates on its second input. If the Flink job gets stopped, the update source will be canceled like any regular source.
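
Such an update source could look roughly like this (untested sketch; `fetchDictionary` and the refresh interval are placeholders):

import org.apache.flink.streaming.api.functions.source.SourceFunction

class DictionarySource(intervalMs: Long) extends SourceFunction[Map[String, String]] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Map[String, String]]): Unit = {
    while (running) {
      ctx.collect(fetchDictionary())  // emit a complete new version of the cache
      Thread.sleep(intervalMs)
    }
  }

  // called by Flink when the job is stopped or cancelled, like for any regular source
  override def cancel(): Unit = running = false

  private def fetchDictionary(): Map[String, String] = ???  // placeholder
}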

You might also want to expose the cache as operator state to Flink to ensure it is checkpointed and restored in case of a failure.
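
One way to do that is the Checkpointed interface, for example like this (untested sketch; it uses immutable.HashMap rather than Map because Checkpointed requires a Serializable state type):

import org.apache.flink.streaming.api.checkpoint.Checkpointed
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.util.Collector
import scala.collection.immutable.HashMap

// a co-map whose dictionary cache takes part in checkpoints,
// so it is restored after a failure
class CheckpointedDictionaryCoFlatMap
    extends CoFlatMapFunction[String, HashMap[String, String], String]
    with Checkpointed[HashMap[String, String]] {

  private var cache: HashMap[String, String] = HashMap()

  // first input: the regular stream, enriched against the current cache
  override def flatMap1(event: String, out: Collector[String]): Unit =
    out.collect(cache.getOrElse(event, event))

  // second input: dictionary updates; simply swap in the new snapshot
  override def flatMap2(snapshot: HashMap[String, String], out: Collector[String]): Unit =
    cache = snapshot

  // hand the current cache to Flink when a checkpoint is taken
  override def snapshotState(checkpointId: Long, timestamp: Long): HashMap[String, String] =
    cache

  // put the checkpointed cache back in place on recovery
  override def restoreState(state: HashMap[String, String]): Unit =
    cache = state
}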

Best, Fabian

2016-02-14 18:36 GMT+01:00 Michal Fijolek <michalfijolek91@gmail.com>:
Hello.
My app needs a Map[K, V] as a simple cache for business data, which needs to be invalidated periodically, let's say once per day.
Right now I'm using a rather naive approach:
import java.util.concurrent.{Executors, TimeUnit}

trait Dictionary[K, V] extends Serializable {
  @volatile private var cache: Map[K, V] = Map()

  def lookup(id: K): Option[V] = cache.get(id)

  private def fetchDictionary: Map[K, V] = ???

  private def updateDictionary(): Unit = {
    cache = fetchDictionary
  }

  val invalidate = new Runnable with Serializable {
    override def run(): Unit = updateDictionary()
  }

  // refresh once per day; scheduleAtFixedRate needs an initial delay, a period and a time unit
  Executors.newSingleThreadScheduledExecutor()
    .scheduleAtFixedRate(invalidate, 1, 1, TimeUnit.DAYS)
}
This seems wrong, because I guess I should do such a thing `inside` Flink, and when I stop the Flink job, nobody's gonna stop the scheduled invalidation tasks.
What would be the idiomatic Flink way to approach this problem? How can I schedule tasks and make Flink aware of them?

Thanks,
Michal

