apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bhupeshchawda <...@git.apache.org>
Subject [GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...
Date Mon, 08 Aug 2016 09:07:01 GMT
Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/358#discussion_r73843244
  
    --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
---
    @@ -286,149 +141,117 @@ public AbstractJdbcPollInputOperator()
       public void setup(OperatorContext context)
       {
         super.setup(context);
    -    spinMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
    +    intializeDSLContext();
    +    if (scanService == null) {
    +      scanService = Executors.newScheduledThreadPool(1);
    +    }
         execute = true;
    -    cause = new AtomicReference<Throwable>();
    -    emitQueue = new ArrayBlockingQueue<List<T>>(queueCapacity);
    -    this.context = context;
    +    emitQueue = new LinkedBlockingDeque<>(queueCapacity);
         operatorId = context.getId();
    +    windowManager.setup(context);
    +  }
     
    -    try {
    +  private void intializeDSLContext()
    +  {
    +    create = DSL.using(store.getConnection(), JDBCUtils.dialect(store.getDatabaseUrl()));
    +  }
     
    -      //If its a range query pass upper and lower bounds
    -      //If its a polling query pass only the lower bound
    -      if (getRangeQueryPair().getValue() != null) {
    -        ps = store.getConnection()
    -            .prepareStatement(
    -                JdbcMetaDataUtility.buildRangeQuery(getTableName(), getKey(), rangeQueryPair.getKey(),
    -                    rangeQueryPair.getValue()),
    -                java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
    +  @Override
    +  public void activate(OperatorContext context)
    +  {
    +    initializePreparedStatement();
    +    long largestRecoveryWindow = windowManager.getLargestRecoveryWindow();
    +    if (largestRecoveryWindow == Stateless.WINDOW_ID
    +        || context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) > largestRecoveryWindow)
{
    +      scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS);
    +    }
    +  }
    +
    +  protected void initializePreparedStatement()
    +  {
    +    try {
    +      // If its a range query pass upper and lower bounds, If its a polling query pass
only the lower bound
    +      if (isPollerPartition) {
    +        ps = store.getConnection().prepareStatement(buildRangeQuery(rangeQueryPair.getKey(),
Integer.MAX_VALUE),
    +            TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
           } else {
             ps = store.getConnection().prepareStatement(
    -            JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), rangeQueryPair.getKey()),
    -            java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
    -        isPollable = true;
    +            buildRangeQuery(rangeQueryPair.getKey(), (rangeQueryPair.getValue() - rangeQueryPair.getKey())),
    +            TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
           }
    -
         } catch (SQLException e) {
           LOG.error("Exception in initializing the range query for a given partition", e);
           throw new RuntimeException(e);
         }
     
    -    windowManager.setup(context);
    -    LOG.debug("super setup done...");
       }
     
       @Override
       public void beginWindow(long windowId)
       {
         currentWindowId = windowId;
    -
    -    isReplayed = false;
    -
         if (currentWindowId <= windowManager.getLargestRecoveryWindow()) {
           try {
             replay(currentWindowId);
    +        return;
           } catch (SQLException e) {
             LOG.error("Exception in replayed windows", e);
             throw new RuntimeException(e);
           }
         }
    -
    -    if (isReplayed && currentWindowId == windowManager.getLargestRecoveryWindow())
{
    -      try {
    -        if (!isPollable && rangeQueryPair.getValue() != null) {
    -
    -          ps = store.getConnection().prepareStatement(
    -              JdbcMetaDataUtility.buildGTRangeQuery(getTableName(), getKey(), previousUpperBound,
    -                  rangeQueryPair.getValue()),
    -              java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
    -        } else {
    -          String bound = null;
    -          if (previousUpperBound == null) {
    -            bound = getRangeQueryPair().getKey();
    -          } else {
    -            bound = previousUpperBound;
    -          }
    -          ps = store.getConnection().prepareStatement(
    -              JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), bound),
    -              java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
    -          isPollable = true;
    -        }
    -        isReplayed = false;
    -        LOG.debug("Prepared statement after re-initialization - {} ", ps.toString());
    -      } catch (SQLException e) {
    -        // TODO Auto-generated catch block
    -        throw new RuntimeException(e);
    -      }
    +    if (isPollerPartition) {
    +      updatePollQuery();
    +      isPolled = false;
         }
    +    lowerBound = lastEmittedRecord;
    +  }
     
    -    //Reset the pollable query with the updated upper and lower bounds
    -    if (isPollable) {
    +  private void updatePollQuery()
    +  {
    +    if ((lastPolledBound != lastEmittedRecord)) {
    +      if (lastEmittedRecord == null) {
    +        lastPolledBound = rangeQueryPair.getKey();
    +      } else {
    +        lastPolledBound = lastEmittedRecord;
    +      }
           try {
    -        String bound = null;
    -        if (previousUpperBound == null && highestPolled == null) {
    -          bound = getRangeQueryPair().getKey();
    -        } else {
    -          bound = highestPolled;
    -        }
    -        ps = store.getConnection().prepareStatement(
    -            JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), bound),
    -            java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
    -        LOG.debug("Polling query {} {}", ps.toString(), currentWindowId);
    -        isPolled = false;
    +        ps = store.getConnection().prepareStatement(buildRangeQuery(lastPolledBound,
Integer.MAX_VALUE),
    +            TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
           } catch (SQLException e) {
             throw new RuntimeException(e);
           }
         }
     
    -    lower = null;
    -    upper = null;
    -
    -    //Check if a thread is already active and start only if its no
    -    //Do not start the thread from setup, will conflict with the replay
    -    if (dbPoller == null && !isReplayed) {
    -      //If this is not a replayed state, reset the ps to highest read offset + 1, 
    -      //keep the upper bound as the one that was initialized after static partitioning
    -      LOG.info("Statement when re-initialized {}", ps.toString());
    -      dbPoller = new Thread(new DBPoller());
    -      dbPoller.start();
    -    }
       }
     
       @Override
       public void emitTuples()
       {
    -    if (isReplayed) {
    +    if (currentWindowId <= windowManager.getLargestRecoveryWindow()) {
           return;
         }
    -
    -    List<T> tuples;
    -
    -    if ((tuples = emitQueue.poll()) != null) {
    -      for (Object tuple : tuples) {
    -        if (lower == null) {
    -          lower = tuple.toString();
    -        }
    -        upper = tuple.toString();
    -        outputPort.emit((T)tuple);
    -      }
    +    int pollSize = (emitQueue.size() < batchSize) ? emitQueue.size() : batchSize;
    +    while (pollSize-- > 0) {
    +      T obj = emitQueue.poll();
    --- End diff --
    
    Check for ```obj == null```, in case the queue becomes empty 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message