Mailing-List: contact user-help@flink.apache.org; run by ezmlm
Reply-To: user@flink.apache.org
MIME-Version: 1.0
From: Fabian Hueske
Date: Tue, 11 Oct 2016 09:35:06 +0200
Subject: Re: Current alternatives for async I/O
To: user@flink.apache.org
Content-Type: multipart/alternative; boundary=001a113fc3222b24f6053e91ec54
archived-at: Tue, 11 Oct 2016 07:35:46 -0000

--001a113fc3222b24f6053e91ec54
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Hi Ken,

I think your solution should work.
You need to make sure, though, that you properly manage the state of your
function, i.e., keep track of all records that have been received but
haven't been emitted yet.
Otherwise, records might get lost in case of a failure.

Alternatively, you can implement this as a custom operator. This would give
you full access, but you would need to take care of organizing checkpoints
and other low-level issues yourself. It would also be basically the same as
implementing FLIP-12 (or a subset of it).

Best, Fabian

2016-10-09 3:31 GMT+02:00 Ken Krugler <kkrugler_lists@transpac.com>:

> Hi all,
>
> I’ve been watching the FLIP-12 design discussion, and it looks like a
> promising solution for the issues we’ve got with needing to make
> asynchronous multi-threaded requests in a Flink operator.
>
> What’s the best workaround with current releases of Flink?
>
> One option is to have a special tickler source that broadcasts a Tuple0
> every X milliseconds, which gets connected to the real stream that feeds a
> CoFlatMap. Inside of this I’ve got queues for incoming and generated
> tuples, with a thread pool to pull from the incoming and write to the
> generated queues. When I get one of the “tickle” Tuple0s, I emit all of the
> generated tuples.
>
> There are issues with needing to bound the size of the queues, and all of
> the usual fun with thread pools, but it seems to work.
>
> Is there a better/simpler approach?
>
> Thanks,
>
> -- Ken
>
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>

--001a113fc3222b24f6053e91ec54
Content-Type: text/html; charset=UTF-8
Content-Transfer-Encoding: quoted-printable
--001a113fc3222b24f6053e91ec54--
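
The workaround discussed in this thread (queue incoming records, process them on a thread pool, emit finished results on each "tickle", and keep track of everything received but not yet emitted) might be sketched roughly as below. This is only an illustration in plain Java, not Flink code and not the implementation from the thread: the class AsyncBuffer and its methods submit, drain, snapshotPending and shutdownAndWait are hypothetical names, the queue bound is implemented with a semaphore as one possible choice, and error handling for failed requests is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical helper (not a Flink class): bounded buffer around a thread pool. */
final class AsyncBuffer<IN, OUT> {
    private final ExecutorService pool;
    private final Function<IN, OUT> request;   // the blocking call to run off-thread
    private final Semaphore capacity;          // bounds records that are not yet emitted
    // Both maps are guarded by "this".
    private final Map<Long, IN> pending = new LinkedHashMap<>();    // received, not yet emitted
    private final Map<Long, OUT> finished = new LinkedHashMap<>();  // computed, not yet emitted
    private long nextId = 0;

    AsyncBuffer(int threads, int maxPending, Function<IN, OUT> request) {
        this.pool = Executors.newFixedThreadPool(threads);
        this.capacity = new Semaphore(maxPending);
        this.request = request;
    }

    /** Call for every record of the real stream; blocks while the buffer is full. */
    void submit(IN record) {
        capacity.acquireUninterruptibly();
        final long id;
        synchronized (this) {
            id = nextId++;
            pending.put(id, record);
        }
        pool.execute(() -> {
            // Assumption: request does not throw; a real version must handle failures.
            OUT result = request.apply(record);
            synchronized (this) {
                finished.put(id, result);
            }
        });
    }

    /** Call on every tickle: returns finished results (completion order) and frees capacity. */
    synchronized List<OUT> drain() {
        List<OUT> out = new ArrayList<>(finished.values());
        for (Long id : finished.keySet()) {
            pending.remove(id);
        }
        capacity.release(finished.size());
        finished.clear();
        return out;
    }

    /** Records received but not yet emitted: what would have to go into a checkpoint. */
    synchronized List<IN> snapshotPending() {
        return new ArrayList<>(pending.values());
    }

    /** Stops accepting work and waits (up to 10 seconds) for running requests to finish. */
    void shutdownAndWait() {
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In the setup Ken describes, the CoFlatMap would presumably call submit for records from the real stream and drain for each tickle Tuple0, emitting whatever drain returns. After a failure, the records returned by the last snapshotPending would be submitted again, so some requests may run twice.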