flink-user mailing list archives

From Prez Cannady <revp...@correlatesystems.com>
Subject Re: JDBCInputFormat preparation with Flink 1.1-SNAPSHOT and Scala 2.11
Date Wed, 09 Mar 2016 11:46:54 GMT
I suspected as much (the tuple size limitation).  Creating my own InputFormat seems to be the
best solution, but before I go down that rabbit hole I wanted to see at least a semi-trivial
working example of JDBCInputFormat with Scala 2.11.

I’d appreciate a look at that prototype if it’s publicly available (even if it is Java).
I might glean a hint from it.

Prez Cannady  
p: 617 500 3378  
e: revprez@opencorrelate.org  
GH: https://github.com/opencorrelate  
LI: https://www.linkedin.com/in/revprez  

> On Mar 9, 2016, at 3:25 AM, Chesnay Schepler <chesnay@apache.org> wrote:
> 
> You can always create your own InputFormat, but there is no AbstractJDBCInputFormat, if
> that's what you were looking for.
> 
> When you say arbitrary tuple size, do you mean a) a size greater than 25, or b) tuples
> of different sizes?
> If a), unless you are fine with using nested tuples, you won't get around the tuple size
> limitation. Since the user has to be aware of the nesting (because the fields can be accessed
> directly via tuple.f0 etc.), this can't really be done in a general-purpose fashion.
> If b), this will straight-up not work with tuples.
> 
> You could use POJOs, though. Then you could also group by column names.
> 
> I'm not sure about Scala, but in the Java Stream API you can pass the InputFormat and
> the TypeInformation into createInput.
> 
> I recently did a prototype where the input type is determined automatically by querying
> the database. If this is a problem for you, feel free to ping me.
> 
> On 09.03.2016 03:17, Prez Cannady wrote:
>> I’m attempting to create a stream using JDBCInputFormat.  Objective is to convert
>> each record into a tuple and then serialize for input into a Kafka topic.  Here’s what I
>> have so far.
>> 
>> ```
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> 
>> val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
>>       .setDrivername("org.postgresql.Driver")
>>       .setDBUrl("jdbc:postgresql:test")
>>       .setQuery("select name from persons")
>>       .finish()
>> 
>> val stream : DataStream[Tuple1[String]] = env.createInput(...)
>> ```
>> 
>> I think this is essentially what I want to do.  It would be nice if I could return
>> tuples of arbitrary length, but reading the code suggests I have to commit to a defined arity.
>> So I have some questions.
>> 
>> 1. Is there a better way to read from a database (i.e., defining my own `InputFormat`
>>    using Slick)?
>> 2. To get the above example working, what should I supply to `createInput`?
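
The trade-off in the quoted reply between nested tuples (option a) and a named record type can be sketched in plain Scala, with no Flink dependency. This is only an illustration under stated assumptions: the object name, the `Person` class, and the sample values are all hypothetical, and the sketch does not exercise Flink's own rules for what counts as a POJO or how `createInput` resolves type information.

```scala
// Plain Scala, no Flink dependency. Every name and value here is hypothetical.
object WideRecordSketch {
  // Option (a): nest tuples to carry more fields than a single tuple allows.
  // The reader has to know the nesting to reach a field.
  type Nested = ((String, Int), (String, String))

  val nested: Nested = (("Ada", 36), ("London", "UK"))
  val cityFromTuple: String = nested._2._1 // positional access, easy to get wrong

  // Alternative in the spirit of the POJO suggestion: a named record type.
  final case class Person(name: String, age: Int, city: String, country: String)

  val person: Person = Person("Ada", 36, "London", "UK")
  val cityFromRecord: String = person.city // field addressed by name

  def main(args: Array[String]): Unit = {
    println(cityFromTuple)
    println(cityFromRecord)
  }
}
```

Both accessors return the same value, but only the second survives a change in column order without touching every call site, which is the reason a named type tends to be easier to live with once a query returns many columns.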

