flink-user mailing list archives

From Timur Fayruzov <timur.fairu...@gmail.com>
Subject Re: Access to a shared resource within a mapper
Date Fri, 22 Apr 2016 19:21:09 GMT
Actually, a follow-up question: is the map function single-threaded (within
one task manager, that is)? If it's not, then lazy initialization won't
work, right?
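For reference, the lazy-variable approach Stephan mentions below can be sketched in plain Scala, with no Flink dependency (`Client`, `Mapper`, and `initCount` are hypothetical names for illustration, standing in for the real non-serializable client). A `@transient lazy val` is skipped during serialization and constructed on first access, so each deserialized copy of the function builds its own client exactly once:

```scala
import java.util.concurrent.atomic.AtomicInteger

object LazyClientDemo {
  val initCount = new AtomicInteger(0)

  // Hypothetical stand-in for the real, non-serializable client.
  class Client {
    initCount.incrementAndGet()
    def transform(s: String): String = s.toUpperCase
  }

  // A serializable wrapper, as a shipped Flink closure would be.
  class Mapper extends Serializable {
    // @transient keeps the field out of serialization; lazy defers
    // construction to the first call, i.e. to the worker side.
    @transient lazy val client: Client = new Client
    def map(in: String): String = client.transform(in)
  }

  def main(args: Array[String]): Unit = {
    val m = new Mapper
    println(initCount.get)   // no client built yet
    println(m.map("flink"))  // first use constructs the client
    println(m.map("scala"))  // the same client is reused
    println(initCount.get)   // constructed exactly once
  }
}
```

Since each parallel subtask deserializes its own copy of the function, each subtask should end up with its own client instance, which sidesteps the thread-safety concern as long as no single instance is shared across threads.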

On Fri, Apr 22, 2016 at 11:50 AM, Stephan Ewen <sewen@apache.org> wrote:

> You may also be able to initialize the client only in the parallel
> execution by making it a "lazy" variable in Scala.
>
> On Fri, Apr 22, 2016 at 11:46 AM, Timur Fayruzov <timur.fairuzov@gmail.com
> > wrote:
>
>> Outstanding! Thanks, Aljoscha.
>>
>> On Fri, Apr 22, 2016 at 2:06 AM, Aljoscha Krettek <aljoscha@apache.org>
>> wrote:
>>
>>> Hi,
>>> you could use a RichMapFunction that has an open method:
>>>
>>> data.map(new RichMapFunction[IN, OUT]() {
>>>   override def open(parameters: Configuration): Unit = {
>>>     // initialize client
>>>   }
>>>
>>>   override def map(input: IN): OUT = {
>>>     // use client
>>>   }
>>> })
>>>
>>> the open() method is called before any elements are passed to the
>>> function. The counterpart of open() is close(), which is called after
>>> the last element has been processed or when the job is cancelled.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Thu, 21 Apr 2016 at 22:21 Timur Fayruzov <timur.fairuzov@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm writing a Scala Flink application. I have a standalone process that
>>>> exists on every Flink node, which I need to call to transform my data. To
>>>> access this process I first need to initialize a client that is not
>>>> thread-safe. I would like to avoid initializing a client for each element
>>>> being transformed. A straightforward implementation would be something like this:
>>>> ```
>>>>
>>>> val env = ExecutionEnvironment.getExecutionEnvironment
>>>> val data = env.fromCollection(Seq(MyKey(Some("a")), MyKey(Some("c"))))
>>>> val pool  = new ArrayBlockingQueue[Client](5)
>>>> // pool is filled here
>>>> data.map(e => {
>>>>   val client = pool.take()
>>>>   val res = client.transform(e)
>>>>   pool.put(client)
>>>>   res
>>>> })
>>>>
>>>> ```
>>>> However, this causes a runtime exception with message "Task not
>>>> serializable", which makes sense.
>>>>
>>>> Function parameters and broadcast variables won't work either as far as
>>>> I understand. Is there a way to make this happen?
>>>>
>>>> Thanks,
>>>> Timur
>>>>
>>>
>>
>
