Subject: Re: FLINK-3750 (JDBCInputFormat)
From: Flavio Pompermaier <pompermaier@okkam.it>
To: dev@flink.apache.org
Date: Mon, 18 Apr 2016 09:35:53 +0200
Talking with Stefano this morning and looking at the DataSourceTask code, we discovered that the open() and close() methods are both called for every split, not once per InputFormat instance (maybe open and close should be renamed to openSplit and closeSplit to avoid confusion...). I think it could be worth adding two methods to the InputFormat (e.g. openInputFormat() and closeInputFormat()) to allow managing the InputFormat lifecycle; otherwise I'll need to instantiate a pool (and thus add a dependency) to avoid creating a new connection (an expensive operation) for every split, which in our use case happens millions of times.

What about the output of the InputFormat? How do you want me to proceed? With POJO or Row? If POJO, which strategy do you suggest?

Best,
Flavio

On Fri, Apr 15, 2016 at 2:06 PM, Stefano Bortoli wrote:
> If we share the connection, then we should also be careful with the close()
> implementation. I did not see changes for this method in the PR.
>
> saluti,
> Stefano
>
> 2016-04-15 11:01 GMT+02:00 Flavio Pompermaier :
>
> > Following your suggestions I've fixed the connection reuse in my PR at
> > https://github.com/apache/flink/pull/1885.
> > I simply check in establishConnection() whether dbConn != null and, in that
> > case, return immediately.
> >
> > Thus, the only remaining thing to fix is the null handling. Do you have
> > any suggestion about how to transform the results into a POJO?
> > Maybe returning a Row and then letting the user manage the conversion to
> > the target POJO in a subsequent map could be a more general solution?
> >
> > Best,
> > Flavio
> >
> > On Thu, Apr 14, 2016 at 6:52 PM, Fabian Hueske wrote:
> >
> > > There is an InputFormat object for each parallel task of a DataSource.
> > > So for a source with parallelism 8 you will have 8 instances of the
> > > InputFormat running, regardless of whether this is on one box with 8
> > > slots or 8 machines with 1 slot each.
> > > The same is true for all other operators (Map, Reduce, Join, etc.) and
> > > DataSinks.
> > >
> > > Note, a single task does not fill a slot, but a "slice" of the program
> > > (one parallel task of each operator) fills a slot.
> > >
> > > Cheers, Fabian
> > >
> > > 2016-04-14 18:47 GMT+02:00 Flavio Pompermaier :
> > >
> > > > Ok, thanks! Just one last question: is an InputFormat instantiated for
> > > > each task slot or once per task manager?
> > > > On 14 Apr 2016 18:07, "Chesnay Schepler" wrote:
> > > >
> > > > > no.
> > > > >
> > > > > if (connection == null) {
> > > > >     establishConnection();
> > > > > }
> > > > >
> > > > > done. same connection for all splits.
> > > > >
> > > > > On 14.04.2016 17:59, Flavio Pompermaier wrote:
> > > > >
> > > > >> I didn't understand what you meant by "it should also be possible to
> > > > >> reuse the same connection of an InputFormat across InputSplits, i.e.,
> > > > >> calls of the open() method".
> > > > >> At the moment there's a call to establishConnection in the open method;
> > > > >> thus, a new connection is created for each split.
> > > > >> If I understood correctly, you're suggesting to create a pool in the
> > > > >> InputFormat and simply call pool.borrow() in open() rather than
> > > > >> establishConnection?
> > > > >>
> > > > >> On 14 Apr 2016 17:28, "Chesnay Schepler" wrote:
> > > > >>
> > > > >> On 14.04.2016 17:22, Fabian Hueske wrote:
> > > > >>>
> > > > >>>> Hi Flavio,
> > > > >>>>
> > > > >>>> those are good questions.
> > > > >>>>
> > > > >>>> 1) Replacing null values with default values and simply forwarding
> > > > >>>> records is very dangerous, in my opinion.
> > > > >>>> I see two alternatives: A) we use a data type that tolerates null
> > > > >>>> values. This could be a POJO that the user has to provide, or Row.
> > > > >>>> The drawback of Row is that it is untyped and not easy to handle.
> > > > >>>> B) We use Tuple and add an additional field that holds an Integer
> > > > >>>> which serves as a bitset to mark null fields. This would be a pretty
> > > > >>>> low-level API though. I am leaning towards the user-provided POJO
> > > > >>>> option.
> > > > >>>
> > > > >>> i would also lean towards the POJO option.
> > > > >>>
> > > > >>>> 2) The JDBCInputFormat is located in a dedicated Maven module. I
> > > > >>>> think we can add a dependency to that module. However, it should
> > > > >>>> also be possible to reuse the same connection of an InputFormat
> > > > >>>> across InputSplits, i.e., calls of the open() method. Wouldn't that
> > > > >>>> be sufficient?
> > > > >>>
> > > > >>> this is the right approach imo.
> > > > >>>
> > > > >>>> Best, Fabian
> > > > >>>>
> > > > >>>> 2016-04-14 16:59 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
> > > > >>>>
> > > > >>>>> Hi guys,
> > > > >>>>>
> > > > >>>>> I'm integrating the comments of Chesnay into my PR, but there are a
> > > > >>>>> couple of things that I'd like to discuss with the core developers.
> > > > >>>>>
> > > > >>>>> 1. About the JDBC type mapping (addValue() method at [1]): at the
> > > > >>>>> moment, if I find a null value for a Double, JDBC's getDouble
> > > > >>>>> returns 0.0. Is that really the correct behaviour? Wouldn't it be
> > > > >>>>> better to use a POJO, or a Row that can handle null? Moreover, the
> > > > >>>>> mapping between SQL types and Java types varies a lot between JDBC
> > > > >>>>> implementations. Wouldn't it be better to rely on the Java type
> > > > >>>>> obtained from resultSet.getObject() rather than on the
> > > > >>>>> ResultSetMetadata types?
> > > > >>>>> 2. I'd like to handle connections very efficiently because we have
> > > > >>>>> a use case with billions of records and thus millions of splits,
> > > > >>>>> and establishing a new connection each time could be expensive.
> > > > >>>>> Would it be a problem to add the Apache pool dependency to the JDBC
> > > > >>>>> batch connector in order to reuse the created connections?
> > > > >>>>>
> > > > >>>>> [1]
> > > > >>>>> https://github.com/fpompermaier/flink/blob/FLINK-3750/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
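[Editor's illustration] The openInputFormat()/closeInputFormat() lifecycle proposed in this thread can be sketched as below. This is a minimal stand-alone sketch, not actual Flink code of the time: JdbcLikeInputFormat and ExpensiveConnection are hypothetical stand-ins, and a counter replaces a real java.sql.Connection so the sketch runs without a database.

```java
// Sketch of the proposed lifecycle: openInputFormat()/closeInputFormat() would
// run once per parallel task, while open(split)/close() keep running once per
// input split. ExpensiveConnection stands in for a real java.sql.Connection;
// a static counter shows how many connections are actually created.
class LifecycleSketch {

    static class ExpensiveConnection {
        static int connectionsOpened = 0;
        ExpensiveConnection() { connectionsOpened++; } // expensive in real JDBC
        void close() {}
    }

    static class JdbcLikeInputFormat {
        private ExpensiveConnection connection;

        // Proposed: called once, before the first split of this parallel instance.
        void openInputFormat() {
            if (connection == null) {
                connection = new ExpensiveConnection();
            }
        }

        // Existing contract: called once per split; reuses the shared connection.
        void open(int splitId) {
            // run the statement for this split on 'connection'
        }

        // Existing contract: called once per split; releases per-split resources only.
        void close() {}

        // Proposed: called once, after the last split.
        void closeInputFormat() {
            if (connection != null) {
                connection.close();
                connection = null;
            }
        }
    }

    public static void main(String[] args) {
        JdbcLikeInputFormat format = new JdbcLikeInputFormat();
        format.openInputFormat();
        for (int split = 0; split < 1_000_000; split++) { // millions of splits...
            format.open(split);
            format.close();
        }
        format.closeInputFormat();
        // ...but only one connection is ever established:
        System.out.println("connections opened: " + ExpensiveConnection.connectionsOpened);
    }
}
```

For what it's worth, hooks of exactly this shape were later added to Flink's RichInputFormat, but at the time of this thread they are only a proposal.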
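[Editor's illustration] The Row-then-map idea discussed above (point 1) can be sketched as below. Row and the Object[] record are hypothetical minimal stand-ins for Flink's Row and a live ResultSet, so the sketch runs without a database; the point is that getObject()-style reads keep SQL NULL as null, where getDouble() would silently return 0.0.

```java
// Sketch of the Row-then-map idea: the InputFormat emits an untyped, null-tolerant
// Row built from getObject()-style values, and the user converts it to a target
// POJO in a subsequent map. Person is an example POJO a user might provide.
class RowSketch {

    // Hypothetical minimal version of an untyped Row with fixed arity.
    static class Row {
        private final Object[] fields;
        Row(int arity) { this.fields = new Object[arity]; }
        void setField(int pos, Object value) { fields[pos] = value; }
        Object getField(int pos) { return fields[pos]; }
    }

    // Example user-provided target POJO.
    static class Person {
        Long id;
        String name;
        Double score; // boxed type tolerates null, unlike primitive double
    }

    // What the InputFormat would do per record: copy getObject() values verbatim.
    static Row toRow(Object[] objectValues) {
        Row row = new Row(objectValues.length);
        for (int i = 0; i < objectValues.length; i++) {
            row.setField(i, objectValues[i]); // null is preserved, not coerced to 0.0
        }
        return row;
    }

    // What a subsequent user map could do: null-aware Row -> POJO conversion.
    static Person toPerson(Row row) {
        Person p = new Person();
        p.id = (Long) row.getField(0);
        p.name = (String) row.getField(1);
        p.score = (Double) row.getField(2); // stays null for SQL NULL
        return p;
    }

    public static void main(String[] args) {
        // Simulated record: BIGINT, VARCHAR, DOUBLE where the DOUBLE is SQL NULL.
        Row row = toRow(new Object[] { 42L, "flavio", null });
        Person p = toPerson(row);
        System.out.println(p.score); // prints "null", not 0.0
    }
}
```

This keeps the connector generic (it never needs to know the user's type) while letting each user decide how nulls map into their POJO.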