flink-user mailing list archives

From Robert Metzger <rmetz...@apache.org>
Subject Re: Accessing configuration in RichFunction
Date Mon, 18 Jan 2016 12:38:15 GMT
Hi Christian,

As far as I know, the DataStream API does not let you pass any parameters
to the open(Configuration) method.
The Configuration argument of that method is only populated in the DataSet
(batch) API, and its use is discouraged.

A much better way to pass a Configuration into your function is through
the function's constructor:


Configuration mapConf = new Configuration();
mapConf.setDouble("something", 1.2);

DataStream<Tuple2<String, Integer>> counts =
      // split up the lines in pairs (2-tuples) containing: (word,1)
      text.flatMap(new Tokenizer(mapConf))
      // group by the tuple field "0" and sum up tuple field "1"
      .keyBy(0).sum(1);


And in the Tokenizer:

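public static final class Tokenizer
      implements FlatMapFunction<String, Tuple2<String, Integer>> {
   // Serialized and shipped to the workers together with the function
   private final Configuration mapConf;

   public Tokenizer(Configuration mapConf) {
      this.mapConf = mapConf;
   }

   @Override
   public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
      // emit (word, 1) pairs here; settings can be read from mapConf
   }
}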


This works as long as the type you're passing is serializable.
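
To illustrate the serializability requirement without a Flink dependency, here is a minimal plain-Java sketch. It assumes the function object reaches the workers via standard Java serialization, and it simulates that with a round trip through a byte array. The names SerializableFunctionSketch, ThresholdFilter and roundTrip are hypothetical and not part of any Flink API; the "something" key just mirrors the example above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

class SerializableFunctionSketch {

    // Hypothetical stand-in for a user function that carries its own settings.
    static final class ThresholdFilter implements Serializable {
        private static final long serialVersionUID = 1L;

        // HashMap is Serializable, so the settings travel with the function.
        private final HashMap<String, Double> conf;

        ThresholdFilter(Map<String, Double> conf) {
            this.conf = new HashMap<>(conf);
        }

        // Keeps values at or above the configured "something" threshold.
        boolean filter(double value) {
            return value >= conf.get("something");
        }
    }

    // Writes the object to a byte array and reads it back, which is assumed
    // to approximate what happens when a function is shipped to a worker.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, Double> conf = new HashMap<>();
        conf.put("something", 1.2);

        // The deserialized copy still carries the settings given at construction.
        ThresholdFilter shipped = roundTrip(new ThresholdFilter(conf));
        System.out.println(shipped.filter(2.0));
        System.out.println(shipped.filter(1.0));
    }
}
```

If a field of the function were not serializable, writeObject would fail with a NotSerializableException. Because each instance is built with its own settings, two operators of the same class can also be given different configurations this way.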



On Mon, Jan 18, 2016 at 12:43 PM, Christian Kreutzfeldt <mnxfst@gmail.com>
wrote:

> Hi Max,
>
> maybe my explanation was a bit misleading ;-)
>
> I have a stream-based application which contains a RichFilterFunction
> implementation. The parent class provides a lifecycle method,
> open(Configuration), which receives a Configuration object as input. I
> would like to use this call to pass options into the operator instance.
>
> Unfortunately, I found no hint as to where and how to provide the
> information so that I receive it in that method. Currently, I am accessing
> the surrounding runtime context to retrieve the global job parameters, from
> which I extract the desired information. But for some reason I do not want
> the operator to receive its setup information from the provided
> Configuration instance ;-)
>
> That's why I am looking for the place where the configuration object is
> created and passed into the rich filter function. I would like to insert
> dedicated information for a dedicated filter instance.
>
> Best
>   Christian
>
>
> 2016-01-18 12:30 GMT+01:00 Maximilian Michels <mxm@apache.org>:
>
>> Hi Christian,
>>
>> For your implementation, would it suffice to pass a Configuration with
>> your RichFilterFunction? You said the global job parameters are not
>> passed on to your user function? Can you confirm this is a bug?
>>
>> Cheers,
>> Max
>>
>> On Wed, Jan 13, 2016 at 10:59 AM, Christian Kreutzfeldt
>> <mnxfst@gmail.com> wrote:
>> > Hi Fabian,
>> >
>> > thanks for your quick response. I just figured out that I forgot to
>> > mention a small but probably relevant detail: I am working with the
>> > streaming API.
>> >
>> > Although there is a way to access the overall job settings, I need a
>> > solution to "reduce" the view on configuration options available on
>> > operator level.
>> > For example, I would like to pass instance specific settings like an
>> > operator identifier but there might be different operators in the
>> > overall program.
>> >
>> > Best
>> >   Christian
>> >
>> > 2016-01-13 10:52 GMT+01:00 Fabian Hueske <fhueske@gmail.com>:
>> >>
>> >> Hi Christian,
>> >>
>> >> the open method is called by the Flink workers when the parallel
>> >> tasks are initialized.
>> >> The configuration parameter is the configuration object of the
>> >> operator.
>> >> You can set parameters in the operator config as follows:
>> >>
>> >> Configuration conf = new Configuration();
>> >> conf.setString("myKey", "myVal");
>> >> DataSet<String> text = ...
>> >> DataSet<Tuple2<String, Integer>> wc =
>> >>     text.flatMap(new Tokenizer()).withParameters(conf);
>> >>
>> >> Best, Fabian
>> >>
>> >>
>> >> 2016-01-13 10:29 GMT+01:00 Christian Kreutzfeldt <mnxfst@gmail.com>:
>> >>>
>> >>> Hi
>> >>>
>> >>> While working on a RichFilterFunction implementation I was wondering
>> >>> if there is a better way to access configuration options read from
>> >>> a file during startup. Currently, I am using
>> >>> getRuntimeContext().getExecutionConfig().getGlobalJobParameters()
>> >>> to get access to my settings.
>> >>>
>> >>> The reason is that the Configuration parameter provided to the open
>> >>> function does not carry my settings. That is probably the case
>> >>> because I use
>> >>> this.executionEnvironment.getConfig().setGlobalJobParameters(cfg)
>> >>> to pass my configuration into the environment, which in turn is not
>> >>> passed on as part of the open call - I found no other way to handle
>> >>> configuration ;-)
>> >>>
>> >>> My question is: who is responsible for calling the open function,
>> >>> where does the content of the configuration parameter come from, and
>> >>> is it possible to define somewhere in the main program which
>> >>> configuration to pass into a specific operator?
>> >>>
>> >>> Best
>> >>>   Christian
>> >>
>> >>
>> >
>>
>
>
