flink-user mailing list archives

From Timur Fayruzov <timur.fairu...@gmail.com>
Subject Re: Access to a shared resource within a mapper
Date Fri, 22 Apr 2016 15:46:54 GMT
Outstanding! Thanks, Aljoscha.

On Fri, Apr 22, 2016 at 2:06 AM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> Hi,
> you could use a RichMapFunction that has an open method:
>
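> import org.apache.flink.api.common.functions.RichMapFunction
> import org.apache.flink.configuration.Configuration
>
> data.map(new RichMapFunction[IN, OUT]() {
>   @transient private var client: Client = _ // one client per parallel instance
>
>   override def open(parameters: Configuration): Unit = {
>     // initialize client here
>   }
>
>   override def map(input: IN): OUT = {
>     client.transform(input) // use client
>   }
>
>   override def close(): Unit = {
>     // release client here
>   }
> })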
>
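> The open() method is called on each parallel instance of the function
> before any elements are passed to it, so each instance can create and own
> its (non-thread-safe) client there. The counterpart of open() is close(),
> which is called after all elements have been processed or when the job is
> cancelled.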
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 22:21 Timur Fayruzov <timur.fairuzov@gmail.com>
> wrote:
>
>> Hello,
>>
>> I'm writing a Scala Flink application. Every Flink node runs a standalone
>> process that I need to call to transform my data. To access this process, I
>> first need to initialize a client, which is not thread-safe. I would like to
>> avoid initializing a client for each element being transformed. A
>> straightforward implementation would be something like this:
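>> ```
>> import java.util.concurrent.ArrayBlockingQueue
>> import org.apache.flink.api.scala._
>>
>> val env = ExecutionEnvironment.getExecutionEnvironment
>> val data = env.fromCollection(Seq(MyKey(Some("a")), MyKey(Some("c"))))
>> val pool = new ArrayBlockingQueue[Client](5)
>> // pool is filled here
>> // the lambda captures `pool`, so Flink must serialize it (and its clients)
>> data.map(e => {
>>   val client = pool.take()
>>   try client.transform(e)
>>   finally pool.put(client) // return the client even if transform throws
>> })
>> ```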
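>> However, this fails at runtime with a "Task not serializable" exception,
>> which makes sense: the closure captures the client pool, and Flink has to
>> serialize the function to ship it to the workers.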
>>
>> As far as I understand, function parameters and broadcast variables won't
>> work either. Is there a way to make this happen?
>>
>> Thanks,
>> Timur
>>
>
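
The "Task not serializable" failure, and one way around it, can be reproduced without Flink by using plain Java serialization. This is a minimal sketch that rests on two assumptions. First, Java serialization roughly approximates how a function is shipped to the workers. Second, `Client`, `EagerFn`, `LazyClientFn` and `SerializationDemo` are hypothetical names used only for illustration. The sketch also uses a different technique from the open() approach above: a `@transient lazy val`, which is skipped during serialization and created on first use in each deserialized copy of the function.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}

// Hypothetical stand-in for the non-thread-safe client from the question.
// Like most network clients, it does not implement java.io.Serializable.
class Client {
  def transform(s: String): String = s.toUpperCase
}

// Mirrors the failing closure: the client is stored in an ordinary field,
// so serializing the function also tries to serialize the client.
class EagerFn(client: Client) extends (String => String) with Serializable {
  def apply(s: String): String = client.transform(s)
}

// The client is @transient, so serialization skips it. The lazy val then
// creates a fresh client on first use in each deserialized copy.
class LazyClientFn extends (String => String) with Serializable {
  @transient private lazy val client: Client = new Client
  def apply(s: String): String = client.transform(s)
}

object SerializationDemo {
  // Serialize and deserialize an object, roughly what happens when a
  // function is shipped to a worker (assumption: plain Java serialization).
  def roundTrip[T](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)) {
      // Resolve classes with the loader that loaded this code, so the demo
      // also works under REPL or build-tool class loaders.
      override def resolveClass(desc: ObjectStreamClass): Class[_] =
        Class.forName(desc.getName, false, getClass.getClassLoader)
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val failed = scala.util.Try(roundTrip(new EagerFn(new Client))).isFailure
    println(s"EagerFn serialization failed: $failed")
    val fn = roundTrip(new LazyClientFn)
    println(fn("abc"))
  }
}
```

Running `SerializationDemo` reports that serializing `EagerFn` failed and then prints the string transformed by the lazily created client. In a Flink job, the same pattern would presumably yield one client per deserialized function instance. open() remains the more explicit choice when the client also needs cleanup in close().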
