flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Martin Eden <martineden...@gmail.com>
Subject Re: Is that possible for flink to dynamically read and change configuration?
Date Mon, 24 Jul 2017 11:44:52 GMT
Hey Desheng,

Some options that come to mind:
- Cave man style: Stop and restart job with new config.
- Poll scenario: You could build your own thread that periodically loads
from the db into a per worker accessible cache.
- Push scenario: have a config stream (based off of some queue) which you
connect to your data stream via the connect operator. In the
CoFlatMapFunction that you have to provide you basically update Flink state
from the config flatMap and read the flink state from the data flatMap and
pass it along with the data. Then in the specific operator that uses the
config it can always get it from the data tuple that comes alongside the
data, say in an invoke method call of a sink. Example here
<https://image.slidesharecdn.com/flinkstreambasics-160909223620/95/apache-flink-training-datastream-api-basics-34-638.jpg?cb=1497888680>
.

Hope that gives u some ideas,
M


On Mon, Jul 24, 2017 at 12:16 PM, ZalaCheung <
gzzhangdesheng@corp.netease.com> wrote:

> Hi all,
>
> I am  now trying to implement a anomaly detection algorithm on Flink,
> which is actually implement a Map operator to do anomaly detection based on
> timeseries.
> At first I want to read configuration(like which kafka source host to read
> datastream from and which sink address to write data to ) from mongo db. It
> contains some system metric  I want to monitor.
>
> What I did was read configuration from mongo DB and set as configuration
> of flink.
>
> StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
> Configuration conf = new Configuration();
>
> JSONObject jsonConfiguration = readConfiguration();
>
> conf.setInteger("period",jsonConfiguration.getInt("period"));
> conf.setDouble("percentage",jsonConfiguration.getDouble("percentage"));
> conf.setDouble(“metric",jsonConfiguration.getDouble(“metric"));
>
> see.getConfig().setGlobalJobParameters(conf);
>
> The “readConfiguration()” method read the configuration from mongoDB.
>
> Just like the code I showed above. I set globalJobParameters to let all my
> workers share these parameters including the metric I want to monitor.But
> maybe at some point I want to change the metric I want to monitor. I think
> one possible way is to dynamically(or periodically) read  configuration and
> reset the globalJobParameters to make the Flink program to change the
> metric to monitor. Is  that possible?
>
> Thanks
> Desheng Zhang
>
>
>

Mime
View raw message