From ZalaCheung <gzzhangdesh...@corp.netease.com>
Subject Re: Is that possible for flink to dynamically read and change configuration?
Date Mon, 24 Jul 2017 12:56:20 GMT
Hi Martin,

Thanks for your advice. That’s really helpful. I am using the push scenario. I am now having
some trouble because of the state I want to maintain. For me, the simplest way is to maintain
to ValueState in a CoFlatMapFunction(Actually RichCoFlatMapFunction). But the rich function
can only be used on Keyed Stream. And for a connected stream, at least for my scenario, I
should not use KeyBy() method(Actually it seems not allowed to use KeyBy() function on connected
stream ).

Thus instead of using Rich function for Keyed Managed State, I tried to use CheckpointedFunction
for my non-keyed state. However, in CheckpointedFunction, I can only use ListState, which
only has add() and Iterator method. I am not sure whether I can just replace the element in
the ListState. What exactly make me stuck is that I cannot initialize my ListState with ListStateDescriptor.
It says there is no constructor for initialization value. I actually saw that on official

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html>

    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
                Tuple2.of(0L, 0L));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {

But in my code(Flink 1.3.1), it says there’s no constructor for three arguments(the third
argument in the example above is the default value). I am really confused.

How can I maintain my state for the CoFlatMap function?

 Desheng Zhang

> On Jul 24, 2017, at 19:44, Martin Eden <martineden131@gmail.com> wrote:
> Hey Desheng,
> Some options that come to mind:
> - Cave man style: Stop and restart job with new config.
> - Poll scenario: You could build your own thread that periodically loads from the db
into a per worker accessible cache.
> - Push scenario: have a config stream (based off of some queue) which you connect to
your data stream via the connect operator. In the CoFlatMapFunction that you have to provide
you basically update Flink state from the config flatMap and read the flink state from the
data flatMap and pass it along with the data. Then in the specific operator that uses the
config it can always get it from the data tuple that comes alongside the data, say in an invoke
method call of a sink. Example here <https://image.slidesharecdn.com/flinkstreambasics-160909223620/95/apache-flink-training-datastream-api-basics-34-638.jpg?cb=1497888680>.
> Hope that gives u some ideas,
> M
> On Mon, Jul 24, 2017 at 12:16 PM, ZalaCheung <gzzhangdesheng@corp.netease.com <mailto:gzzhangdesheng@corp.netease.com>>
> Hi all, 
> I am  now trying to implement a anomaly detection algorithm on Flink, which is actually
implement a Map operator to do anomaly detection based on timeseries.
> At first I want to read configuration(like which kafka source host to read datastream
from and which sink address to write data to ) from mongo db. It contains some system metric
 I want to monitor.
> What I did was read configuration from mongo DB and set as configuration of flink.
> StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
> Configuration conf = new Configuration();
> JSONObject jsonConfiguration = readConfiguration();
> conf.setInteger("period",jsonConfiguration.getInt("period"));
> conf.setDouble("percentage",jsonConfiguration.getDouble("percentage"));
> conf.setDouble(“metric",jsonConfiguration.getDouble(“metric"));
> see.getConfig().setGlobalJobParameters(conf);
> The “readConfiguration()” method read the configuration from mongoDB.
> Just like the code I showed above. I set globalJobParameters to let all my workers share
these parameters including the metric I want to monitor.But maybe at some point I want to
change the metric I want to monitor. I think one possible way is to dynamically(or periodically)
read  configuration and reset the globalJobParameters to make the Flink program to change
the metric to monitor. Is  that possible?
> Thanks
> Desheng Zhang

