flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: DataStream, Sink and JDBC
Date Mon, 07 Mar 2016 13:22:22 GMT
You'll have to change your sink function to extend RichSinkFunction and
then create your JDBC connection within the open() method.
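
A minimal sketch of why this works, using only the JDK so it runs without Flink on the classpath. Flink serializes the sink object and ships it to the workers, so anything created in the constructor has to be serializable. A connection created in open() is built on the worker after deserialization, so it can live in a transient field. CrimeSink, FakeConnection, roundTrip() and runSketch() are hypothetical stand-ins for illustration, not Flink or JDBC classes. In a real job the sink would extend RichSinkFunction, override open(Configuration), invoke(...) and close() as in the Flink API of that time, and call DriverManager.getConnection(...) where the sketch creates a FakeConnection.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SinkLifecycleSketch {

    // Hypothetical stand-in for a JDBC connection; deliberately not Serializable.
    static class FakeConnection {
        int writes = 0;
        void write(String record) { writes++; }
        void close() { }
    }

    // Mirrors the open/invoke/close shape of a rich sink; not Flink's own class.
    static class CrimeSink implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient fields are skipped by Java serialization, so the sink stays shippable
        private transient FakeConnection connection;
        private transient int connectionsOpened;

        // Called once per sink instance, after deserialization on the worker.
        void open() {
            connection = new FakeConnection();
            connectionsOpened++;
        }

        // Called once per record; reuses the connection made in open().
        void invoke(String crime) {
            connection.write(crime);
        }

        // Called once at shutdown; releases the connection.
        void close() {
            if (connection != null) {
                connection.close();
                connection = null;
            }
        }
    }

    // Imitates shipping the sink to a worker: serialize, then deserialize.
    static CrimeSink roundTrip(CrimeSink sink) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(sink);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (CrimeSink) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("sink could not be shipped", e);
        }
    }

    // Runs the full lifecycle and returns {connections opened, records written}.
    static int[] runSketch(String... records) {
        CrimeSink sink = roundTrip(new CrimeSink());
        sink.open();
        for (String record : records) {
            sink.invoke(record);
        }
        int[] result = {sink.connectionsOpened, sink.connection.writes};
        sink.close();
        return result;
    }

    public static void main(String[] args) {
        int[] result = runSketch("crime-1", "crime-2", "crime-3");
        System.out.println("connections opened: " + result[0]);
        System.out.println("records written: " + result[1]);
    }
}
```

Running main reports one connection opened and three records written: the connection is created once in open() and reused by every invoke() call. Note that open() runs once per parallel sink instance, so a job with sink parallelism N would hold N connections.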

On 07.03.2016 14:08, toletum@toletum.org wrote:
> Hi!
> I'm building a job that reads from Kafka, does some processing, and
> then writes to a database (Neo4j). Reading from Kafka and the
> processing both work, but I have problems writing to the database (JDBC).
> I tried using a SinkFunction. It works, but it creates a new connection
> every time the invoke method is called.
>
>
> --------
> DataStream<String> messageStream = this.env.addSource(
>         new FlinkKafkaConsumer082<>(properties.getProperty("topic"),
>                 new SimpleStringSchema(), properties));
>
>
> messageStream.map(new StreamingCrimeSplitter())
>         .filter(new filterFunction())
>         .keyBy(1)
>         .addSink(new sinkFunction());
>
>
> --------
> --------
> public class sinkFunction
>         implements SinkFunction<Tuple7<String, String, String, String, String, String, String>> {
>     private static final long serialVersionUID = 2859601213304525959L;
>
>     @Override
>     public void invoke(Tuple7<String, String, String, String, String, String, String> crime)
>             throws Exception {
>         System.out.println(crime.f0);
>         // JDBC connection
>     }
> }
> --------
>
>
> Does anybody know how I could use just one connection? I tried creating
> it in the constructor, but the JDBC connection is not serializable.
>
>
> Thanks
> Toletum
>
>

