flink-user mailing list archives

From tole...@toletum.org
Subject DataStream, Sink and JDBC
Date Mon, 07 Mar 2016 13:08:05 GMT
Hi!
I'm building a job that reads from Kafka, does some processing, and then writes the results
to a database (Neo4j). Reading from Kafka and the processing both work, but I have a problem
writing to the database over JDBC.
I tried using a SinkFunction. It works, but it creates a new connection every time the
invoke method is called.
--------
DataStream<String> messageStream = this.env.addSource(
        new FlinkKafkaConsumer082<>(properties.getProperty("topic"),
                new SimpleStringSchema(), properties));
messageStream.map(new StreamingCrimeSplitter())
        .filter(new filterFunction())
        .keyBy(1)
        .addSink(new sinkFunction());
--------
--------
public class sinkFunction implements SinkFunction {
        private static final long serialVersionUID = 2859601213304525959L;

        @Override
        public void invoke(Tuple7 crime) throws Exception {
                System.out.println(crime.f0);
                // The JDBC connection is opened here, so once per record
        }
}
--------
Does anybody know how I could open just one connection? I tried creating it in the
constructor, but the JDBC connection is not serializable.
Thanks
Toletum
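
A possible direction for the question above, sketched under assumptions rather than taken from the thread: keep only serializable settings (such as the JDBC URL) in the sink's fields, mark the connection field transient, and create the connection in a lifecycle method that runs once on the worker instead of in the constructor or in invoke. In Flink, the class that offers such hooks is RichSinkFunction, with open(Configuration) and close(). The sketch below does not depend on Flink or on a JDBC driver. FakeConnection, SingleConnectionSink, SinkLifecycleDemo and the URL are hypothetical stand-ins, used only to show the pattern surviving a serialization round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a JDBC Connection. Like a real one, it is NOT Serializable.
class FakeConnection implements AutoCloseable {
    static int opened = 0;                       // counts every connection ever created
    final List<String> written = new ArrayList<>();
    boolean closed = false;

    FakeConnection(String url) { opened++; }

    void write(String record) {
        if (closed) throw new IllegalStateException("connection is closed");
        written.add(record);
    }

    @Override
    public void close() { closed = true; }
}

// Hypothetical stand-in for the sink. With Flink, this would extend
// RichSinkFunction<T>, and open() would take a Configuration argument.
class SingleConnectionSink implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String url;                    // serializable settings only
    private transient FakeConnection connection; // skipped when the sink is serialized

    SingleConnectionSink(String url) { this.url = url; }

    // Runs once per sink instance, after deserialization and before any record.
    void open() { connection = new FakeConnection(url); }

    // Runs once per record and reuses the connection created in open().
    void invoke(String record) { connection.write(record); }

    // Runs once when the sink instance shuts down.
    void close() { if (connection != null) connection.close(); }

    FakeConnection connection() { return connection; }
}

class SinkLifecycleDemo {
    // Serialize and deserialize the sink, roughly what a framework does
    // when it ships a user function to a worker.
    static SingleConnectionSink ship(SingleConnectionSink sink) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(sink);
        }
        try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SingleConnectionSink) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SingleConnectionSink sink = ship(new SingleConnectionSink("jdbc:example://localhost"));
        sink.open();
        for (String record : new String[] {"a", "b", "c"}) {
            sink.invoke(record);
        }
        sink.close();
        System.out.println("connections opened: " + FakeConnection.opened);
    }
}
```

With a real driver, open() would presumably call DriverManager.getConnection(url) and close() would close that connection. Note that each parallel sink instance would then hold its own connection, so "one connection" here means one per instance, not one per job.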
