flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Access to a shared resource within a mapper
Date Fri, 22 Apr 2016 09:06:36 GMT
Hi,
you could use a RichMapFunction that has an open method:

data.map(new RichMapFunction[IN, OUT]() {
  override def open(parameters: Configuration): Unit = {
    // initialize client
  }

  override def map(input: IN): OUT = {
    // use client
  }
})

The open() method is called once before any elements are passed to the
function. Its counterpart is close(), which is called after all elements have
been processed or when the job is cancelled.
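
A minimal sketch using the Client and MyKey types from your snippet (the
String result type, the no-argument Client constructor, and the close() method
on Client are assumptions for illustration) could look roughly like this:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

case class MyKey(value: Option[String]) // field name assumed for illustration

class TransformMapper extends RichMapFunction[MyKey, String] {
  // @transient keeps the non-serializable client out of the serialized
  // function; it is created on the task manager in open() instead.
  @transient private var client: Client = _

  override def open(parameters: Configuration): Unit = {
    client = new Client() // one client per parallel task instance
  }

  override def map(input: MyKey): String =
    client.transform(input)

  override def close(): Unit = {
    if (client != null) client.close() // assumed cleanup method
  }
}

val env = ExecutionEnvironment.getExecutionEnvironment
val data = env.fromCollection(Seq(MyKey(Some("a")), MyKey(Some("c"))))
val transformed = data.map(new TransformMapper)

Each parallel instance of the operator gets its own copy of the function and
calls map() from a single thread, so a client created in open() is never
shared between threads. If you still want the pooled approach, you can fill
the ArrayBlockingQueue in open() and drain it in close() the same way.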

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 22:21 Timur Fayruzov <timur.fairuzov@gmail.com>
wrote:

> Hello,
>
> I'm writing a Scala Flink application. I have a standalone process that
> exists on every Flink node that I need to call to transform my data. To
> access this process, I need to initialize a non-thread-safe client first. I
> would like to avoid initializing a client for each element being
> transformed. A straightforward implementation would be something like this:
> ```
>
> val env = ExecutionEnvironment.getExecutionEnvironment
> val data = env.fromCollection(Seq(MyKey(Some("a")), MyKey(Some("c"))))
> val pool = new ArrayBlockingQueue[Client](5)
> // pool is filled here
> data.map(e => {
>   val client = pool.take()
>   val res = client.transform(e)
>   pool.put(client)
>   res
> })
>
> ```
> However, this causes a runtime exception with the message "Task not
> serializable", which makes sense.
>
> Function parameters and broadcast variables won't work either as far as I
> understand. Is there a way to make this happen?
>
> Thanks,
> Timur
>
