flink-user mailing list archives

From Christian Kreutzfeldt <mnx...@gmail.com>
Subject Re: Accessing configuration in RichFunction
Date Mon, 18 Jan 2016 13:55:53 GMT
Hi Robert,

using the constructor is indeed the approach I chose. Using the existing
lifecycle method was just an idea to integrate it more closely with the
existing framework design ;-)
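For reference, here is a minimal sketch of the constructor approach applied to a RichFilterFunction in the DataStream API. It assumes a Flink 1.x streaming dependency (flink-streaming-java) on the classpath. The class ThresholdFilter, its fields and the operator ids are hypothetical. Each instance keeps its own settings in final fields. Those fields are serialized together with the function, so they are available on the workers without touching open(Configuration).

```java
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConstructorConfigExample {

    /** Hypothetical filter that keeps values at or above a per-instance threshold. */
    public static class ThresholdFilter extends RichFilterFunction<Integer> {
        private final String operatorId;  // instance-specific identifier (illustrative)
        private final int threshold;      // instance-specific option (illustrative)

        public ThresholdFilter(String operatorId, int threshold) {
            // Runs in the client; the fields travel to the workers
            // as part of the serialized function object.
            this.operatorId = operatorId;
            this.threshold = threshold;
        }

        @Override
        public boolean filter(Integer value) {
            return value >= threshold;
        }

        public String getOperatorId() {
            return operatorId;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> numbers = env.fromElements(1, 5, 10, 20);

        // Two instances of the same class, each configured differently.
        numbers.filter(new ThresholdFilter("low-pass", 5)).print();
        numbers.filter(new ThresholdFilter("high-pass", 15)).print();

        env.execute("constructor-config-example");
    }
}
```

The settings live in the function object itself, so two instances of the same class can carry different options. This gives the per-instance behaviour asked about in this thread.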

Best
  Christian

2016-01-18 13:38 GMT+01:00 Robert Metzger <rmetzger@apache.org>:

> Hi Christian,
>
> I think the DataStream API does not allow you to pass any parameter to the
> open(Configuration) method.
> Passing parameters that way is only supported in the DataSet (batch) API,
> and its use is discouraged.
>
> A much better option is to pass a Configuration into your function through
> its constructor, as follows:
>
>
> Configuration mapConf = new Configuration();
> mapConf.setDouble("something", 1.2);
>
> DataStream<Tuple2<String, Integer>> counts =
>       // split up the lines in pairs (2-tuples) containing: (word,1)
>       text.flatMap(new Tokenizer(mapConf))
>       // group by the tuple field "0" and sum up tuple field "1"
>       .keyBy(0).sum(1);
>
>
> And in the Tokenizer:
>
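> public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
>    private final Configuration mapConf;
>
>    public Tokenizer(Configuration mapConf) {
>       this.mapConf = mapConf;
>    }
>
>    @Override
>    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
>       // mapConf is available here in every parallel instance of the function
>       for (String token : value.toLowerCase().split("\\W+")) {
>          if (token.length() > 0) {
>             out.collect(new Tuple2<String, Integer>(token, 1));
>          }
>       }
>    }
> }
>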
> This works as long as the type you're passing is serializable.
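The serializability condition can be illustrated without Flink. User functions are shipped to the workers in serialized form using Java serialization, so every object handed to the constructor must survive a serialization round trip. The following is a minimal sketch using plain java.io. ConfiguredFunction is a hypothetical stand-in for a user function, not a Flink class.

```java
import java.io.*;

public class SerializableFieldCheck {

    /** Hypothetical stand-in for a user function that receives settings via its constructor. */
    static class ConfiguredFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        final Object settings;  // must itself be Serializable for shipping to succeed

        ConfiguredFunction(Object settings) {
            this.settings = settings;
        }
    }

    /** Serializes and deserializes an object with plain Java serialization. */
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        // A HashMap of Strings is Serializable, so the copy carries the same settings.
        java.util.HashMap<String, String> ok = new java.util.HashMap<>();
        ok.put("operator.id", "filter-1");
        ConfiguredFunction copy = roundTrip(new ConfiguredFunction(ok));
        System.out.println(copy.settings);  // {operator.id=filter-1}

        // A plain Object is not Serializable, so writing the function fails.
        try {
            roundTrip(new ConfiguredFunction(new Object()));
        } catch (NotSerializableException e) {
            System.out.println("not serializable: " + e.getMessage());
        }
    }
}
```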
>
>
>
> On Mon, Jan 18, 2016 at 12:43 PM, Christian Kreutzfeldt <mnxfst@gmail.com>
> wrote:
>
>> Hi Max,
>>
>> maybe my explanation was a bit misleading ;-)
>>
>> I have a stream-based application which contains a RichFilterFunction
>> implementation. The parent class provides a lifecycle method,
>> open(Configuration), which receives a Configuration object as input. I
>> would like to use this call to pass options into the operator instance.
>>
>> Unfortunately, I found no hint of where and how to provide the information
>> so that I receive it in the described method. Currently, I access the
>> surrounding runtime context to retrieve the global job parameters and
>> extract the desired information from them. But for several reasons I would
>> rather have the operator receive its setup information from the provided
>> Configuration instance ;-)
>>
>> That's why I am looking for the place where the configuration object is
>> created and passed into the rich filter function. I would like to insert
>> specific information for a specific filter instance.
>>
>> Best
>>   Christian
>>
>>
>> 2016-01-18 12:30 GMT+01:00 Maximilian Michels <mxm@apache.org>:
>>
>>> Hi Christian,
>>>
>>> For your implementation, would it suffice to pass a Configuration to
>>> your RichFilterFunction? You said the global job parameters are not
>>> passed on to your user function? Can you confirm this is a bug?
>>>
>>> Cheers,
>>> Max
>>>
>>> On Wed, Jan 13, 2016 at 10:59 AM, Christian Kreutzfeldt
>>> <mnxfst@gmail.com> wrote:
>>> > Hi Fabian,
>>> >
>>> > thanks for your quick response. I just realized that I forgot to mention
>>> > a small but probably relevant detail: I am working with the streaming API.
>>> >
>>> > Although there is a way to access the overall job settings, I need a
>>> > solution to "reduce" the view of the available configuration options to
>>> > the operator level.
>>> > For example, I would like to pass instance-specific settings like an
>>> > operator identifier, but there might be different operators in the
>>> > overall program.
>>> >
>>> > Best
>>> >   Christian
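One way to "reduce" a job-wide set of options to a single operator is a key-prefix convention. The main program picks out the keys that belong to one operator and hands only that slice to the function's constructor. The following is a minimal stdlib sketch. It assumes a hypothetical "<operatorId>.<key>" naming scheme, which Flink does not prescribe. scopeTo is an illustrative helper. Flink also ships a DelegatingConfiguration class that exposes a prefix-scoped view of a Configuration, which may be worth checking if you prefer to stay within Flink's types.

```java
import java.util.HashMap;
import java.util.Map;

public class OperatorScopedSettings {

    /**
     * Returns the entries of 'global' whose keys start with "<operatorId>.",
     * with that prefix removed. The naming scheme is a hypothetical convention.
     */
    public static Map<String, String> scopeTo(Map<String, String> global, String operatorId) {
        String prefix = operatorId + ".";
        Map<String, String> scoped = new HashMap<>();
        for (Map.Entry<String, String> e : global.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                scoped.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return scoped;
    }

    public static void main(String[] args) {
        Map<String, String> global = new HashMap<>();
        global.put("filterA.threshold", "5");
        global.put("filterB.threshold", "15");
        global.put("job.name", "demo");

        // Each operator instance would receive only its own slice, e.g. via its constructor.
        System.out.println(scopeTo(global, "filterA"));  // {threshold=5}
        System.out.println(scopeTo(global, "filterB"));  // {threshold=15}
    }
}
```

The returned HashMap is Serializable, so it can be passed through a function's constructor.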
>>> >
>>> > 2016-01-13 10:52 GMT+01:00 Fabian Hueske <fhueske@gmail.com>:
>>> >>
>>> >> Hi Christian,
>>> >>
>>> >> the open method is called by the Flink workers when the parallel tasks are
>>> >> initialized.
>>> >> The configuration parameter is the configuration object of the operator.
>>> >> You can set parameters in the operator config as follows:
>>> >>
>>> >> DataSet<String> text = ...
>>> >> Configuration params = new Configuration();
>>> >> params.setString("myKey", "myVal");
>>> >> DataSet<Tuple2<String, Integer>> wc = text.flatMap(new Tokenizer()).withParameters(params);
>>> >>
>>> >> Best, Fabian
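The following sketch writes out the DataSet approach as a complete program. The Configuration attached with withParameters(...) is the one the function later receives in open(Configuration). It assumes a Flink 1.x DataSet API dependency (flink-java). The class name, the "minLength" key and the input lines are illustrative. As noted earlier in this thread, this mechanism is not available for DataStream programs.

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class WithParametersExample {

    /** Emits (word, 1) for words whose length is at least a configurable minimum. */
    public static class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
        private int minLength;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // 'parameters' is the Configuration attached with withParameters(...) below.
            // "minLength" is a hypothetical key chosen for this example.
            minLength = parameters.getInteger("minLength", 1);
        }

        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.toLowerCase().split("\\W+")) {
                if (word.length() >= minLength) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> text = env.fromElements("to be or not to be", "that is the question");

        Configuration params = new Configuration();
        params.setInteger("minLength", 3);

        DataSet<Tuple2<String, Integer>> counts = text
                .flatMap(new Tokenizer())
                .withParameters(params)  // DataSet API only
                .groupBy(0)
                .sum(1);

        counts.print();  // in the DataSet API, print() triggers execution
    }
}
```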
>>> >>
>>> >>
>>> >> 2016-01-13 10:29 GMT+01:00 Christian Kreutzfeldt <mnxfst@gmail.com>:
>>> >>>
>>> >>> Hi
>>> >>>
>>> >>> While working on a RichFilterFunction implementation, I was wondering
>>> >>> whether there is a better way to access configuration options read
>>> >>> from a file during startup. Currently, I am using
>>> >>> getRuntimeContext().getExecutionConfig().getGlobalJobParameters()
>>> >>> to get access to my settings.
>>> >>>
>>> >>> The reason is that the Configuration parameter provided to the open
>>> >>> function does not carry my settings. That is probably because I use
>>> >>> this.executionEnvironment.getConfig().setGlobalJobParameters(cfg) to
>>> >>> pass my configuration into the environment, and those parameters are
>>> >>> not passed on as part of the open call. I found no other way to handle
>>> >>> configuration ;-)
>>> >>>
>>> >>> My question is: who is responsible for calling the open function,
>>> >>> where does the configuration parameter originate (i.e., where is its
>>> >>> content taken from), and is it possible to specify in the main program
>>> >>> which configuration to pass into a specific operator?
>>> >>>
>>> >>> Best
>>> >>>   Christian
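For comparison, here is a sketch of the global job parameters approach described in the original question. It assumes a Flink 1.x streaming dependency and uses ParameterTool (org.apache.flink.api.java.utils) as the parameter holder. The "keyword" key and the KeywordFilter class are hypothetical. Every operator in the job sees the same parameters, so this approach alone does not give per-instance settings.

```java
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GlobalJobParametersExample {

    /** Keeps lines containing a keyword read from the global job parameters. */
    public static class KeywordFilter extends RichFilterFunction<String> {
        private String keyword;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // Global job parameters are shared by every operator of the job.
            ParameterTool globals = (ParameterTool)
                    getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
            keyword = globals.get("keyword", "flink");  // "keyword" is a hypothetical key
        }

        @Override
        public boolean filter(String line) {
            return line.contains(keyword);
        }
    }

    public static void main(String[] args) throws Exception {
        // e.g. run with: --keyword apache
        ParameterTool params = ParameterTool.fromArgs(args);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);

        env.fromElements("apache flink", "apache spark", "hello world")
           .filter(new KeywordFilter())
           .print();

        env.execute("global-job-parameters-example");
    }
}
```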
>>> >>
>>> >>
>>> >
>>>
>>
>>
>
