flink-user mailing list archives

From Michal Fijolek <michalfijole...@gmail.com>
Subject Re: schedule tasks `inside` Flink
Date Thu, 25 Feb 2016 20:17:59 GMT
Thanks for the help, guys!
Eventually I implemented it as a RichFunction using open() and close().
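A minimal sketch of this RichFunction approach, assuming the Flink 1.x API where `open()` takes a `Configuration`. The class name `EnrichWithDictionary`, the `fetch` loader and the `periodMinutes` parameter are hypothetical. The point is that the scheduler is created per parallel instance in `open()` and shut down in `close()`, so it goes away when Flink finishes or cancels the task.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Looks up each incoming key in a dictionary that is reloaded periodically.
// `fetch` is a hypothetical loader; it is shipped with the function, so it
// must be serializable (plain Scala lambdas are).
class EnrichWithDictionary[K, V](fetch: () => Map[K, V], periodMinutes: Long)
    extends RichMapFunction[K, (K, Option[V])] {

  // Transient: rebuilt in open() on each parallel instance, never serialized.
  @transient @volatile private var cache: Map[K, V] = Map.empty
  @transient private var scheduler: ScheduledExecutorService = _

  override def open(parameters: Configuration): Unit = {
    cache = fetch() // first version is loaded before any record is processed
    scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(
      new Runnable {
        // An exception escaping run() would cancel all later refreshes,
        // so a failed reload keeps serving the previous version.
        override def run(): Unit =
          try { cache = fetch() } catch { case _: Exception => () }
      },
      periodMinutes, periodMinutes, TimeUnit.MINUTES)
  }

  override def map(key: K): (K, Option[V]) = (key, cache.get(key))

  // Called when the task finishes or is cancelled: stop the refresh thread.
  override def close(): Unit =
    if (scheduler != null) scheduler.shutdownNow()
}
```

It would be used like any other map function, for example `stream.map(new EnrichWithDictionary[String, Int](() => loadFromDb(), 24 * 60))`, where `loadFromDb` is a placeholder. Each parallel instance loads and refreshes its own copy of the dictionary.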


2016-02-25 19:00 GMT+01:00 Stephan Ewen <sewen@apache.org>:

> Fabian's suggestion with the co-map is good. You can use "broadcast()" on the
> dictionary stream before the connect, to make sure it gets to all nodes.
> If you want full control over how and when to read the data, a scheduled
> task is not a bad solution either. Make sure you implement this as a
> "RichFunction", so you can use "open()" to read the first set of data and
> "close()" to stop your threads.
> As a related issue: We are looking into extensions to the API to
> explicitly support such "slowly changing inputs" in a similar way as
> "broadcast variables" work in the DataSet API.
> This is the JIRA issue; if you post your use case there, it can become
> part of the discussion: https://issues.apache.org/jira/browse/FLINK-3514
> Greetings,
> Stephan
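A sketch of the co-map approach with a broadcast update stream, assuming the Flink 1.x Scala DataStream API (`flink-streaming-scala`, deprecated in newer releases). `DictionarySource`, `EnrichCoFlatMap`, the `fetch` loader and the daily period are illustrative names and values, not Flink APIs. The source re-emits the whole dictionary on each tick, `broadcast` sends every update to every parallel instance of the co-flatmap, and `cancel()` is what ends the refresh loop when the job is stopped.

```scala
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Periodically emits a fresh copy of the dictionary. `fetch` is hypothetical.
class DictionarySource(fetch: () => Map[String, Int], periodMillis: Long)
    extends SourceFunction[Map[String, Int]] {
  @volatile private var running = true
  // Wakes the waiting loop early on cancel(); recreated after deserialization.
  @transient private lazy val lock = new Object

  override def run(ctx: SourceFunction.SourceContext[Map[String, Int]]): Unit =
    while (running) {
      val snapshot = fetch()
      ctx.getCheckpointLock.synchronized { ctx.collect(snapshot) }
      lock.synchronized { if (running) lock.wait(periodMillis) }
    }

  // Called by Flink when the job is cancelled: end the loop and wake it up.
  override def cancel(): Unit = {
    running = false
    lock.synchronized { lock.notifyAll() }
  }
}

// Input 1: regular records. Input 2: dictionary updates.
class EnrichCoFlatMap
    extends CoFlatMapFunction[String, Map[String, Int], (String, Option[Int])] {
  private var cache: Map[String, Int] = Map.empty

  override def flatMap1(key: String, out: Collector[(String, Option[Int])]): Unit =
    out.collect((key, cache.get(key)))

  // Replace the whole dictionary; nothing is emitted for an update.
  override def flatMap2(update: Map[String, Int], out: Collector[(String, Option[Int])]): Unit =
    cache = update
}

object EnrichJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val records: DataStream[String] = env.fromElements("a", "b", "c")
    val updates: DataStream[Map[String, Int]] =
      env.addSource(new DictionarySource(() => Map("a" -> 1, "b" -> 2), 24L * 60 * 60 * 1000))
    // Every parallel instance of the co-flatmap receives every update.
    val broadcastUpdates: DataStream[Map[String, Int]] = updates.broadcast

    records.connect(broadcastUpdates).flatMap(new EnrichCoFlatMap).print()
    env.execute("dictionary enrichment")
  }
}
```

Compared with a thread inside a RichFunction, the refresh loop here is an ordinary source, so Flink manages its lifecycle; the trade-off is that the full dictionary travels over the network on every update. Records that arrive before the first update see an empty dictionary.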
> On Mon, Feb 15, 2016 at 12:33 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>> Hi Michal,
>> If I got your requirements right, you could try to solve this issue by
>> serving the updates through a regular DataStream.
>> You could add a SourceFunction which periodically emits a new version of
>> the cache and a CoFlatMap operator which receives on the first input the
>> regular streamed input and on the second input the cache updates. If the
>> Flink job gets stopped, the update source will be canceled as a regular
>> source.
>> You might also want to expose the cache as operator state to Flink to
>> ensure it is checkpointed and restored in case of a failure.
>> Best, Fabian
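To make the cache part of Flink's checkpoints, the co-flatmap can implement `CheckpointedFunction` (available since Flink 1.2, so newer than this thread). This sketch assumes a `Map[String, Int]` dictionary and the Scala API's `createTypeInformation`; `CheckpointedEnrich` is a hypothetical name. It uses union list state because every parallel instance holds the same full dictionary, so after a restore each instance can take any one of the restored copies.

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

class CheckpointedEnrich
    extends CoFlatMapFunction[String, Map[String, Int], (String, Option[Int])]
    with CheckpointedFunction {

  private var cache: Map[String, Int] = Map.empty
  @transient private var cacheState: ListState[Map[String, Int]] = _

  override def flatMap1(key: String, out: Collector[(String, Option[Int])]): Unit =
    out.collect((key, cache.get(key)))

  override def flatMap2(update: Map[String, Int], out: Collector[(String, Option[Int])]): Unit =
    cache = update

  // Called on every checkpoint: store the current dictionary.
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    cacheState.clear()
    cacheState.add(cache)
  }

  // Called before processing starts, on a fresh start and on recovery.
  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[Map[String, Int]](
      "dictionary", createTypeInformation[Map[String, Int]])
    // Union state: on restore, every instance sees the entries of all instances.
    cacheState = context.getOperatorStateStore.getUnionListState(descriptor)
    if (context.isRestored) {
      val it = cacheState.get().iterator()
      if (it.hasNext) cache = it.next()
    }
  }
}
```

With union state each parallel instance writes its own copy on every checkpoint, so for a large dictionary the checkpoint size grows with the parallelism.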
>> 2016-02-14 18:36 GMT+01:00 Michal Fijolek <michalfijolek91@gmail.com>:
>>> Hello.
>>> My app needs a Map[K, V] as a simple cache for business data, which needs to
>>> be invalidated periodically, let's say once per day.
>>> Right now I'm using a rather naive approach:
>>> import java.util.concurrent.{Executors, TimeUnit}
>>>
>>> trait Dictionary[K, V] extends Serializable {
>>>   @volatile private var cache: Map[K, V] = Map()
>>>   def lookup(id: K): Option[V] = cache.get(id)
>>>   private def fetchDictionary: Map[K, V] = ???
>>>   private def updateDictionary() = {
>>>     cache = fetchDictionary
>>>   }
>>>   val invalidate = new Runnable with Serializable {
>>>     override def run(): Unit = updateDictionary()
>>>   }
>>>   // Initial delay of 0 is assumed; the one-day period follows "once per day" above
>>>   Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(invalidate, 0, 1, TimeUnit.DAYS)
>>> }
>>> This seems wrong, because I guess I should do such things `inside` Flink, and
>>> when I stop the Flink job, nobody's going to stop the scheduled invalidation tasks.
>>> What would be the idiomatic Flink way to approach this problem? How can I schedule
>>> tasks and make Flink aware of them?
>>> Thanks,
>>> Michal
