From: Stephan Ewen
To: user@flink.apache.org
Reply-To: user@flink.apache.org
Date: Fri, 22 Apr 2016 14:50:56 -0400
Subject: Re: Access to a shared resource within a mapper
MIME-Version: 1.0
Content-Type: multipart/alternative; boundary=001a113ecdaaa1482d0531174e55

--001a113ecdaaa1482d0531174e55
Content-Type: 
text/plain; charset=UTF-8

You may also be able to initialize the client only in the parallel
execution by making it a "lazy" variable in Scala.

On Fri, Apr 22, 2016 at 11:46 AM, Timur Fayruzov wrote:

> Outstanding! Thanks, Aljoscha.
>
> On Fri, Apr 22, 2016 at 2:06 AM, Aljoscha Krettek
> wrote:
>
>> Hi,
>> you could use a RichMapFunction that has an open method:
>>
>> data.map(new RichMapFunction[...]() {
>>   override def open(parameters: Configuration): Unit = {
>>     // initialize client
>>   }
>>
>>   override def map(input: IN): OUT = {
>>     // use client
>>   }
>> })
>>
>> The open() method is called before any elements are passed to the
>> function. The counterpart of open() is close(), which is called after all
>> elements are through or if the job is cancelled.
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 21 Apr 2016 at 22:21 Timur Fayruzov
>> wrote:
>>
>>> Hello,
>>>
>>> I'm writing a Scala Flink application. I have a standalone process that
>>> exists on every Flink node that I need to call to transform my data. To
>>> access this process I need to initialize a non-thread-safe client first. I
>>> would like to avoid initializing a client for each element being
>>> transformed. A straightforward implementation would be something like this:
>>> ```
>>>
>>> val env = ExecutionEnvironment.getExecutionEnvironment
>>> val data = env.fromCollection(Seq(MyKey(Some("a")), MyKey(Some("c"))))
>>> val pool = new ArrayBlockingQueue[Client](5)
>>> // pool is filled here
>>> data.map(e => {
>>>   val client = pool.take()
>>>   val res = client.transform(e)
>>>   pool.put(client)
>>>   res
>>> })
>>>
>>> ```
>>> However, this causes a runtime exception with message "Task not
>>> serializable", which makes sense.
>>>
>>> Function parameters and broadcast variables won't work either as far as
>>> I understand. Is there a way to make this happen?
>>>
>>> Thanks,
>>> Timur
>>>
>>
>
--001a113ecdaaa1482d0531174e55--
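
A minimal sketch of the "lazy" variable suggestion above, kept free of Flink dependencies so it runs on its own. It assumes the function object reaches the workers through Java serialization, which is what the "Task not serializable" error points to. `Client`, `TransformFn` and `LazyClientDemo` are hypothetical names for illustration, and `roundTrip` only imitates the shipping step locally; none of them are Flink classes.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for the non-serializable, non-thread-safe client.
class Client {
  def transform(s: String): String = s.toUpperCase
}

// The function object itself is serializable; the client is never shipped with it.
class TransformFn extends (String => String) with Serializable {
  // @transient keeps the field out of the serialized form.
  // lazy defers construction to the first call, which happens after deserialization.
  @transient private lazy val client: Client = new Client

  override def apply(s: String): String = client.transform(s)
}

object LazyClientDemo {
  // Serializes and deserializes an object in memory, imitating (by assumption)
  // what happens when a function is sent to a worker.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val shipped = roundTrip(new TransformFn)
    // The client is created here, on first use, and reused for later elements.
    println(shipped("a"))
    println(shipped("c"))
  }
}
```

With this shape, each deserialized copy of the function builds its own client once, so one client would exist per parallel instance rather than per element. Whether that is safe for a non-thread-safe client still depends on each function instance being driven by a single thread.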