apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues
Date Mon, 08 Aug 2016 08:29:20 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411480#comment-15411480 ]

ASF GitHub Bot commented on APEXMALHAR-2172:
--------------------------------------------

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/358#discussion_r73838593
  
    --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
---
    @@ -438,119 +261,110 @@ public void endWindow()
         currentWindowRecoveryState = new MutablePair<>();
       }
     
    -  public int getPartitionCount()
    -  {
    -    return partitionCount;
    -  }
    -
    -  public void setPartitionCount(int partitionCount)
    +  @Override
    +  public void deactivate()
       {
    -    this.partitionCount = partitionCount;
    +    scanService.shutdownNow();
    +    store.disconnect();
       }
     
    -  @Override
    -  public void activate(Context cntxt)
    +  protected void pollRecords()
       {
    -    if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID
    -        && context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowManager.getLargestRecoveryWindow())
{
    -      // If it is a replay state, don't start any threads here
    +    if (isPolled) {
           return;
         }
    -  }
    -
    -  @Override
    -  public void deactivate()
    -  {
         try {
    -      if (dbPoller != null && dbPoller.isAlive()) {
    -        dbPoller.interrupt();
    -        dbPoller.join();
    +      ps.setFetchSize(getFetchSize());
    +      ResultSet result = ps.executeQuery();
    +      if (result.next()) {
    +        do {
    +          emitQueue.add(getTuple(result));
    +        } while (result.next());
           }
    -    } catch (InterruptedException ex) {
    -      // log and ignore, ending execution anyway
    -      LOG.error("exception in poller thread: ", ex);
    +      isPolled = true;
    +    } catch (SQLException ex) {
    +      throw new RuntimeException(String.format("Error while running query"), ex);
    +    } finally {
    +      store.disconnect();
         }
    -  }
     
    -  @Override
    -  public void handleIdleTime()
    -  {
    -    if (execute) {
    -      try {
    -        Thread.sleep(spinMillis);
    -      } catch (InterruptedException ie) {
    -        throw new RuntimeException(ie);
    -      }
    -    } else {
    -      LOG.error("Exception: ", cause);
    -      DTThrowable.rethrow(cause.get());
    -    }
       }
     
    +  public abstract T getTuple(ResultSet result);
    +
       protected void replay(long windowId) throws SQLException
       {
    -    isReplayed = true;
     
    -    MutablePair<String, String> recoveredData = new MutablePair<String, String>();
         try {
    -      recoveredData = (MutablePair<String, String>)windowManager.load(operatorId,
windowId);
    +      MutablePair<Integer, Integer> recoveredData = (MutablePair<Integer, Integer>)windowManager.load(operatorId,
    +          windowId);
     
    -      if (recoveredData != null) {
    -        //skip the window and return if there was no incoming data in the window
    -        if (recoveredData.left == null || recoveredData.right == null) {
    -          return;
    -        }
    -
    -        if (recoveredData.right.equals(rangeQueryPair.getValue()) || recoveredData.right.equals(previousUpperBound))
{
    -          LOG.info("Matched so returning");
    -          return;
    -        }
    +      if (recoveredData != null && shouldReplayWindow(recoveredData)) {
    +        LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", windowId, recoveredData.left,
    +            recoveredData.right);
     
    -        JdbcPollInputOperator jdbcPoller = new JdbcPollInputOperator();
    -        jdbcPoller.setStore(store);
    -        jdbcPoller.setKey(getKey());
    -        jdbcPoller.setPartitionCount(getPartitionCount());
    -        jdbcPoller.setPollInterval(getPollInterval());
    -        jdbcPoller.setTableName(getTableName());
    -        jdbcPoller.setBatchSize(getBatchSize());
    -        isPollable = false;
    -
    -        LOG.debug("[Window ID -" + windowId + "," + recoveredData.left + "," + recoveredData.right
+ "]");
    +        ps = store.getConnection().prepareStatement(
    +            buildRangeQuery(rangeQueryPair.getKey(), (rangeQueryPair.getValue() - rangeQueryPair.getKey())),
    +            TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
    +        LOG.info("Query formed to recover data - {}", ps.toString());
     
    -        jdbcPoller.setRangeQueryPair(new KeyValPair<String, String>(recoveredData.left,
recoveredData.right));
    +        emitReplayedTuples(ps);
     
    -        jdbcPoller.ps = jdbcPoller.store.getConnection().prepareStatement(
    -            JdbcMetaDataUtility.buildRangeQuery(jdbcPoller.getTableName(), jdbcPoller.getKey(),
    -                jdbcPoller.getRangeQueryPair().getKey(), jdbcPoller.getRangeQueryPair().getValue()),
    -            java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
    -        LOG.info("Query formed for recovered data - {}", jdbcPoller.ps.toString());
    +      }
     
    -        emitReplayedTuples(jdbcPoller.ps);
    +      if (currentWindowId == windowManager.getLargestRecoveryWindow()) {
    +        try {
    +          if (!isPollerPartition && rangeQueryPair.getValue() != null) {
    +            ps = store.getConnection().prepareStatement(
    +                buildRangeQuery(lastEmittedRecord, (rangeQueryPair.getValue() - lastEmittedRecord)),
TYPE_FORWARD_ONLY,
    +                CONCUR_READ_ONLY);
    +          } else {
    +            Integer bound = null;
    +            if (lastEmittedRecord == null) {
    +              bound = rangeQueryPair.getKey();
    +            } else {
    +              bound = lastEmittedRecord;
    +            }
    +            ps = store.getConnection().prepareStatement(buildRangeQuery(bound, Integer.MAX_VALUE),
TYPE_FORWARD_ONLY,
    +                CONCUR_READ_ONLY);
    +          }
    +          scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS);
    --- End diff --
    
    This is already being done in the ```activate()``` method.
    I don't think there is a need for this entire if() block: ```if (currentWindowId == windowManager.getLargestRecoveryWindow())```.
    This is already taken care of before the if() statement.


> Update JDBC poll input operator to fix issues
> ---------------------------------------------
>
>                 Key: APEXMALHAR-2172
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
>             Project: Apache Apex Malhar
>          Issue Type: Improvement
>            Reporter: Priyanka Gugale
>            Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use the jOOQ query DSL library to construct SQL queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message