You may also be able to initialize the client only in the parallel execution by making it a "lazy" variable in Scala.

On Fri, Apr 22, 2016 at 11:46 AM, Timur Fayruzov <> wrote:
Outstanding! Thanks, Aljoscha.

On Fri, Apr 22, 2016 at 2:06 AM, Aljoscha Krettek <> wrote:
you could use a RichMapFunction that has an open method: RichMapFunction[...]() {
  def open(): () = {
    // initialize client
  def map(input: INT): OUT = {
    // use client

the open() method is called before any elements are passed to the function. The counterpart of open() is close(), which is called after all elements are through or if the job cancels.


On Thu, 21 Apr 2016 at 22:21 Timur Fayruzov <> wrote:

I'm writing a Scala Flink application. I have a standalone process that exists on every Flink node that I need to call to transform my data. To access this process I need to initialize non thread-safe client first. I would like to avoid initializing a client for each element being transformed. A straightforward implementation would be something like this:
val env = ExecutionEnvironment.getExecutionEnvironment
val data = env.fromCollection(Seq(MyKey(Some("a")), MyKey(Some("c"))))
val pool = new ArrayBlockingQueue[Client](5)
// pool is filled here => {
val client = pool.take()
val res = client.transform(e)
However, this causes a runtime exception with message "Task not serializable", which makes sense.

Function parameters and broadcast variables won't work either as far as I understand. Is there a way to make this happen?