If by task you mean job, then yes: global static variables initialized in the main() of the job do not get serialized or transferred to the nodes where that job may be assigned.

The other thing is that, since it is a sink, the sink will be serialized, shipped to that node and then initialized there, so that static variable will be local to the node (JVM) where that sink runs.
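
To illustrate the point about static variables not travelling with a serialized object, here is a minimal sketch using plain Java serialization only. It does not use Flink; DemoSink, sharedSetting and instanceSetting are made-up names, and resetting the static field by hand is just a stand-in for "a fresh worker JVM where main() never ran".

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class StaticFieldSerializationDemo {

    // Hypothetical stand-in for a sink object that gets shipped to a worker.
    static class DemoSink implements Serializable {
        private static final long serialVersionUID = 1L;
        static String sharedSetting = "unset"; // static: not part of the serialized form
        final String instanceSetting;          // instance field: travels with the object

        DemoSink(String instanceSetting) {
            this.instanceSetting = instanceSetting;
        }
    }

    // Serializes, then deserializes, and reports what survived the round trip.
    public static String[] roundTrip() {
        try {
            DemoSink.sharedSetting = "set-in-main";
            DemoSink original = new DemoSink("from-constructor");

            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }

            // Simulate a different JVM: the static initializer ran, main() did not.
            DemoSink.sharedSetting = "unset";

            DemoSink copy;
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                copy = (DemoSink) in.readObject();
            }
            return new String[] { copy.instanceSetting, DemoSink.sharedSetting };
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        String[] result = roundTrip();
        System.out.println("instance field after round trip: " + result[0]);
        System.out.println("static field after round trip:   " + result[1]);
    }
}
```

The instance field comes back with the value it was given, while the static field keeps whatever the receiving side's class initializer left in it.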

Someone from Flink should chime in :p

On Thu., Oct. 17, 2019, 9:22 a.m. John Smith, <java.dev.mtl@gmail.com> wrote:
Usually the database connection pool is thread safe. When you say task, do you mean a single deployed Flink job?

I still think a sink is only initialized once. You can prove it by putting logging in open() and close().

On Thu., Oct. 17, 2019, 1:39 a.m. Xin Ma, <kevin.xin.ma@gmail.com> wrote:
Thanks, John. If I don't make my pool static, I think it will create one instance for each task. If the pool is static, each JVM can hold one instance. Depending on the deployment approach, that can mean anywhere from one to multiple instances. Is this correct?
Konstantin's talk mentions that static variables can lead to deadlocks, etc. I don't know if the loss of the JDBC connection is also related to this. Btw, I am using JDBC to write to HBase, maybe that also matters.
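
The per-task versus per-JVM counting described above can be sketched with plain Java, no Flink or c3p0 involved. FakePool, InstancePoolSink and StaticPoolSink are hypothetical names, and calling open() in a loop only imitates several parallel sink instances landing in the same JVM.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class PoolScopeDemo {

    // Counts how many "pools" were constructed in this JVM.
    static final AtomicInteger poolsCreated = new AtomicInteger();

    // Hypothetical stand-in for a real connection pool.
    static class FakePool {
        FakePool() {
            poolsCreated.incrementAndGet();
        }
    }

    // Pool held in an instance field: every sink instance builds its own.
    static class InstancePoolSink {
        private FakePool pool;

        void open() {
            pool = new FakePool();
        }
    }

    // Pool held in a static field: all sink instances in the JVM share one.
    static class StaticPoolSink {
        private static FakePool pool;

        void open() {
            synchronized (StaticPoolSink.class) {
                if (pool == null) {
                    pool = new FakePool();
                }
            }
        }
    }

    public static int countInstancePools(int parallelTasks) {
        poolsCreated.set(0);
        for (int i = 0; i < parallelTasks; i++) {
            new InstancePoolSink().open();
        }
        return poolsCreated.get();
    }

    public static int countStaticPools(int parallelTasks) {
        poolsCreated.set(0);
        StaticPoolSink.pool = null; // start from a clean JVM-wide state
        for (int i = 0; i < parallelTasks; i++) {
            new StaticPoolSink().open();
        }
        return poolsCreated.get();
    }

    public static void main(String[] args) {
        System.out.println("instance-field pools for 4 tasks: " + countInstancePools(4));
        System.out.println("static-field pools for 4 tasks:   " + countStaticPools(4));
    }
}
```

With four simulated tasks in one JVM, the instance-field variant builds four pools and the static variant builds one; how many JVMs there are in a real cluster depends on the deployment.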

On Thu, Oct 17, 2019 at 2:32 AM John Smith <java.dev.mtl@gmail.com> wrote:
Xin, the open()/close() cycle of a sink function is only called once per sink instance, so I don't think you even need to make your pool static. Can someone confirm this?

Miki, the JDBC connector lacks some functionality. For instance, it only flushes a batch when the batch interval is reached, so if you set the batch interval to 5 and you get 6 records, the sixth one will not be flushed to the DB until you get another 4. You can see in Xin's code that a time-based flush has been added as well. Also, the JDBC connector does not have checkpointing, if you ever need that, which is a surprise because most JDBC databases have transactions, so it would be nice to have.
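
As a rough sketch of the "flush on batch size or on elapsed time" idea, here is a small standalone buffer. This is not the JDBC connector's code and not Xin's sink; SizeOrTimeBatcher and its methods are hypothetical, and the clock is passed in explicitly so the behaviour is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SizeOrTimeBatcher<T> {

    private final int batchSize;
    private final long flushIntervalMs;
    private final Consumer<List<T>> writer; // e.g. something that writes to the DB
    private final List<T> buffer = new ArrayList<>();
    private long lastFlushMs;

    public SizeOrTimeBatcher(int batchSize, long flushIntervalMs, long nowMs,
                             Consumer<List<T>> writer) {
        this.batchSize = batchSize;
        this.flushIntervalMs = flushIntervalMs;
        this.lastFlushMs = nowMs;
        this.writer = writer;
    }

    // Size trigger: flush as soon as the buffer holds a full batch.
    public synchronized void add(T record, long nowMs) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush(nowMs);
        }
    }

    // Time trigger: flush leftovers once the interval has passed.
    public synchronized void onTimer(long nowMs) {
        if (!buffer.isEmpty() && nowMs - lastFlushMs >= flushIntervalMs) {
            flush(nowMs);
        }
    }

    public synchronized int pending() {
        return buffer.size();
    }

    private void flush(long nowMs) {
        writer.accept(new ArrayList<>(buffer));
        buffer.clear();
        lastFlushMs = nowMs;
    }

    // Batch size 5, six records: the sixth waits until the timer fires.
    public static List<Integer> demo() {
        List<Integer> flushedSizes = new ArrayList<>();
        SizeOrTimeBatcher<String> batcher = new SizeOrTimeBatcher<>(
            5, 60_000L, 0L, batch -> flushedSizes.add(batch.size()));
        for (int i = 1; i <= 6; i++) {
            batcher.add("record-" + i, 1_000L);
        }
        batcher.onTimer(61_000L); // one interval after the size-triggered flush
        return flushedSizes;
    }

    public static void main(String[] args) {
        System.out.println("flushed batch sizes: " + demo());
    }
}
```

In a real sink, onTimer would probably be driven by something like a ScheduledExecutorService rather than called by hand, and whatever is still buffered would need to be flushed in close().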

On Wed, 16 Oct 2019 at 10:58, miki haiat <miko5054@gmail.com> wrote:
If it's a sink that uses JDBC, why not use the Flink JDBC sink connector?

On Wed, Oct 16, 2019, 17:03 Xin Ma <kevin.xin.ma@gmail.com> wrote:
I have watched one of the recent Flink Forward videos, Apache Flink Worst Practices by Konstantin Knauf. The talk helped me a lot and mentions that we should avoid using static variables to share state between tasks.

So should I also avoid a static database connection? I am currently facing a weird issue: the database connection is lost at some point and brings the whole job down.

I have created a database tool like this, 

import com.mchange.v2.c3p0.ComboPooledDataSource;
import java.beans.PropertyVetoException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Properties;

public class Phoenix {

    private static ComboPooledDataSource dataSource = new ComboPooledDataSource();
    static {
        try {
            dataSource.setDriverClass(Environment.get("phoenix.jdbc.driverClassName", "org.apache.phoenix.jdbc.PhoenixDriver"));
            dataSource.setJdbcUrl(Environment.get("phoenix.jdbc.url", null));
            Properties properties = new Properties();
            properties.setProperty("user", "---");
            properties.setProperty("password", "---");
            // Assumed step: hand the credentials to the pool, otherwise they are never used.
            dataSource.setProperties(properties);
        } catch (PropertyVetoException e) {
            // Keep the cause so the real configuration problem shows up in the logs.
            throw new RuntimeException("phoenix datasource conf error", e);
        }
    }

    private static Connection getConn() throws SQLException {
        return dataSource.getConnection();
    }

    public static <T> T executeQuery(String sql, Caller<T> caller) throws SQLException {
        // ... execution logic
    }

    public static int executeUpdateWithTx(List<String> sqlList) throws SQLException {
        // ... update logic
    }
}

Then I implemented my customized sink function like this,

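import com.google.common.collect.Lists; // assumed: Guava's Lists
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomizedSink extends RichSinkFunction<Record> {

    private static final Logger LOG = LoggerFactory.getLogger("userFlinkLogger");
    private static final int batchInsertSize = 5000;
    private static final long flushInterval = 60 * 1000L;
    private long lastFlushTime;
    private BatchCommit batchCommit;
    private ConcurrentLinkedQueue<Object> cacheQueue;
    private ExecutorService threadPool;

    @Override
    public void open(Configuration parameters) throws Exception {
        cacheQueue = new ConcurrentLinkedQueue<>();
        threadPool = Executors.newFixedThreadPool(1);
        batchCommit = new BatchCommit();
    }

    @Override
    public void invoke(Record record) throws Exception {
        cacheQueue.add(record); // assumed: the enqueue step is not shown in the post
        if (cacheQueue.size() >= batchInsertSize ||
            System.currentTimeMillis() - lastFlushTime >= flushInterval) {
            lastFlushTime = System.currentTimeMillis();
            threadPool.execute(batchCommit); // assumed: the submit step is not shown in the post
        }
    }

    private class BatchCommit implements Runnable {
        @Override
        public void run() {
            try {
                int ct;
                synchronized (cacheQueue) {
                    List<String> sqlList = Lists.newArrayList();
                    // Drain at most one batch from the queue.
                    for (ct = 0; ct < batchInsertSize; ct++) {
                        Object obj = cacheQueue.poll();
                        if (obj == null) {
                            break; // queue is empty
                        }
                        sqlList.add(generateBatchSql((Record) obj));
                    }
                    Phoenix.executeUpdateWithTx(sqlList); // assumed: the write step is not shown in the post
                }
                LOG.info("Batch insert " + ct + " cache records");
            } catch (Exception e) {
                LOG.error("Batch insert error: ", e);
            }
        }

        private String generateBatchSql(Record record) {
            // business logic
        }
    }
}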

Are there any good ideas for refactoring this code?