apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bhupeshchawda <...@git.apache.org>
Subject [GitHub] apex-malhar pull request #358: APEXMALHAR-2172: Updates to JDBC Poll Input O...
Date Tue, 09 Aug 2016 06:32:22 GMT
Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/358#discussion_r74004337
  
    --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java ---
    @@ -286,271 +146,233 @@ public AbstractJdbcPollInputOperator()
       public void setup(OperatorContext context)
       {
         super.setup(context);
    -    spinMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
    +    intializeDSLContext();
    +    if (scanService == null) {
    +      scanService = Executors.newScheduledThreadPool(1);
    +    }
         execute = true;
    -    cause = new AtomicReference<Throwable>();
    -    emitQueue = new ArrayBlockingQueue<List<T>>(queueCapacity);
    -    this.context = context;
    +    emitQueue = new LinkedBlockingDeque<>(queueCapacity);
         operatorId = context.getId();
    +    windowManager.setup(context);
    +  }
     
    -    try {
    +  private void intializeDSLContext()
    +  {
    +    create = DSL.using(store.getConnection(), JDBCUtils.dialect(store.getDatabaseUrl()));
    +  }
    +
    +  @Override
    +  public void activate(OperatorContext context)
    +  {
    +    initializePreparedStatement();
    +    long largestRecoveryWindow = windowManager.getLargestRecoveryWindow();
    +    if (largestRecoveryWindow == Stateless.WINDOW_ID
    +        || context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) > largestRecoveryWindow) {
    +      scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS);
    +    }
    +  }
     
    -      //If its a range query pass upper and lower bounds
    -      //If its a polling query pass only the lower bound
    -      if (getRangeQueryPair().getValue() != null) {
    -        ps = store.getConnection()
    -            .prepareStatement(
    -                JdbcMetaDataUtility.buildRangeQuery(getTableName(), getKey(), rangeQueryPair.getKey(),
    -                    rangeQueryPair.getValue()),
    -                java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
    +  protected void initializePreparedStatement()
    +  {
    +    try {
    +      // If its a range query pass upper and lower bounds, If its a polling query pass only the lower bound
    +      if (isPollerPartition) {
    +        ps = store.getConnection().prepareStatement(buildRangeQuery(rangeQueryPair.getKey(), Integer.MAX_VALUE),
    +            TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
           } else {
             ps = store.getConnection().prepareStatement(
    -            JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), rangeQueryPair.getKey()),
    -            java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
    -        isPollable = true;
    +            buildRangeQuery(rangeQueryPair.getKey(), (rangeQueryPair.getValue() - rangeQueryPair.getKey())),
    +            TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
           }
    -
         } catch (SQLException e) {
           LOG.error("Exception in initializing the range query for a given partition", e);
           throw new RuntimeException(e);
         }
     
    -    windowManager.setup(context);
    -    LOG.debug("super setup done...");
       }
     
       @Override
       public void beginWindow(long windowId)
       {
         currentWindowId = windowId;
    -
    -    isReplayed = false;
    -
         if (currentWindowId <= windowManager.getLargestRecoveryWindow()) {
           try {
             replay(currentWindowId);
    +        return;
           } catch (SQLException e) {
             LOG.error("Exception in replayed windows", e);
             throw new RuntimeException(e);
           }
         }
    -
    -    if (isReplayed && currentWindowId == windowManager.getLargestRecoveryWindow()) {
    -      try {
    -        if (!isPollable && rangeQueryPair.getValue() != null) {
    -
    -          ps = store.getConnection().prepareStatement(
    -              JdbcMetaDataUtility.buildGTRangeQuery(getTableName(), getKey(), previousUpperBound,
    -                  rangeQueryPair.getValue()),
    -              java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
    -        } else {
    -          String bound = null;
    -          if (previousUpperBound == null) {
    -            bound = getRangeQueryPair().getKey();
    -          } else {
    -            bound = previousUpperBound;
    -          }
    -          ps = store.getConnection().prepareStatement(
    -              JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), bound),
    -              java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
    -          isPollable = true;
    -        }
    -        isReplayed = false;
    -        LOG.debug("Prepared statement after re-initialization - {} ", ps.toString());
    -      } catch (SQLException e) {
    -        // TODO Auto-generated catch block
    -        throw new RuntimeException(e);
    -      }
    +    if (isPollerPartition) {
    +      updatePollQuery();
    +      isPolled = false;
         }
    +    lowerBound = lastEmittedRecord;
    +  }
     
    -    //Reset the pollable query with the updated upper and lower bounds
    -    if (isPollable) {
    +  private void updatePollQuery()
    +  {
    +    if ((lastPolledBound != lastEmittedRecord)) {
    +      if (lastEmittedRecord == null) {
    +        lastPolledBound = rangeQueryPair.getKey();
    +      } else {
    +        lastPolledBound = lastEmittedRecord;
    +      }
           try {
    -        String bound = null;
    -        if (previousUpperBound == null && highestPolled == null) {
    -          bound = getRangeQueryPair().getKey();
    -        } else {
    -          bound = highestPolled;
    -        }
    -        ps = store.getConnection().prepareStatement(
    -            JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), bound),
    -            java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
    -        LOG.debug("Polling query {} {}", ps.toString(), currentWindowId);
    -        isPolled = false;
    +        ps = store.getConnection().prepareStatement(buildRangeQuery(lastPolledBound, Integer.MAX_VALUE),
    +            TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
           } catch (SQLException e) {
             throw new RuntimeException(e);
           }
         }
     
    -    lower = null;
    -    upper = null;
    -
    -    //Check if a thread is already active and start only if its no
    -    //Do not start the thread from setup, will conflict with the replay
    -    if (dbPoller == null && !isReplayed) {
    -      //If this is not a replayed state, reset the ps to highest read offset + 1, 
    -      //keep the upper bound as the one that was initialized after static partitioning
    -      LOG.info("Statement when re-initialized {}", ps.toString());
    -      dbPoller = new Thread(new DBPoller());
    -      dbPoller.start();
    -    }
       }
     
       @Override
       public void emitTuples()
       {
    -    if (isReplayed) {
    +    if (currentWindowId <= windowManager.getLargestRecoveryWindow()) {
           return;
         }
    -
    -    List<T> tuples;
    -
    -    if ((tuples = emitQueue.poll()) != null) {
    -      for (Object tuple : tuples) {
    -        if (lower == null) {
    -          lower = tuple.toString();
    -        }
    -        upper = tuple.toString();
    -        outputPort.emit((T)tuple);
    +    int pollSize = (emitQueue.size() < batchSize) ? emitQueue.size() : batchSize;
    +    while (pollSize-- > 0) {
    +      T obj = emitQueue.poll();
    +      if (obj != null) {
    +        emitTuple(obj);
           }
    +      lastEmittedRecord++;
         }
       }
     
    +  protected void emitTuple(T obj)
    +  {
    +    outputPort.emit(obj);
    +  }
    +
       @Override
       public void endWindow()
       {
         try {
           if (currentWindowId > windowManager.getLargestRecoveryWindow()) {
    -        if (currentWindowRecoveryState == null) {
    -          currentWindowRecoveryState = new MutablePair<String, String>();
    -        }
    -        if (lower != null && upper != null) {
    -          previousUpperBound = upper;
    -          isPolled = true;
    -        }
    -        MutablePair<String, String> windowBoundaryPair = new MutablePair<>(lower, upper);
    -        currentWindowRecoveryState = windowBoundaryPair;
    +        currentWindowRecoveryState = new MutablePair<>(lowerBound, lastEmittedRecord);
             windowManager.save(currentWindowRecoveryState, operatorId, currentWindowId);
           }
         } catch (IOException e) {
           throw new RuntimeException("saving recovery", e);
         }
    -    currentWindowRecoveryState = new MutablePair<>();
    -  }
    -
    -  public int getPartitionCount()
    -  {
    -    return partitionCount;
    +    if (threadException != null) {
    +      store.disconnect();
    +      DTThrowable.rethrow(threadException.get());
    +    }
       }
     
    -  public void setPartitionCount(int partitionCount)
    +  @Override
    +  public void deactivate()
       {
    -    this.partitionCount = partitionCount;
    +    scanService.shutdownNow();
    +    store.disconnect();
       }
     
    -  @Override
    -  public void activate(Context cntxt)
    +  protected void pollRecords()
       {
    -    if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID
    -        && context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowManager.getLargestRecoveryWindow()) {
    -      // If it is a replay state, don't start any threads here
    +    if (isPolled) {
           return;
         }
    -  }
    -
    -  @Override
    -  public void deactivate()
    -  {
         try {
    -      if (dbPoller != null && dbPoller.isAlive()) {
    -        dbPoller.interrupt();
    -        dbPoller.join();
    +      ps.setFetchSize(getFetchSize());
    +      ResultSet result = ps.executeQuery();
    +      if (result.next()) {
    +        do {
    +          emitQueue.offer(getTuple(result));
    --- End diff --
    
    Here, we need to check whether ```offer()``` returns ```false```. If it does, keep retrying (after sleeping for a while) until it succeeds.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message