apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues
Date Tue, 09 Aug 2016 08:17:20 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15413181#comment-15413181 ]

ASF GitHub Bot commented on APEXMALHAR-2172:
--------------------------------------------

Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/358#discussion_r74016084
  
    --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
---
    @@ -286,271 +146,233 @@ public AbstractJdbcPollInputOperator()
       public void setup(OperatorContext context)
       {
         super.setup(context);
    -    spinMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
    +    intializeDSLContext();
    +    if (scanService == null) {
    +      scanService = Executors.newScheduledThreadPool(1);
    +    }
         execute = true;
    -    cause = new AtomicReference<Throwable>();
    -    emitQueue = new ArrayBlockingQueue<List<T>>(queueCapacity);
    -    this.context = context;
    +    emitQueue = new LinkedBlockingDeque<>(queueCapacity);
         operatorId = context.getId();
    +    windowManager.setup(context);
    +  }
     
    -    try {
    +  private void intializeDSLContext()
    +  {
    +    create = DSL.using(store.getConnection(), JDBCUtils.dialect(store.getDatabaseUrl()));
    +  }
    +
    +  @Override
    +  public void activate(OperatorContext context)
    +  {
    +    initializePreparedStatement();
    +    long largestRecoveryWindow = windowManager.getLargestRecoveryWindow();
    +    if (largestRecoveryWindow == Stateless.WINDOW_ID
    +        || context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) > largestRecoveryWindow)
{
    +      scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS);
    +    }
    +  }
     
    -      //If its a range query pass upper and lower bounds
    -      //If its a polling query pass only the lower bound
    -      if (getRangeQueryPair().getValue() != null) {
    -        ps = store.getConnection()
    -            .prepareStatement(
    -                JdbcMetaDataUtility.buildRangeQuery(getTableName(), getKey(), rangeQueryPair.getKey(),
    -                    rangeQueryPair.getValue()),
    -                java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
    +  protected void initializePreparedStatement()
    +  {
    +    try {
    +      // If its a range query pass upper and lower bounds, If its a polling query pass
only the lower bound
    +      if (isPollerPartition) {
    +        ps = store.getConnection().prepareStatement(buildRangeQuery(rangeQueryPair.getKey(),
Integer.MAX_VALUE),
    +            TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
           } else {
             ps = store.getConnection().prepareStatement(
    -            JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), rangeQueryPair.getKey()),
    -            java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
    -        isPollable = true;
    +            buildRangeQuery(rangeQueryPair.getKey(), (rangeQueryPair.getValue() - rangeQueryPair.getKey())),
    +            TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
           }
    -
         } catch (SQLException e) {
           LOG.error("Exception in initializing the range query for a given partition", e);
           throw new RuntimeException(e);
         }
     
    -    windowManager.setup(context);
    -    LOG.debug("super setup done...");
       }
     
       @Override
       public void beginWindow(long windowId)
       {
         currentWindowId = windowId;
    -
    -    isReplayed = false;
    -
         if (currentWindowId <= windowManager.getLargestRecoveryWindow()) {
           try {
             replay(currentWindowId);
    +        return;
           } catch (SQLException e) {
             LOG.error("Exception in replayed windows", e);
             throw new RuntimeException(e);
           }
         }
    -
    -    if (isReplayed && currentWindowId == windowManager.getLargestRecoveryWindow())
{
    -      try {
    -        if (!isPollable && rangeQueryPair.getValue() != null) {
    -
    -          ps = store.getConnection().prepareStatement(
    -              JdbcMetaDataUtility.buildGTRangeQuery(getTableName(), getKey(), previousUpperBound,
    -                  rangeQueryPair.getValue()),
    -              java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
    -        } else {
    -          String bound = null;
    -          if (previousUpperBound == null) {
    -            bound = getRangeQueryPair().getKey();
    -          } else {
    -            bound = previousUpperBound;
    -          }
    -          ps = store.getConnection().prepareStatement(
    -              JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), bound),
    -              java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
    -          isPollable = true;
    -        }
    -        isReplayed = false;
    -        LOG.debug("Prepared statement after re-initialization - {} ", ps.toString());
    -      } catch (SQLException e) {
    -        // TODO Auto-generated catch block
    -        throw new RuntimeException(e);
    -      }
    +    if (isPollerPartition) {
    +      updatePollQuery();
    +      isPolled = false;
         }
    +    lowerBound = lastEmittedRecord;
    +  }
     
    -    //Reset the pollable query with the updated upper and lower bounds
    -    if (isPollable) {
    +  private void updatePollQuery()
    +  {
    +    if ((lastPolledBound != lastEmittedRecord)) {
    +      if (lastEmittedRecord == null) {
    +        lastPolledBound = rangeQueryPair.getKey();
    +      } else {
    +        lastPolledBound = lastEmittedRecord;
    +      }
           try {
    -        String bound = null;
    -        if (previousUpperBound == null && highestPolled == null) {
    -          bound = getRangeQueryPair().getKey();
    -        } else {
    -          bound = highestPolled;
    -        }
    -        ps = store.getConnection().prepareStatement(
    -            JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), bound),
    -            java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
    -        LOG.debug("Polling query {} {}", ps.toString(), currentWindowId);
    -        isPolled = false;
    +        ps = store.getConnection().prepareStatement(buildRangeQuery(lastPolledBound,
Integer.MAX_VALUE),
    +            TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
           } catch (SQLException e) {
             throw new RuntimeException(e);
           }
         }
     
    -    lower = null;
    -    upper = null;
    -
    -    //Check if a thread is already active and start only if its no
    -    //Do not start the thread from setup, will conflict with the replay
    -    if (dbPoller == null && !isReplayed) {
    -      //If this is not a replayed state, reset the ps to highest read offset + 1, 
    -      //keep the upper bound as the one that was initialized after static partitioning
    -      LOG.info("Statement when re-initialized {}", ps.toString());
    -      dbPoller = new Thread(new DBPoller());
    -      dbPoller.start();
    -    }
       }
     
       @Override
       public void emitTuples()
       {
    -    if (isReplayed) {
    +    if (currentWindowId <= windowManager.getLargestRecoveryWindow()) {
           return;
         }
    -
    -    List<T> tuples;
    -
    -    if ((tuples = emitQueue.poll()) != null) {
    -      for (Object tuple : tuples) {
    -        if (lower == null) {
    -          lower = tuple.toString();
    -        }
    -        upper = tuple.toString();
    -        outputPort.emit((T)tuple);
    +    int pollSize = (emitQueue.size() < batchSize) ? emitQueue.size() : batchSize;
    +    while (pollSize-- > 0) {
    +      T obj = emitQueue.poll();
    +      if (obj != null) {
    --- End diff --
    
    Since we check the queue size before entering the while loop, there is only a rare chance that
"obj" will be null. I added the null check just for safety.


> Update JDBC poll input operator to fix issues
> ---------------------------------------------
>
>                 Key: APEXMALHAR-2172
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
>             Project: Apache Apex Malhar
>          Issue Type: Improvement
>            Reporter: Priyanka Gugale
>            Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use jooq query dsl library to construct sql queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message