flink-dev mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: FLINK-3750 (JDBCInputFormat)
Date Mon, 18 Apr 2016 10:01:03 GMT
I agree, a method to close an input format is missing.
InputFormat is an API-stable interface, so it is not possible to extend it
(until Flink 2.0). RichInputFormat is API-stable as well, but it is an abstract
class, so it should be possible to add an empty default implementation of a
closeInputFormat() method there without breaking existing subclasses.
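The point about RichInputFormat can be illustrated with a minimal, self-contained sketch. The classes below (LifecycleInputFormat, LegacyFormat, ConnectionHoldingFormat, LifecycleDriver) are hypothetical stand-ins, not Flink's API, and the driver loop is only a simplified assumption of how a data source task calls the methods: open() and close() once per split, and the new closeInputFormat() hook once per instance. Because the hook has an empty body in the abstract class, a subclass written before the hook existed still compiles and runs unchanged.

```java
/** Hypothetical stand-in for an abstract base class like RichInputFormat (not Flink's API). */
abstract class LifecycleInputFormat {
    /** Called once per input split, like InputFormat.open(split). */
    abstract void open(int split);

    /** Called once per input split, after the split has been read. */
    abstract void close();

    /** New hook with an empty default body: existing subclasses need no change. */
    void closeInputFormat() {
    }
}

/** A subclass written before the hook existed; it does not override closeInputFormat(). */
class LegacyFormat extends LifecycleInputFormat {
    int opened;

    @Override void open(int split) { opened++; }

    @Override void close() { }
}

/** A subclass that uses the hook to release instance-wide resources exactly once. */
class ConnectionHoldingFormat extends LifecycleInputFormat {
    int opened;
    int instanceCloses;

    @Override void open(int split) { opened++; }

    @Override void close() { /* release per-split resources only */ }

    @Override void closeInputFormat() { instanceCloses++; /* e.g. close a shared connection */ }
}

/** Simplified, assumed task loop: per-split open/close, then closeInputFormat() once. */
final class LifecycleDriver {
    private LifecycleDriver() {
    }

    static void run(LifecycleInputFormat format, int splits) {
        try {
            for (int s = 0; s < splits; s++) {
                format.open(s);
                try {
                    // ... read the records of split s ...
                } finally {
                    format.close();
                }
            }
        } finally {
            format.closeInputFormat();
        }
    }
}
```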

Of course it would be good to reuse connections across input splits.
On the other hand, input splits should not be too fine-grained either,
because input split assignment has some overhead as well.
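The granularity trade-off is simple arithmetic: a table with N keys cut into ranges of s keys yields ceil(N / s) splits, and each one costs an assignment (and, without connection reuse, a new connection). The sketch below assumes a hypothetical numeric key column queried with `WHERE id BETWEEN ? AND ?`; the helper name and the query are illustrative, not part of the JDBC connector.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that turns a key range into per-split query parameters. */
final class RangeSplits {
    private RangeSplits() {
    }

    /**
     * Splits the inclusive key range [min, max] into consecutive inclusive ranges of at
     * most splitSize keys, e.g. the two parameters of "WHERE id BETWEEN ? AND ?".
     */
    static List<long[]> ranges(long min, long max, long splitSize) {
        if (splitSize <= 0) {
            throw new IllegalArgumentException("splitSize must be positive");
        }
        List<long[]> result = new ArrayList<>();
        for (long start = min; start <= max; start += splitSize) {
            long end = Math.min(max, start + splitSize - 1);
            result.add(new long[] {start, end});
            if (end == max) {
                break; // last range reached; also avoids overflowing start near Long.MAX_VALUE
            }
        }
        return result;
    }
}
```

For example, 1,000,000 keys give 1,000 splits with a split size of 1,000, but only 10 splits with a split size of 100,000.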

Best, Fabian

2016-04-18 9:49 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:

> Yes, I forgot to mention that I could instantiate the connection in
> configure(), but then I can't close it (as you confirmed) :(
>
> On Mon, Apr 18, 2016 at 9:46 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>
> > There is also InputFormat.configure(), which is called before any split
> > processing happens. But I see your point about a missing close() method
> > that is called after all input splits have been processed.
> >
> > On Mon, 18 Apr 2016 at 09:44 Stefano Bortoli <s.bortoli@gmail.com> wrote:
> >
> > > Of course there is one already. We'll look into the runtime context.
> > >
> > > Regards,
> > > Stefano
> > >
> > > 2016-04-18 9:41 GMT+02:00 Stefano Bortoli <s.bortoli@gmail.com>:
> > >
> > > > Being a generic JDBC input format, I would prefer to stay with Row,
> > > > letting the developer manage the cast according to the driver
> > > > functionalities.
> > > >
> > > > As for the open() and close() issue, I agree with Flavio that we'd need
> > > > better management of the InputFormat lifecycle. Perhaps a new interface
> > > > extending it: RichInputFormat?
> > > >
> > > > My 2c.
> > > >
> > > > Stefano
> > > >
> > > > 2016-04-18 9:35 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
> > > >
> > > >> Talking with Stefano this morning and looking at the DataSourceTask code,
> > > >> we discovered that the open() and close() methods are both called for every
> > > >> split and not once per InputFormat instance (maybe open and close should be
> > > >> renamed to openSplit and closeSplit to avoid confusion...).
> > > >> I think it could be worth adding two methods to the InputFormat (e.g.
> > > >> openInputFormat() and closeInputFormat()) to allow for the management of
> > > >> the InputFormat lifecycle; otherwise I'll need to instantiate a pool (and
> > > >> thus add a dependency) to avoid creating a new connection (an expensive
> > > >> operation) for every split (which in our use case happens millions of
> > > >> times).
> > > >>
> > > >> What about the output of the InputFormat? How do you want me to proceed:
> > > >> with POJO or Row? If POJO, which strategy do you suggest?
> > > >>
> > > >> Best,
> > > >> Flavio
> > > >>
> > > >> On Fri, Apr 15, 2016 at 2:06 PM, Stefano Bortoli <s.bortoli@gmail.com> wrote:
> > > >>
> > > >> > If we share the connection, then we should also be careful with the
> > > >> > close() implementation. I did not see changes for this method in the PR.
> > > >> >
> > > >> > Regards,
> > > >> > Stefano
> > > >> >
> > > >> > 2016-04-15 11:01 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
> > > >> >
> > > >> > > Following your suggestions, I've fixed the connection reuse in my PR at
> > > >> > > https://github.com/apache/flink/pull/1885.
> > > >> > > In establishConnection() I simply check whether dbConn != null and, in
> > > >> > > that case, return immediately.
> > > >> > >
> > > >> > > Thus, the only remaining thing to fix is the null handling. Do you have
> > > >> > > any suggestions about how to transform the results into a POJO?
> > > >> > > Maybe returning a Row and then letting the user manage the conversion to
> > > >> > > the target POJO in a subsequent map could be a more general solution?
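The "Row first, POJO later" idea can be sketched without any Flink dependency. Here `Object[]` stands in for a generic, untyped Row, and `Person` is a hypothetical target POJO with boxed fields so that SQL NULL survives as `null`; the conversion function plays the role of the user-supplied map() that would follow the source. The column order and casts are assumptions of this sketch.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical target POJO; boxed types so that SQL NULL can be represented as null. */
class Person {
    Integer id;
    String name;
    Double score;
}

final class RowToPojo {
    private RowToPojo() {
    }

    /** User-supplied conversion; Object[] stands in for an untyped Row (id, name, score). */
    static final Function<Object[], Person> TO_PERSON = row -> {
        Person p = new Person();
        p.id = (Integer) row[0];
        p.name = (String) row[1];
        // Casts are the user's responsibility and depend on the driver's type mapping.
        p.score = row[2] == null ? null : ((Number) row[2]).doubleValue();
        return p;
    };

    /** Equivalent of a map() applied to the rows produced by the input format. */
    static List<Person> convert(List<Object[]> rows) {
        return rows.stream().map(TO_PERSON).collect(Collectors.toList());
    }
}
```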
> > > >> > >
> > > >> > > Best,
> > > >> > > Flavio
> > > >> > >
> > > >> > > On Thu, Apr 14, 2016 at 6:52 PM, Fabian Hueske <fhueske@gmail.com> wrote:
> > > >> > >
> > > >> > > > There is an InputFormat object for each parallel task of a DataSource.
> > > >> > > > So for a source with parallelism 8 you will have 8 instances of the
> > > >> > > > InputFormat running, regardless of whether this is on one box with 8
> > > >> > > > slots or on 8 machines with 1 slot each.
> > > >> > > > The same is true for all other operators (Map, Reduce, Join, etc.) and
> > > >> > > > DataSinks.
> > > >> > > >
> > > >> > > > Note that a single task does not fill a slot; rather, a "slice" of the
> > > >> > > > program (one parallel task of each operator) fills a slot.
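One consequence of having one InputFormat instance per parallel task is that, once each instance reuses its connection across splits, the number of connections grows with the parallelism rather than with the number of splits. The simulation below is a hypothetical model, not Flink code: it deals splits round-robin to stay deterministic, whereas Flink hands splits out to tasks as they request them.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of one parallel instance of an input format. */
class SourceInstance {
    private boolean connected;
    int connectionsOpened;

    void open(int split) {
        if (!connected) { // connect on the first split, reuse for all later ones
            connected = true;
            connectionsOpened++;
        }
        // ... read split `split` ...
    }
}

final class ParallelSourceModel {
    private ParallelSourceModel() {
    }

    /** Returns the total number of connections opened by all parallel instances. */
    static int connections(int parallelism, int splits) {
        List<SourceInstance> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new SourceInstance()); // one instance per parallel task
        }
        for (int s = 0; s < splits; s++) {
            instances.get(s % parallelism).open(s); // round-robin: a simplification
        }
        int total = 0;
        for (SourceInstance instance : instances) {
            total += instance.connectionsOpened;
        }
        return total;
    }
}
```

With parallelism 8 and a million splits this model opens 8 connections; with fewer splits than tasks, idle instances open none.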
> > > >> > > >
> > > >> > > > Cheers, Fabian
> > > >> > > >
> > > >> > > > 2016-04-14 18:47 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
> > > >> > > >
> > > >> > > > > OK, thanks! Just one last question: is an InputFormat instantiated for
> > > >> > > > > each task slot or once per TaskManager?
> > > >> > > > > On 14 Apr 2016 18:07, "Chesnay Schepler" <chesnay@apache.org> wrote:
> > > >> > > > >
> > > >> > > > > > No.
> > > >> > > > > >
> > > >> > > > > > if (connection == null) {
> > > >> > > > > >     establishConnection();
> > > >> > > > > > }
> > > >> > > > > >
> > > >> > > > > > Done. Same connection for all splits.
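The null-check guard above can be fleshed out into a small, runnable sketch that also respects the caveat about close() raised later in the thread: the per-split close() releases only per-split resources, and the shared connection is closed once at the end. The generic parameter `C` is a placeholder for `java.sql.Connection`, and the `Supplier` stands in for something like `DriverManager.getConnection(url)`; both, as well as the class and `closeConnection()` names, are hypothetical choices that keep the example free of a real database.

```java
import java.util.function.Supplier;

/**
 * Hypothetical sketch, not Flink's JDBCInputFormat: one connection per instance,
 * created lazily on the first split and reused for all later splits.
 * C is a placeholder for java.sql.Connection.
 */
class LazyConnectionFormat<C extends AutoCloseable> {
    private final Supplier<C> connectionFactory; // stands in for DriverManager.getConnection(url)
    private C connection;                         // shared by every split of this instance
    int connectionsCreated;                       // for illustration and testing only

    LazyConnectionFormat(Supplier<C> connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    /** Called once per split. */
    void open(int split) {
        if (connection == null) {
            connection = connectionFactory.get(); // expensive step, done for the first split only
            connectionsCreated++;
        }
        // ... prepare and execute the query for this split on `connection` ...
    }

    /** Called once per split: release per-split resources only, never the shared connection. */
    void close() {
        // ... close the ResultSet and Statement of the current split ...
    }

    /** Called once per instance, e.g. from a closeInputFormat()-style hook. */
    void closeConnection() throws Exception {
        if (connection != null) {
            try {
                connection.close();
            } finally {
                connection = null;
            }
        }
    }
}
```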
> > > >> > > > > >
> > > >> > > > > > On 14.04.2016 17:59, Flavio Pompermaier wrote:
> > > >> > > > > >
> > > >> > > > > >> I didn't understand what you mean by "it should also be possible to
> > > >> > > > > >> reuse the same connection of an InputFormat across InputSplits, i.e.,
> > > >> > > > > >> calls of the open() method".
> > > >> > > > > >> At the moment the open() method calls establishConnection(), so a new
> > > >> > > > > >> connection is created for each split.
> > > >> > > > > >> If I understood correctly, you're suggesting to create a pool in the
> > > >> > > > > >> InputFormat and simply call pool.borrow() in open() rather than
> > > >> > > > > >> establishConnection()?
> > > >> > > > > >>
> > > >> > > > > >> On 14 Apr 2016 17:28, "Chesnay Schepler" <chesnay@apache.org> wrote:
> > > >> > > > > >>
> > > >> > > > > >>> On 14.04.2016 17:22, Fabian Hueske wrote:
> > > >> > > > > >>>
> > > >> > > > > >>>> Hi Flavio,
> > > >> > > > > >>>>
> > > >> > > > > >>>> those are good questions.
> > > >> > > > > >>>>
> > > >> > > > > >>>> 1) Replacing null values with default values and simply forwarding
> > > >> > > > > >>>> records is very dangerous, in my opinion.
> > > >> > > > > >>>> I see two alternatives: A) We use a data type that tolerates null
> > > >> > > > > >>>> values. This could be a POJO that the user has to provide, or Row. The
> > > >> > > > > >>>> drawback of Row is that it is untyped and not easy to handle. B) We use
> > > >> > > > > >>>> Tuple and add an additional field that holds an Integer which serves as
> > > >> > > > > >>>> a bitset to mark null fields. This would be a pretty low-level API,
> > > >> > > > > >>>> though. I am leaning towards the user-provided POJO option.
> > > >> > > > > >>>>
> > > >> > > > > >>> I would also lean towards the POJO option.
> > > >> > > > > >>>
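Option B can be sketched as follows. `NullMaskedRecord` is a hypothetical stand-in for a Flink Tuple with one extra Integer field: since the sketch assumes tuple fields must not be null, a null column is stored as a per-field placeholder (for example 0.0) and bit i of the mask records that field i was actually NULL. Using an `int` limits the record to 32 fields.

```java
/**
 * Hypothetical sketch of option B: fixed-arity fields that never contain null,
 * plus an int whose bit i is set when field i was SQL NULL.
 */
final class NullMaskedRecord {
    private final Object[] fields; // never contains null, like Tuple fields
    private final int nullMask;    // bit i set <=> field i was NULL

    private NullMaskedRecord(Object[] fields, int nullMask) {
        this.fields = fields;
        this.nullMask = nullMask;
    }

    /** Builds a record, replacing nulls by the per-field placeholders in defaults. */
    static NullMaskedRecord of(Object[] values, Object[] defaults) {
        if (values.length != defaults.length || values.length > Integer.SIZE) {
            throw new IllegalArgumentException("need matching lengths and at most 32 fields");
        }
        Object[] fields = new Object[values.length];
        int mask = 0;
        for (int i = 0; i < values.length; i++) {
            if (values[i] == null) {
                fields[i] = defaults[i]; // placeholder, e.g. 0.0 for a DOUBLE column
                mask |= 1 << i;          // remember that the real value was NULL
            } else {
                fields[i] = values[i];
            }
        }
        return new NullMaskedRecord(fields, mask);
    }

    boolean isNull(int i) { return (nullMask & (1 << i)) != 0; }

    /** Stored value, i.e. the placeholder for null fields (what a reader ignoring the mask sees). */
    Object raw(int i) { return fields[i]; }

    /** Value with nullness restored from the mask. */
    Object get(int i) { return isNull(i) ? null : fields[i]; }

    int nullMask() { return nullMask; }
}
```

The low-level nature Fabian mentions is visible here: every consumer must consult the mask, or it will silently read the placeholder.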
> > > >> > > > > >>>> 2) The JDBCInputFormat is located in a dedicated Maven module. I think
> > > >> > > > > >>>> we can add a dependency to that module. However, it should also be
> > > >> > > > > >>>> possible to reuse the same connection of an InputFormat across
> > > >> > > > > >>>> InputSplits, i.e., calls of the open() method. Wouldn't that be
> > > >> > > > > >>>> sufficient?
> > > >> > > > > >>>>
> > > >> > > > > >>> This is the right approach, imo.
> > > >> > > > > >>>
> > > >> > > > > >>>> Best, Fabian
> > > >> > > > > >>>>
> > > >> > > > > >>>> 2016-04-14 16:59 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
> > > >> > > > > >>>>
> > > >> > > > > >>>>> Hi guys,
> > > >> > > > > >>>>>
> > > >> > > > > >>>>> I'm integrating Chesnay's comments into my PR, but there are a couple
> > > >> > > > > >>>>> of things that I'd like to discuss with the core developers.
> > > >> > > > > >>>>>
> > > >> > > > > >>>>>      1. About the JDBC type mapping (addValue() method at [1]): at the
> > > >> > > > > >>>>>      moment, if I find a null value for a Double, JDBC's getDouble()
> > > >> > > > > >>>>>      returns 0.0. Is that really the correct behaviour? Wouldn't it be
> > > >> > > > > >>>>>      better to use a POJO or the Row of datatable, which can handle
> > > >> > > > > >>>>>      null values? Moreover, the mapping between SQL types and Java
> > > >> > > > > >>>>>      types varies a lot across JDBC implementations. Wouldn't it be
> > > >> > > > > >>>>>      better to rely on the Java type returned by resultSet.getObject()
> > > >> > > > > >>>>>      to get such a mapping rather than on the ResultSetMetaData types?
> > > >> > > > > >>>>>      2. I'd like to handle connections very efficiently because we
> > > >> > > > > >>>>>      have a use case with billions of records and thus millions of
> > > >> > > > > >>>>>      splits, and establishing a new connection each time could be
> > > >> > > > > >>>>>      expensive. Would it be a problem to add the Apache pool
> > > >> > > > > >>>>>      dependency to the JDBC batch connector in order to reuse the
> > > >> > > > > >>>>>      created connections?
> > > >> > > > > >>>>>
> > > >> > > > > >>>>> [1]
> > > >> > > > > >>>>> https://github.com/fpompermaier/flink/blob/FLINK-3750/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
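The getDouble() behaviour described in point 1 is standard JDBC: for SQL NULL, `ResultSet.getDouble()` returns 0, and the only way to distinguish that from a real 0.0 is to call `wasNull()` immediately afterwards (`getObject()` returns `null` instead, but the Java type it produces depends on the driver). A minimal helper, with a hypothetical class name, could look like this:

```java
import java.sql.ResultSet;
import java.sql.SQLException;

/** Hypothetical helper: reads a DOUBLE column without turning SQL NULL into 0.0. */
final class NullableColumns {
    private NullableColumns() {
    }

    static Double readDouble(ResultSet rs, int column) throws SQLException {
        double value = rs.getDouble(column); // returns 0 if the SQL value is NULL
        // wasNull() refers to the last column read, so call it right after getDouble()
        return rs.wasNull() ? null : value;
    }
}
```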
