From: Fabian Hueske
Date: Mon, 28 Nov 2016 10:02:28 +0100
Subject: Re: DB connection and query inside map function
To: user@flink.apache.org

Hi Anastasios,

that's certainly possible. The most straightforward approach would be a synchronous call to the database.
Because only one request is active at a time, you do not need a thread pool.
You can establish the connection in the open() method of a RichMapFunction. The problem with this approach is that the synchronous requests can significantly increase the latency.

Doing the calls asynchronously and using a thread pool is not very easy, because this would need to be integrated with Flink's checkpointing mechanism.
In fact, there is an effort to add a special Map operator that supports asynchronous calls (see FLIP-12 [1]).
We expect this to be included in the next minor release, Flink 1.2.

Hope this helps,
Fabian

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673

2016-11-27 22:11 GMT+01:00 Anastasios Skarlatidis:

> Hi!
>
> I am new to Apache Flink and I would like to ask what is the best way to
> query a relational DB inside a map function, in order to enrich the
> streaming data. Consider, for example, that I have a KeyedStream[Int,
> String] and I would like to query the database based on the Int value
> inside a map function `stream.map(v: Int => <<some SQL query>> )`.
>
> Is it possible to have a connection pooler per worker node, in order to be
> used inside each map function call?
>
> Best,
>
> Anastasios
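The synchronous approach from the reply (open the connection once in open(), then issue one blocking request per record in map()) can be sketched roughly as follows. This is plain Java without Flink on the classpath, so it is only an illustration of the lifecycle, not Flink code: EnrichingMapper merely mimics the open()/map()/close() shape of a RichMapFunction, and LookupConnection is a hypothetical in-memory stand-in for a real database connection. All class and method names are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a database connection; a real job would hold a
// java.sql.Connection (or a similar client) here instead.
final class LookupConnection implements AutoCloseable {
    static int opened = 0; // counts how many connections were created
    private final Map<Integer, String> table;
    private boolean closed = false;

    LookupConnection(Map<Integer, String> table) {
        this.table = table;
        opened++;
    }

    // Blocking lookup: the caller waits until the "query" returns.
    String query(int key) {
        if (closed) throw new IllegalStateException("connection is closed");
        return table.getOrDefault(key, "unknown");
    }

    @Override
    public void close() { closed = true; }
}

// Shaped like a RichMapFunction: open() once, map() per record, close() once.
final class EnrichingMapper {
    private final Map<Integer, String> table;
    private LookupConnection connection; // created in open(), not in the constructor

    EnrichingMapper(Map<Integer, String> table) { this.table = table; }

    void open() { connection = new LookupConnection(table); }

    // One synchronous request per record, so no thread pool is involved.
    String map(int key) { return key + ":" + connection.query(key); }

    void close() { if (connection != null) connection.close(); }
}

final class EnrichDemo {
    // Drives the lifecycle the way a runtime would for one parallel instance.
    static List<String> run(List<Integer> keys, Map<Integer, String> table) {
        EnrichingMapper mapper = new EnrichingMapper(table);
        List<String> out = new ArrayList<>();
        mapper.open();
        try {
            for (int k : keys) out.add(mapper.map(k));
        } finally {
            mapper.close();
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3), Map.of(1, "alice", 2, "bob")));
    }
}
```

In an actual job the mapper would extend RichMapFunction and open() would create a real connection. Each parallel instance of the function would then hold its own connection, and every record would wait for its query to return, which is where the added latency comes from.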