flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: Is that possible for flink to dynamically read and change configuration?
Date Mon, 24 Jul 2017 15:20:25 GMT
I don't know why it doesn't work (it should, afaik), but as a
workaround you could maintain an ArrayList or similar in your function,
and only write to / read from the ListState in snapshotState() and
initializeState().
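
A minimal sketch of that workaround, not run against Flink itself.
FakeListState is a hypothetical stand-in for ListState<String> so that the
example runs without Flink on the classpath, and the method signatures are
simplified (the real snapshotState/initializeState take context objects and
flatMap1/flatMap2 take a Collector). The only point is that both flatMap
methods share a plain field, and the ListState is touched solely when
snapshotting and restoring.

```java
import java.util.ArrayList;
import java.util.List;

public class LocalBufferSketch {

    /** Hypothetical stand-in for Flink's ListState<String>; not the real interface. */
    static class FakeListState {
        private final List<String> stored = new ArrayList<>();
        void clear() { stored.clear(); }
        void add(String value) { stored.add(value); }
        Iterable<String> get() { return new ArrayList<>(stored); }
    }

    /** Same shape as testFunc: flatMap2 writes the metric, flatMap1 reads it. */
    static class TestFunc {
        // Plain field, so both flatMap methods see the same list.
        private final List<String> metrics = new ArrayList<>();

        // Data stream: returns the message the original code would log.
        String flatMap1(String s) {
            if (metrics.isEmpty()) {
                return "Not initialized";
            }
            return "initialized: " + metrics.get(metrics.size() - 1);
        }

        // Control stream: replace the current metric.
        void flatMap2(String s) {
            metrics.clear();
            metrics.add(s);
        }

        // On checkpoint: copy the local list into the state.
        void snapshotState(FakeListState state) {
            state.clear();
            for (String m : metrics) {
                state.add(m);
            }
        }

        // On start or restore: copy the state back into the local list.
        void initializeState(FakeListState state) {
            metrics.clear();
            for (String m : state.get()) {
                metrics.add(m);
            }
        }
    }

    public static void main(String[] args) {
        FakeListState state = new FakeListState();
        TestFunc fn = new TestFunc();
        fn.initializeState(state);
        System.out.println(fn.flatMap1("data"));        // Not initialized
        fn.flatMap2("heyjude");
        System.out.println(fn.flatMap1("data"));        // initialized: heyjude

        fn.snapshotState(state);
        TestFunc restored = new TestFunc();
        restored.initializeState(state);
        System.out.println(restored.flatMap1("data"));  // initialized: heyjude
    }
}
```

With parallelism greater than 1, each parallel instance keeps its own list, so the control stream would probably need to reach every instance (for example via broadcast()); that part is an assumption about your job, not something I have checked.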

On 24.07.2017 17:10, ZalaCheung wrote:
> Hi all,
>
> Does anyone have an idea about the non-keyed managed state problem below?
> I think all the functions in the testFunc class should share the
> ListState “metrics”. But after I add an element to the ListState in the
> flatMap2 function, I cannot retrieve it.
>
>
> Desheng Zhang
>
>
>> On Jul 24, 2017, at 22:06, ZalaCheung <gzzhangdesheng@corp.netease.com> wrote:
>>
>> Hi Chesnay,
>>
>> Thank you very much. I have now dropped the default value for the
>> ListState and tried to use the CoFlatMap function with managed state.
>> But what surprised me is that the state does not seem to be shared by
>> the two streams.
>>
>> My test code is shown below.
>>
>> DataStream<String> result = stream
>>          .connect(control)
>>          .flatMap(new testFunc());
>>
>> public static class testFunc implements CoFlatMapFunction<String, String, String>, CheckpointedFunction {
>>
>>      private ListState<String> metrics;
>>
>>      @Override
>>      public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
>>
>>      }
>>
>>      @Override
>>      public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
>>          ListStateDescriptor<String> metricsStateDescriptor =
>>                  new ListStateDescriptor<>(
>>                          "metrics", TypeInformation.of(new TypeHint<String>() {}));
>>          metrics = functionInitializationContext.getOperatorStateStore().getListState(metricsStateDescriptor);
>>      }
>>
>>      @Override
>>      public void flatMap1(String s, Collector<String> collector) throws Exception {
>>          String myMetrics = null;
>>          for (String element : metrics.get()) {
>>              logger.info("element in metric: " + s);
>>              myMetrics = element;
>>          }
>>          if (myMetrics == null) {
>>              logger.info("Not initialized");
>>          } else {
>>              logger.info("initialized: " + myMetrics);
>>          }
>>      }
>>
>>      @Override
>>      public void flatMap2(String s, Collector<String> collector) throws Exception {
>>          metrics.clear();
>>          metrics.add(s);
>>          for (String element : metrics.get()) {
>>              logger.info("element in metric: " + element);
>>          }
>>      }
>> }
>>
>> I connected two streams (stream and control) and used a CoFlatMapFunction
>> on them. On the control stream, I sent a string and the right log was printed:
>> *- element in metric: heyjude*
>> Then I sent another string to the first stream.
>> But the log prints:
>> *- Not initialized*
>>
>> I am confused. I successfully receive the message on the control stream and
>> add the string to the ListState. But when I try to retrieve the ListState in
>> flatMap1, I get nothing.
>>
>> Thanks.
>> Desheng Zhang
>>
>>
>>
>>> On Jul 24, 2017, at 21:01, Chesnay Schepler <chesnay@apache.org> wrote:
>>>
>>> Hello,
>>>
>>> That's an error in the documentation, only the ValueStateDescriptor 
>>> has a defaultValue constructor argument.
>>>
>>> Regards,
>>> Chesnay
>>>
>>> On 24.07.2017 14:56, ZalaCheung wrote:
>>>> Hi Martin,
>>>>
>>>> Thanks for your advice. That’s really helpful. I am using the push
>>>> scenario. I am now having some trouble because of the state I want
>>>> to maintain. For me, the simplest way is to maintain a ValueState
>>>> in a CoFlatMapFunction (actually a RichCoFlatMapFunction). But the
>>>> rich function can only be used on a keyed stream. And for a connected
>>>> stream, at least for my scenario, I should not use the keyBy()
>>>> method (actually it seems it is not allowed to use keyBy() on a
>>>> connected stream).
>>>>
>>>> Thus, instead of using a rich function with keyed managed state, I
>>>> tried to use CheckpointedFunction for my non-keyed state. However,
>>>> in CheckpointedFunction I can only use ListState, which only has
>>>> add() and an iterator method. I am not sure whether I can just replace
>>>> an element in the ListState. What exactly makes me stuck is that I
>>>> cannot initialize my ListState with the ListStateDescriptor. It says
>>>> there is no constructor that takes an initial value, although I saw
>>>> one in the official documentation.
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
>>>>
>>>> @Override
>>>> public void initializeState(FunctionInitializationContext context) throws Exception {
>>>>     ListStateDescriptor<Tuple2<String, Integer>> descriptor =
>>>>         new ListStateDescriptor<>(
>>>>             "buffered-elements",
>>>>             TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
>>>>             Tuple2.of(0L, 0L));
>>>>     checkpointedState = context.getOperatorStateStore().getListState(descriptor);
>>>>     if (context.isRestored()) {
>>>>         for (Tuple2<String, Integer> element : checkpointedState.get()) {
>>>>             bufferedElements.add(element);
>>>>         }
>>>>     }
>>>> }
>>>>
>>>>
>>>> But in my code (Flink 1.3.1), it says there’s no constructor that takes
>>>> three arguments (the third argument in the example above is the
>>>> default value). I am really confused.
>>>>
>>>> How can I maintain my state for the CoFlatMap function?
>>>>
>>>>
>>>> Thanks
>>>>  Desheng Zhang
>>>>
>>>>
>>>>> On Jul 24, 2017, at 19:44, Martin Eden <martineden131@gmail.com> wrote:
>>>>>
>>>>> Hey Desheng,
>>>>>
>>>>> Some options that come to mind:
>>>>> - Cave man style: Stop and restart job with new config.
>>>>> - Poll scenario: You could build your own thread that periodically 
>>>>> loads from the db into a per worker accessible cache.
>>>>> - Push scenario: have a config stream (based off of some queue) 
>>>>> which you connect to your data stream via the connect operator. In 
>>>>> the CoFlatMapFunction that you have to provide you basically 
>>>>> update Flink state from the config flatMap and read the flink 
>>>>> state from the data flatMap and pass it along with the data. Then 
>>>>> in the specific operator that uses the config it can always get it 
>>>>> from the data tuple that comes alongside the data, say in an 
>>>>> invoke method call of a sink. Example here 
>>>>> <https://image.slidesharecdn.com/flinkstreambasics-160909223620/95/apache-flink-training-datastream-api-basics-34-638.jpg?cb=1497888680>.
>>>>>
>>>>> Hope that gives u some ideas,
>>>>> M
>>>>>
>>>>>
>>>>> On Mon, Jul 24, 2017 at 12:16 PM, ZalaCheung
>>>>> <gzzhangdesheng@corp.netease.com> wrote:
>>>>>
>>>>>     Hi all,
>>>>>
>>>>>     I am now trying to implement an anomaly detection algorithm on
>>>>>     Flink, which is actually a Map operator that does
>>>>>     anomaly detection based on time series.
>>>>>     At first I want to read configuration (like which Kafka source
>>>>>     host to read the data stream from and which sink address to write
>>>>>     data to) from MongoDB. It contains some system metric I
>>>>>     want to monitor.
>>>>>
>>>>>     What I did was read the configuration from MongoDB and set it as
>>>>>     the configuration of Flink.
>>>>>
>>>>>     StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>     Configuration conf = new Configuration();
>>>>>
>>>>>     JSONObject jsonConfiguration = readConfiguration();
>>>>>
>>>>>     conf.setInteger("period", jsonConfiguration.getInt("period"));
>>>>>     conf.setDouble("percentage", jsonConfiguration.getDouble("percentage"));
>>>>>     conf.setDouble("metric", jsonConfiguration.getDouble("metric"));
>>>>>
>>>>>     see.getConfig().setGlobalJobParameters(conf);
>>>>>
>>>>>     The “readConfiguration()” method reads the configuration from
>>>>>     MongoDB.
>>>>>
>>>>>     As in the code above, I set the globalJobParameters
>>>>>     to let all my workers share these parameters, including the
>>>>>     metric I want to monitor. But at some point I may want to
>>>>>     change the metric I monitor. I think one possible way
>>>>>     is to dynamically (or periodically) read the configuration and
>>>>>     reset the globalJobParameters to make the Flink program
>>>>>     change the metric it monitors. Is that possible?
>>>>>
>>>>>     Thanks
>>>>>     Desheng Zhang
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

