apex-dev mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues
Date Fri, 05 Aug 2016 06:45:20 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15408992#comment-15408992 ]

ASF GitHub Bot commented on APEXMALHAR-2172:
--------------------------------------------

Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/358#discussion_r73648435
  
    --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java ---
    @@ -286,149 +146,119 @@ public AbstractJdbcPollInputOperator()
       public void setup(OperatorContext context)
       {
         super.setup(context);
    +    intializeDSLContext();
    +    if (scanService == null) {
    +      scanService = Executors.newScheduledThreadPool(partitionCount);
    +    }
         spinMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
         execute = true;
         cause = new AtomicReference<Throwable>();
    -    emitQueue = new ArrayBlockingQueue<List<T>>(queueCapacity);
    -    this.context = context;
    +    emitQueue = new LinkedBlockingDeque<>(queueCapacity);
         operatorId = context.getId();
    +    windowManager.setup(context);
    +  }
     
    -    try {
    +  private void intializeDSLContext()
    +  {
    +    create = DSL.using(store.getConnection(), JDBCUtils.dialect(store.getDatabaseUrl()));
    +  }
     
    -      //If its a range query pass upper and lower bounds
    -      //If its a polling query pass only the lower bound
    -      if (getRangeQueryPair().getValue() != null) {
    -        ps = store.getConnection()
    -            .prepareStatement(
    -                JdbcMetaDataUtility.buildRangeQuery(getTableName(), getKey(), rangeQueryPair.getKey(),
    -                    rangeQueryPair.getValue()),
    -                java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
    +  @Override
    +  public void activate(OperatorContext context)
    +  {
    +    initializePreparedStatement();
    +    long largestRecoveryWindow = windowManager.getLargestRecoveryWindow();
    +    if (largestRecoveryWindow == Stateless.WINDOW_ID
    +        || context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) > largestRecoveryWindow) {
    +      scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS);
    +    }
    +  }
    +
    +  protected void initializePreparedStatement()
    +  {
    +    try {
    +      // If its a range query pass upper and lower bounds, If its a polling query pass only the lower bound
    +      if (isPollerPartition) {
    +        ps = store.getConnection().prepareStatement(buildRangeQuery(rangeQueryPair.getKey(), Integer.MAX_VALUE),
    --- End diff --
    
    rangeQueryPair is calculated during "definePartition" and the user is not supposed to set it. Let me try to make that part of the code more readable.
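
    To illustrate how such a pair might be derived from row counts at partitioning time, here is a minimal sketch. It is not the operator's actual code: the class RowCountRanges, the method computeRanges, and the {offset, limit} layout are hypothetical names chosen for this example. It assumes that all partitions except the last read a fixed slice of the rows present at partitioning time, and that the last partition acts as the poller with an open upper bound, mirroring the Integer.MAX_VALUE argument in the diff above.

```java
import java.util.ArrayList;
import java.util.List;

public class RowCountRanges
{
  /**
   * Hypothetical helper: returns one {offset, limit} pair per partition.
   * The first (partitionCount - 1) pairs split the existing rows as evenly
   * as possible; the last pair is the poller, which starts after the
   * existing rows and has an open-ended limit.
   */
  public static List<long[]> computeRanges(long rowCount, int partitionCount)
  {
    if (partitionCount < 1 || rowCount < 0) {
      throw new IllegalArgumentException("partitionCount must be >= 1 and rowCount >= 0");
    }
    List<long[]> ranges = new ArrayList<>();
    int staticPartitions = partitionCount - 1;
    long offset = 0;
    if (staticPartitions > 0) {
      long base = rowCount / staticPartitions;
      long extra = rowCount % staticPartitions;
      for (int i = 0; i < staticPartitions; i++) {
        // Spread the remainder over the first few partitions.
        long limit = base + (i < extra ? 1 : 0);
        ranges.add(new long[] {offset, limit});
        offset += limit;
      }
    }
    // Poller partition: lower bound only, upper bound left open.
    ranges.add(new long[] {offset, Integer.MAX_VALUE});
    return ranges;
  }

  public static void main(String[] args)
  {
    for (long[] range : computeRanges(10, 4)) {
      System.out.println("offset=" + range[0] + " limit=" + range[1]);
    }
  }
}
```

    Because the ranges come from the row count seen while partitioning, a value supplied by the user would be overwritten or would conflict with the computed one, which is why the property is not meant to be set externally.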


> Update JDBC poll input operator to fix issues
> ---------------------------------------------
>
>                 Key: APEXMALHAR-2172
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
>             Project: Apache Apex Malhar
>          Issue Type: Improvement
>            Reporter: Priyanka Gugale
>            Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use jooq query dsl library to construct sql queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
