beam-commits mailing list archives

From "Guillaume Balaine (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-793) JdbcIO can create a deadlock when parallelism is greater than 1
Date Thu, 21 Sep 2017 13:20:00 GMT

    [ https://issues.apache.org/jira/browse/BEAM-793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174736#comment-16174736 ]

Guillaume Balaine commented on BEAM-793:
----------------------------------------

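Alright, I made it work by adding a backoff strategy and rebuilding the batch on every retry.
It's likely not the best solution, but it keeps retrying a failed batch until the backoff is exhausted, so writes eventually go through (or the bundle fails loudly), which works well for bounded pipelines.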


{code:java}
      // Excerpt from a modified JdbcIO write DoFn. The enclosing class is assumed to
      // define: spec, connection, preparedStatement, records (the buffered elements),
      // batchCount, bundleUUID, LOG, MAX_RETRIES and BUNDLE_WRITE_BACKOFF
      // (presumably a FluentBackoff from org.apache.beam.sdk.util).

      /** Binds one record to the prepared statement and adds it to the JDBC batch. */
      private void processRecord(T record) {
        try {
          preparedStatement.clearParameters();
          spec.getPreparedStatementSetter().setParameters(record, preparedStatement);
          preparedStatement.addBatch();
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }

      /** Executes and commits the batch, retrying with backoff (e.g. on deadlocks). */
      private void executeBatch() throws SQLException, IOException, InterruptedException {
        LOG.info("Writing bundle {} batch of {} statements", bundleUUID, batchCount);
        Sleeper sleeper = Sleeper.DEFAULT;
        BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff();
        while (true) {
          try {
            // Batch upsert entities.
            int[] updates = preparedStatement.executeBatch();
            connection.commit();
            LOG.info("Successfully wrote {} statements of bundle {}", updates.length, bundleUUID);
            // The commit threw no exception: we are done.
            break;
          } catch (SQLException exception) {
            LOG.error("Error writing bundle {} to the database ({}): {}", bundleUUID,
                exception.getErrorCode(), exception.getMessage());
            if (!BackOffUtils.next(sleeper, backoff)) {
              LOG.error("Aborting bundle {} after {} retries.", bundleUUID, MAX_RETRIES);
              throw exception;
            }
            // Roll back the failed transaction and drop whatever is left of the
            // batch (driver-dependent after a failure), then rebuild it.
            connection.rollback();
            preparedStatement.clearBatch();
            records.forEach(this::processRecord);
          }
        }
        clearBatch();
      }

      /** Resets the buffered records once the batch has been committed. */
      private void clearBatch() {
        batchCount = 0;
        records.clear();
      }
{code}
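The same retry-and-rebuild idea, reduced to plain Java without Beam's Sleeper/BackOff classes, might look like the sketch below. BatchRetrySketch, BatchWriter and writeWithRetry are hypothetical names, and the doubling delay is an assumption standing in for BUNDLE_WRITE_BACKOFF, whose actual configuration isn't shown in the comment.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, dependency-free sketch of "retry with backoff, rebuilding the batch each time". */
public class BatchRetrySketch {

  /** Hypothetical stand-in for "add all records to a JDBC batch, execute it and commit". */
  interface BatchWriter<T> {
    void writeBatch(List<T> records) throws SQLException;
  }

  /**
   * Writes the records, allowing up to maxRetries retries after the first attempt.
   * The delay starts at initialBackoffMillis and doubles after each failure.
   * Returns the number of attempts that were needed.
   */
  static <T> int writeWithRetry(
      List<T> records, BatchWriter<T> writer, int maxRetries, long initialBackoffMillis)
      throws SQLException, InterruptedException {
    long backoffMillis = initialBackoffMillis;
    for (int attempt = 1; ; attempt++) {
      try {
        // Rebuild the batch from the buffered records on every attempt.
        writer.writeBatch(new ArrayList<>(records));
        return attempt;
      } catch (SQLException e) {
        if (attempt > maxRetries) {
          throw e; // Retries exhausted: fail the bundle.
        }
        Thread.sleep(backoffMillis);
        backoffMillis *= 2;
      }
    }
  }

  public static void main(String[] args) throws Exception {
    List<String> records = List.of("a", "b", "c");
    int[] calls = {0};
    // Simulated writer that hits a deadlock on its first two attempts.
    BatchWriter<String> flaky = batch -> {
      calls[0]++;
      if (calls[0] <= 2) {
        throw new SQLException("Deadlock found when trying to get lock", "40001", 1213);
      }
    };
    int attempts = writeWithRetry(records, flaky, 3, 10);
    System.out.println("Succeeded after " + attempts + " attempts"); // prints "Succeeded after 3 attempts"
  }
}
```

Copying the buffered records into a fresh list on each attempt mirrors the comment's choice to rebuild the batch rather than trust whatever state the driver left behind after a failure.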


> JdbcIO can create a deadlock when parallelism is greater than 1
> ---------------------------------------------------------------
>
>                 Key: BEAM-793
>                 URL: https://issues.apache.org/jira/browse/BEAM-793
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-extensions
>            Reporter: Jean-Baptiste Onofré
>            Assignee: Jean-Baptiste Onofré
>             Fix For: 2.2.0
>
>
> With the following JdbcIO configuration, if the parallelism is greater than 1, we can get a {{Deadlock found when trying to get lock; try restarting transaction}} error.
> {code}
>         MysqlDataSource dbCfg = new MysqlDataSource();
>         dbCfg.setDatabaseName("db");
>         dbCfg.setUser("user");
>         dbCfg.setPassword("pass");
>         dbCfg.setServerName("localhost");
>         dbCfg.setPortNumber(3306);
>         p.apply(Create.of(data))
>                 .apply(JdbcIO.<Tuple5<Integer, Integer, ByteString, Long, Long>>write()
>                         .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dbCfg))
>                         .withStatement("INSERT INTO smth(loc,event_type,hash,begin_date,end_date) VALUES(?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE event_type=VALUES(event_type),end_date=VALUES(end_date)")
>                         .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<Tuple5<Integer, Integer, ByteString, Long, Long>>() {
>                             public void setParameters(Tuple5<Integer, Integer, ByteString, Long, Long> element, PreparedStatement statement)
>                                     throws Exception {
>                                 statement.setInt(1, element.f0);
>                                 statement.setInt(2, element.f1);
>                                 statement.setBytes(3, element.f2.toByteArray());
>                                 statement.setLong(4, element.f3);
>                                 statement.setLong(5, element.f4);
>                             }
>                         }));
> {code}
> This may be caused by {{autocommit}}. I'm going to investigate.
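The retry loop in the comment above retries on any SQLException, including errors that will never succeed on retry (for example a syntax error). A minimal sketch of restricting retries to deadlocks, assuming a MySQL backend, where a deadlock is reported with vendor error code 1213 (ER_LOCK_DEADLOCK) and SQLState "40001". DeadlockCheck and isRetryable are hypothetical names.

```java
import java.sql.SQLException;

/** Hypothetical helper: decides whether a failed batch is worth retrying (MySQL assumed). */
public class DeadlockCheck {

  // MySQL ER_LOCK_DEADLOCK: "Deadlock found when trying to get lock; try restarting transaction".
  static final int MYSQL_ER_LOCK_DEADLOCK = 1213;

  static boolean isRetryable(SQLException e) {
    // Walk the chain: a batch failure may carry the real cause via getNextException().
    for (SQLException cur = e; cur != null; cur = cur.getNextException()) {
      if ("40001".equals(cur.getSQLState()) || cur.getErrorCode() == MYSQL_ER_LOCK_DEADLOCK) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    SQLException deadlock = new SQLException(
        "Deadlock found when trying to get lock; try restarting transaction", "40001", 1213);
    SQLException syntax = new SQLException("You have an error in your SQL syntax", "42000", 1064);
    System.out.println(isRetryable(deadlock)); // true
    System.out.println(isRetryable(syntax));   // false
  }
}
```

In the retry loop, such a check would sit next to the BackOffUtils.next(...) call, so that non-transient errors are rethrown immediately instead of waiting out the whole backoff.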



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
