beam-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <>
Subject [jira] [Work logged] (BEAM-3500) JdbcIO: Improve connection management
Date Fri, 09 Mar 2018 05:19:00 GMT


ASF GitHub Bot logged work on BEAM-3500:

                Author: ASF GitHub Bot
            Created on: 09/Mar/18 05:18
            Start Date: 09/Mar/18 05:18
    Worklog Time Spent: 10m 
      Work Description: jbonofre commented on a change in pull request #4461: [BEAM-3500]
"Attach" JDBC connection to the bundle and add DataSourceFactory allowing full control of
the way the DataSource is created

 File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/
 @@ -327,12 +428,47 @@ DataSource buildDatasource() throws Exception{
         if (getConnectionProperties() != null && getConnectionProperties().get() != null) {
-        return basicDataSource;
+        current = basicDataSource;
+      // wrapping the datasource as a pooling datasource
+      DataSourceConnectionFactory connectionFactory = new DataSourceConnectionFactory(current);
+      PoolableConnectionFactory poolableConnectionFactory =
+              new PoolableConnectionFactory(connectionFactory, null);
+      GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
+      poolConfig.setMaxTotal(getPoolMaxTotal());
+      poolConfig.setBlockWhenExhausted(getPoolBlockWhenExhausted());
+      poolConfig.setMaxWaitMillis(getPoolMaxWaitMillis());
+      poolConfig.setMaxIdle(getPoolMaxIdle());
+      poolConfig.setMinIdle(getPoolMinIdle());
+      poolConfig.setTestOnBorrow(getPoolTestOnBorrow());
+      poolConfig.setTestOnReturn(getPoolTestOnReturn());
+      poolConfig.setNumTestsPerEvictionRun(getPoolNumTestsPerEvictionRun());
+      poolConfig.setMinEvictableIdleTimeMillis(getPoolMinEvictableIdleTimeMillis());
+      poolConfig.setTestWhileIdle(getPoolTestWhileIdle());
+      poolConfig.setSoftMinEvictableIdleTimeMillis(getPoolSoftMinEvictableIdleTimeMillis());
+      poolConfig.setLifo(getPoolLifo());
+      GenericObjectPool connectionPool =
+              new GenericObjectPool(poolableConnectionFactory, poolConfig);
+      poolableConnectionFactory.setPool(connectionPool);
+      poolableConnectionFactory.setValidationQuery("SELECT 1 FROM DUAL");
 Review comment:
   My bad, I forgot to update this with the provided value. By default, the `validationQuery`
should be null, and users can define it depending on their database.
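
To illustrate why a null default is reasonable (`SELECT 1 FROM DUAL` only works on some databases), here is a minimal sketch using plain JDBC only. The class `ValidationSketch` and its `validate` method are hypothetical names, not part of the PR; the fallback to `Connection.isValid` when no query is configured is an assumption about how a pool would typically behave, not a statement about the DBCP version used here.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

class ValidationSketch {

  /**
   * Returns true if the connection looks usable.
   * A null or empty validationQuery falls back to the driver's own
   * Connection.isValid check, so no database-specific SQL is required.
   */
  static boolean validate(Connection connection, String validationQuery, int timeoutSeconds) {
    try {
      if (validationQuery == null || validationQuery.isEmpty()) {
        return connection.isValid(timeoutSeconds);
      }
      // User-supplied query, e.g. "SELECT 1 FROM DUAL" on Oracle or "SELECT 1" elsewhere.
      try (Statement statement = connection.createStatement()) {
        statement.setQueryTimeout(timeoutSeconds);
        try (ResultSet resultSet = statement.executeQuery(validationQuery)) {
          return resultSet.next();
        }
      }
    } catch (SQLException e) {
      // Any failure while validating means the connection should not be reused.
      return false;
    }
  }
}
```

With this shape, a hard-coded query is only needed when the driver's `isValid` is not trustworthy for a given database.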

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

Issue Time Tracking

    Worklog Id:     (was: 78788)
    Time Spent: 2h 20m  (was: 2h 10m)

> JdbcIO: Improve connection management
> -------------------------------------
>                 Key: BEAM-3500
>                 URL:
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-jdbc
>    Affects Versions: 2.2.0
>            Reporter: Pawel Bartoszek
>            Assignee: Jean-Baptiste Onofré
>            Priority: Major
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
> The JdbcIO write DoFn acquires a connection in {{@Setup}} and releases it in {{@Teardown}},
which means that the connection might stay open for days in a streaming job. Keeping a single
connection open for that long is risky, as it is exposed to database issues, network issues, and so on.
> *Taking connection from the pool when it is actually needed*
> I suggest taking the connection from the connection pool in the {{executeBatch}}
method and releasing it when the batch is flushed. This allows the pool to take care of any
unhealthy connections that are returned to it.
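
The suggestion above could be sketched roughly as follows. This is not the JdbcIO code: `PerBatchConnectionSketch`, its `add` method, and the `recording` helper are hypothetical names, and the `DataSource` is assumed to be a pooling one. The `recording` stub exists only so the sketch runs without a database; it logs which JDBC methods were called.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;

class PerBatchConnectionSketch {
  private final DataSource dataSource; // assumed to be backed by a connection pool
  private final String sql;
  private final int batchSize;
  private final List<String> buffer = new ArrayList<>();

  PerBatchConnectionSketch(DataSource dataSource, String sql, int batchSize) {
    this.dataSource = dataSource;
    this.sql = sql;
    this.batchSize = batchSize;
  }

  /** Buffers one record and flushes when the batch is full. No connection is held here. */
  void add(String value) throws SQLException {
    buffer.add(value);
    if (buffer.size() >= batchSize) {
      executeBatch();
    }
  }

  /** Borrows a connection only for the duration of the flush. */
  void executeBatch() throws SQLException {
    if (buffer.isEmpty()) {
      return;
    }
    try (Connection connection = dataSource.getConnection();
        PreparedStatement statement = connection.prepareStatement(sql)) {
      connection.setAutoCommit(false);
      for (String value : buffer) {
        statement.setString(1, value);
        statement.addBatch();
      }
      statement.executeBatch();
      connection.commit();
    } // close() hands the connection back to the pool
    buffer.clear();
  }

  /** Demo-only stub: records method names instead of talking to a database. */
  @SuppressWarnings("unchecked")
  static <T> T recording(Class<T> type, List<String> calls) {
    return (T) Proxy.newProxyInstance(
        type.getClassLoader(),
        new Class<?>[] {type},
        (proxy, method, methodArgs) -> {
          calls.add(type.getSimpleName() + "." + method.getName());
          Class<?> returned = method.getReturnType();
          if (returned == Connection.class || returned == PreparedStatement.class) {
            return recording(returned, calls);
          }
          return null; // void methods and results this sketch ignores
        });
  }

  public static void main(String[] args) throws SQLException {
    List<String> calls = new ArrayList<>();
    PerBatchConnectionSketch writer =
        new PerBatchConnectionSketch(
            recording(DataSource.class, calls), "INSERT INTO t (v) VALUES (?)", 2);
    writer.add("a"); // buffered only, nothing borrowed yet
    writer.add("b"); // batch is full: borrow, flush, return
    System.out.println(calls);
  }
}
```

The point of the try-with-resources block is that the connection's lifetime matches one flush, so a connection that has gone bad between batches is the pool's problem rather than the DoFn's.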
> *Make JdbcIO accept data source factory*
>  It would be nice if JdbcIO accepted a DataSourceFactory rather than the DataSource itself.
I say this because the sink checks whether the DataSource implements the `Serializable` interface,
which makes it impossible to pass a BasicDataSource (used internally by the sink), as it doesn't
implement this interface. Something like:
> {code:java}
> interface DataSourceFactory extends Serializable {
>     DataSource create();
> }
> {code}
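
A minimal sketch of how such a factory might be used, assuming only the JDK: the factory carries serializable configuration (here just a URL) and builds the non-serializable `DataSource` after it has been shipped. `UrlDataSourceFactory`, `DriverManagerDataSource`, `roundTrip`, and the `jdbc:example:db` URL are hypothetical and chosen for illustration; a real factory would more likely build a pooling data source.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintWriter;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.logging.Logger;
import javax.sql.DataSource;

class DataSourceFactorySketch {

  /** The interface proposed in the issue. */
  interface DataSourceFactory extends Serializable {
    DataSource create();
  }

  /** Hypothetical DataSource that is deliberately NOT Serializable. */
  static final class DriverManagerDataSource implements DataSource {
    private final String url;
    DriverManagerDataSource(String url) { this.url = url; }
    @Override public Connection getConnection() throws SQLException {
      return DriverManager.getConnection(url);
    }
    @Override public Connection getConnection(String user, String password) throws SQLException {
      return DriverManager.getConnection(url, user, password);
    }
    @Override public PrintWriter getLogWriter() { return null; }
    @Override public void setLogWriter(PrintWriter out) { }
    @Override public void setLoginTimeout(int seconds) { }
    @Override public int getLoginTimeout() { return 0; }
    @Override public Logger getParentLogger() throws SQLFeatureNotSupportedException {
      throw new SQLFeatureNotSupportedException();
    }
    @Override public <T> T unwrap(Class<T> iface) throws SQLException {
      throw new SQLException("not a wrapper");
    }
    @Override public boolean isWrapperFor(Class<?> iface) { return false; }
  }

  /** Serializable factory: only the URL travels, the DataSource is built on demand. */
  static final class UrlDataSourceFactory implements DataSourceFactory {
    private static final long serialVersionUID = 1L;
    private final String url;
    UrlDataSourceFactory(String url) { this.url = url; }
    @Override public DataSource create() { return new DriverManagerDataSource(url); }
  }

  /** Serializes and deserializes a value, as a runner might when shipping a transform. */
  static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      @SuppressWarnings("unchecked")
      T copy = (T) in.readObject();
      return copy;
    }
  }

  public static void main(String[] args) throws Exception {
    DataSourceFactory shipped = roundTrip(new UrlDataSourceFactory("jdbc:example:db"));
    DataSource dataSource = shipped.create(); // built only after deserialization
    System.out.println(dataSource instanceof Serializable);
  }
}
```

Because only the factory is serialized, the sink no longer needs to require that the `DataSource` itself implement `Serializable`.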

This message was sent by Atlassian JIRA
