flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: schedule tasks `inside` Flink
Date Thu, 25 Feb 2016 18:00:09 GMT
Fabian's suggestion with the co-map is good. You can use a "broadcast()"
connect to make sure the dictionary gets to all nodes.

If you want full control over how and when the data is read, a scheduled
task is not a bad solution either. Make sure you implement this as a
"RichFunction", so you can use "open()" to read the first set of data and
"close()" to stop your threads.
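A minimal sketch of that lifecycle, using only the JDK scheduler. The class name, constructor parameters, and refresh period are illustrative; in an actual job, the RichFunction's open() would call start() and its close() would call stop():

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

// Illustrative cache with an explicit lifecycle, so a RichFunction's
// open()/close() can start and stop the refresh thread deterministically.
class RefreshingDictionary[K, V](fetch: () => Map[K, V], periodSeconds: Long) {
  @volatile private var cache: Map[K, V] = Map.empty
  private var scheduler: ScheduledExecutorService = _

  def lookup(id: K): Option[V] = cache.get(id)

  // Call from open(): load once synchronously, then schedule refreshes.
  def start(): Unit = {
    cache = fetch()
    scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(
      new Runnable { override def run(): Unit = cache = fetch() },
      periodSeconds, periodSeconds, TimeUnit.SECONDS)
  }

  // Call from close(): stop the thread so the job can shut down cleanly.
  def stop(): Unit = if (scheduler != null) scheduler.shutdownNow()
}
```

Because the executor is created in start() and torn down in stop(), cancelling the job (which invokes close()) also stops the invalidation thread, which addresses the concern in the original question.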

As a related issue: We are looking into extensions to the API to explicitly
support such "slow changing inputs" in a similar way as "broadcast
variables" work in the DataSet API.
This is the JIRA issue, if you post your use case there, you can make this
part of the discussion: https://issues.apache.org/jira/browse/FLINK-3514


On Mon, Feb 15, 2016 at 12:33 PM, Fabian Hueske <fhueske@gmail.com> wrote:

> Hi Michal,
> If I got your requirements right, you could try to solve this issue by
> serving the updates through a regular DataStream.
> You could add a SourceFunction which periodically emits a new version of
> the cache and a CoFlatMap operator which receives on the first input the
> regular streamed input and on the second input the cache updates. If the
> Flink job gets stopped, the update source will be canceled as a regular
> source.
> You might also want to expose the cache as operator state to Flink to
> ensure it is checkpointed and restored in case of a failure.
> Best, Fabian
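Fabian's co-map idea can be sketched as follows. The method names mirror Flink's CoFlatMapFunction contract (flatMap1 for the regular input, flatMap2 for cache updates), but the class below is plain Scala with no Flink dependency, so it only shows the operator logic; the class name and types are illustrative:

```scala
// Standalone sketch of the CoFlatMap logic: one input carries regular
// events, the other carries full snapshots of the dictionary.
// Method names follow Flink's CoFlatMapFunction; the rest is illustrative.
class DictionaryCoFlatMap[K, V] {
  private var cache: Map[K, V] = Map.empty

  // First input: a regular event, enriched from the current cache
  // (returns nothing if the key is not yet known).
  def flatMap1(key: K): Option[(K, V)] =
    cache.get(key).map(v => (key, v))

  // Second input: a periodically emitted new version of the cache,
  // which simply replaces the old one.
  def flatMap2(newCache: Map[K, V]): Unit =
    cache = newCache
}
```

In a real job this state would also be exposed as operator state, as Fabian notes, so that it is checkpointed and restored on failure.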
> 2016-02-14 18:36 GMT+01:00 Michal Fijolek <michalfijolek91@gmail.com>:
>> Hello.
>> My app needs a Map[K, V] as a simple cache for business data, which needs to
>> be invalidated periodically, let's say once per day.
>> Right now I'm using a rather naive approach, which is
>> import java.util.concurrent.{Executors, TimeUnit}
>>
>> trait Dictionary[K, V] extends Serializable {
>>   @volatile private var cache: Map[K, V] = Map()
>>   def lookup(id: K): Option[V] = cache.get(id)
>>   private def fetchDictionary: Map[K, V] = ???
>>   private def updateDictionary(): Unit = {
>>     cache = fetchDictionary
>>   }
>>   val invalidate = new Runnable with Serializable {
>>     override def run(): Unit = updateDictionary()
>>   }
>>   Executors.newSingleThreadScheduledExecutor()
>>     .scheduleAtFixedRate(invalidate, 0, 24, TimeUnit.HOURS)
>> }
>> This seems wrong, because I guess I should do such a thing `inside` Flink,
>> and when I stop the Flink job, nobody is going to stop the scheduled
>> invalidation tasks.
>> What would be the idiomatic Flink way to approach this problem? How can I
>> schedule tasks and make Flink aware of them?
>> Thanks,
>> Michal
