flink-user mailing list archives

From Chiwan Park <chiwanp...@apache.org>
Subject Re: DataStream, Sink and JDBC
Date Mon, 07 Mar 2016 13:18:30 GMT
Hi Toletum,

You can initialize a JDBC connection with RichSinkFunction [1]. It provides two additional methods, `open`
and `close`. The `open` method is called once, before any call to the `invoke` method; the `close`
method is called once, when the sink shuts down.

Note that you should add the `transient` keyword to the JDBC connection field, because the connection itself is not serializable.
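
For illustration, a minimal sketch of such a sink (the class name, JDBC URL, credentials and INSERT statement below are placeholders, not taken from this thread):

--------
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class CrimeJdbcSink
        extends RichSinkFunction<Tuple7<String, String, String, String, String, String, String>> {

    private static final long serialVersionUID = 1L;

    // transient: created per parallel task in open(), never serialized with the function
    private transient Connection connection;
    private transient PreparedStatement statement;

    @Override
    public void open(Configuration parameters) throws Exception {
        // called once per parallel task before any invoke()
        connection = DriverManager.getConnection("jdbc:your-database-url", "user", "password");
        statement = connection.prepareStatement("INSERT INTO crimes (id) VALUES (?)");
    }

    @Override
    public void invoke(Tuple7<String, String, String, String, String, String, String> crime) throws Exception {
        // reuses the connection opened in open()
        statement.setString(1, crime.f0);
        statement.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        // called once when the task shuts down
        if (statement != null) {
            statement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
--------

You would then use it in place of your current sink, e.g. `.addSink(new CrimeJdbcSink())`.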

Regards,
Chiwan Park

[1]: https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.html

> On Mar 7, 2016, at 10:08 PM, toletum@toletum.org wrote:
> 
> Hi!
> I'm building a process that reads from Kafka, does some processing... and then writes to a
> database (Neo4j). I can read from Kafka and do the processing... but I have problems writing
> to the database (JDBC).
> I tried using a SinkFunction... It works, but it creates a new connection each time the
> invoke method is called.
> 
> 
> --------
> DataStream<String> messageStream = this.env.addSource(
>         new FlinkKafkaConsumer082<>(properties.getProperty("topic"), new SimpleStringSchema(), properties));
> 
> 
> messageStream.map(new StreamingCrimeSplitter())
>         .filter(new filterFunction())
>         .keyBy(1)
>         .addSink(new sinkFunction());
> 
> 
> --------
> --------
> public class sinkFunction
>         implements SinkFunction<Tuple7<String, String, String, String, String, String, String>> {
> 
>     private static final long serialVersionUID = 2859601213304525959L;
> 
>     @Override
>     public void invoke(Tuple7<String, String, String, String, String, String, String> crime) throws Exception {
>         System.out.println(crime.f0);
>         // JDBC connection
>     }
> }
> --------
> 
> 
> Does somebody know how I could open just one connection? I tried to do it in the constructor,
> but the JDBC connection is not serializable.
> 
> 
> Thanks
> Toletum
> 
> 

