From: "Federico D'Ambrosio"
Date: Thu, 17 May 2018 12:47:59 +0200
Subject: Re: Async Source Function in Flink
To: Timo Walther
Cc: user
Delivered-To: mailing list user@flink.apache.org

I see, thank you very much for your answer! I'll look into connection pool handling.

Alternatively, I suppose that since it is a SourceFunction, even synchronous calls may be used without side effects in Flink?

Thank you,
Federico

On Tue, 15 May 2018 at 16:16, Timo Walther <twalthr@apache.org> wrote:
Hi Federico,

Flink's AsyncFunction is meant for enriching a record with information that needs to be queried externally. So I guess you can't use it for your use case, because there an async call is always triggered by an incoming record, and a source has no input. However, your custom SourceFunction could implement similar asynchronous logic. By keeping a pool of open connections that send requests asynchronously and emit each response to the stream once it is available, you can improve your throughput (see [0]).
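
As a rough illustration of the pooled, asynchronous request logic described above, here is a minimal sketch in plain Java with no Flink dependency, so it can run on its own. The names AsyncPollingSketch, fetchAll, client and emit are hypothetical, and the "REST call" is just a function passed in by the caller. In a real SourceFunction, emit would presumably wrap ctx.collect(...) inside run(SourceContext); that wiring is an assumption and is not shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import java.util.function.Function;

final class AsyncPollingSketch {

    /**
     * Starts one asynchronous request per endpoint, keeps at most maxInFlight
     * requests open at a time, and passes each response to `emit` from the
     * calling thread once it is available. Returns the number of responses emitted.
     */
    static int fetchAll(List<String> endpoints,
                        Function<String, String> client, // hypothetical stand-in for a REST client
                        int maxInFlight,
                        Consumer<String> emit) {
        ExecutorService pool = Executors.newFixedThreadPool(maxInFlight);
        Semaphore permits = new Semaphore(maxInFlight);
        BlockingQueue<String> responses = new LinkedBlockingQueue<>();
        int emitted = 0;
        try {
            for (String endpoint : endpoints) {
                permits.acquire(); // wait here if maxInFlight requests are already open
                CompletableFuture
                    .supplyAsync(() -> client.apply(endpoint), pool)
                    .whenComplete((body, error) -> {
                        // Exactly one entry per request, on success or failure.
                        responses.add(error == null ? String.valueOf(body) : "ERROR " + endpoint);
                        permits.release();
                    });
                // Emit whatever has already arrived before starting the next request.
                String ready;
                while ((ready = responses.poll()) != null) {
                    emit.accept(ready);
                    emitted++;
                }
            }
            // Wait for the responses that are still outstanding.
            while (emitted < endpoints.size()) {
                emit.accept(responses.take());
                emitted++;
            }
            return emitted;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while fetching", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        fetchAll(Arrays.asList("service-a", "service-b"), e -> "response from " + e, 2, out::add);
        out.forEach(System.out::println); // arrival order is not guaranteed
    }
}
```

All responses are emitted from the single calling thread, which is a deliberate choice in this sketch: only the requests run concurrently, so the emitting side does not need extra synchronization.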

Depending on your use case, the SourceFunction could be responsible only for determining, e.g., the ids, while an AsyncFunction requests the data for these ids via REST. This way you could leverage the available async capabilities.
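
The split between "determine the ids" and "request them asynchronously" can be sketched roughly as follows, again in plain Java without Flink so that it runs standalone. IdThenEnrichSketch, determineIds, enrich and restLookup are hypothetical names used only for illustration. In an actual job, the second stage would presumably be an AsyncFunction applied to the stream as described in [0]; this sketch only mimics the shape of that two-stage design.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

final class IdThenEnrichSketch {

    /** Stage 1: the "source" only decides which ids to fetch (cheap, no I/O). */
    static List<String> determineIds(int count) {
        List<String> ids = new ArrayList<>();
        for (int i = 1; i <= count; i++) {
            ids.add("id-" + i);
        }
        return ids;
    }

    /**
     * Stage 2: start one asynchronous lookup per id, then gather the results
     * in input order. restLookup is a hypothetical stand-in for the REST call.
     */
    static List<String> enrich(List<String> ids, Function<String, String> restLookup, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<CompletableFuture<String>> pending = new ArrayList<>();
            for (String id : ids) {
                pending.add(CompletableFuture.supplyAsync(() -> id + " -> " + restLookup.apply(id), pool));
            }
            List<String> enriched = new ArrayList<>();
            for (CompletableFuture<String> future : pending) {
                // Every lookup has already been submitted; join() only collects the result.
                enriched.add(future.join());
            }
            return enriched;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> out = enrich(determineIds(3), id -> "payload(" + id + ")", 2);
        out.forEach(System.out::println);
    }
}
```

Keeping the id-producing stage free of I/O means the blocking part of the work is confined to the lookup stage, which is the part that benefits from running asynchronously.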

I hope this helps.

Regards,
Timo

[0] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/asyncio.html#the-need-for-asynchronous-io-operations


On 14.05.18 at 14:51, Federico D'Ambrosio wrote:
Hello everyone,

just wanted to ask a quick question: I have to retrieve data from 2 web services via REST calls, use them as sources and push these data to Kafka. So far, I implemented a SourceFunction which deals with making the calls with the respective clients.

Now, the function calls Await.result(....) for each REST call. Do I need to use Flink's AsyncFunction instead? What are the best practices when it comes to asynchronous sources?

Thank you,
--
Federico D'Ambrosio




--
Federico D'Ambrosio