apex-dev mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues
Date Mon, 08 Aug 2016 06:46:20 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411406#comment-15411406 ]

ASF GitHub Bot commented on APEXMALHAR-2172:
--------------------------------------------

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/358#discussion_r73828487
  
    --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java ---
    @@ -80,201 +91,45 @@
      * @tags database, sql, jdbc, partitionable,exactlyOnce
      */
     @Evolving
    -public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInputOperator<T, JdbcStore>
    -    implements ActivationListener<Context>, IdleTimeHandler, Partitioner<AbstractJdbcPollInputOperator<T>>
    +public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInputOperator<T, JdbcStore> implements
    +    ActivationListener<OperatorContext>, Partitioner<AbstractJdbcPollInputOperator<T>>
     {
    -  /**
    -   * poll interval in milliseconds
    -   */
    -  private static int pollInterval = 10000;
    +  private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024;
    +  private static int DEFAULT_POLL_INTERVAL = 10 * 1000;
    +  private static int DEFAULT_FETCH_SIZE = 20000;
    +  private static int DEFAULT_BATCH_SIZE = 2000;
    +  private int pollInterval = DEFAULT_POLL_INTERVAL; //in miliseconds
    +  private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
    +  private int fetchSize = DEFAULT_FETCH_SIZE;
     
       @Min(1)
       private int partitionCount = 1;
    -  protected transient int operatorId;
    -  protected transient boolean isReplayed;
    -  protected transient boolean isPollable;
    -  protected int batchSize;
    -  protected static int fetchSize = 20000;
    -  /**
    -   * Map of windowId to <lower bound,upper bound> of the range key
    -   */
    -  protected transient MutablePair<String, String> currentWindowRecoveryState;
    -
    -  /**
    -   * size of the emit queue used to hold polled records before emit
    -   */
    -  private static int queueCapacity = 4 * 1024 * 1024;
    +  private int batchSize = DEFAULT_BATCH_SIZE;
    +
    +  @NotNull
    +  private String tableName;
    +  @NotNull
    +  private String columnsExpression;
    +  @NotNull
    +  private String key;
    +  private String whereCondition = null;
    +  private long currentWindowId;
    +  private WindowDataManager windowManager;
    +
    +  protected KeyValPair<Integer, Integer> rangeQueryPair;
    +  protected Integer lowerBound;
    +  private transient int operatorId;
    +  private transient DSLContext create;
       private transient volatile boolean execute;
    -  private transient AtomicReference<Throwable> cause;
    -  protected transient int spinMillis;
    -  private transient OperatorContext context;
    -  protected String tableName;
    -  protected String key;
    -  protected long currentWindowId;
    -  protected KeyValPair<String, String> rangeQueryPair;
    -  protected String lower;
    -  protected String upper;
    -  protected boolean recovered;
    -  protected boolean isPolled;
    -  protected String whereCondition = null;
    -  protected String previousUpperBound;
    -  protected String highestPolled;
    -  private static final String user = "";
    -  private static final String password = "";
    -  /**
    -   * thread to poll database
    -   */
    -  private transient Thread dbPoller;
    -  protected transient ArrayBlockingQueue<List<T>> emitQueue;
    +  private transient ScheduledExecutorService scanService;
    +  protected transient boolean isPolled;
    +  protected transient Integer lastPolledBound;
    --- End diff --
    
    Maybe name this ```upperBound```? It seems more intuitive when looking at the rest of the code.


> Update JDBC poll input operator to fix issues
> ---------------------------------------------
>
>                 Key: APEXMALHAR-2172
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
>             Project: Apache Apex Malhar
>          Issue Type: Improvement
>            Reporter: Priyanka Gugale
>            Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use the jOOQ query DSL library to construct SQL queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads
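
To illustrate item 4, here is a minimal sketch of partitioning reads by row count rather than by key column values. It is not the code from the pull request: `RowCountPartitioner`, `Range`, `split`, and `toQuery` are hypothetical names, the query is built with plain string concatenation instead of jOOQ to keep the example dependency-free, and the `LIMIT ... OFFSET ...` syntax is an assumption that varies by database. The parameter names `tableName`, `columnsExpression`, and `key` mirror the fields shown in the diff above.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper, not part of Malhar: splits a table of rowCount rows
 * into contiguous row ranges, one per partition.
 */
public class RowCountPartitioner
{
  /** Row range for one partition: skip `offset` rows, then read `limit` rows. */
  public static final class Range
  {
    public final long offset;
    public final long limit;

    Range(long offset, long limit)
    {
      this.offset = offset;
      this.limit = limit;
    }
  }

  /** Divides rowCount rows as evenly as possible across partitionCount ranges. */
  public static List<Range> split(long rowCount, int partitionCount)
  {
    if (rowCount < 0 || partitionCount < 1) {
      throw new IllegalArgumentException("rowCount must be >= 0 and partitionCount >= 1");
    }
    List<Range> ranges = new ArrayList<>(partitionCount);
    long base = rowCount / partitionCount;
    long remainder = rowCount % partitionCount;
    long offset = 0;
    for (int i = 0; i < partitionCount; i++) {
      // The first `remainder` partitions take one extra row each.
      long limit = base + (i < remainder ? 1 : 0);
      ranges.add(new Range(offset, limit));
      offset += limit;
    }
    return ranges;
  }

  /** Builds a range query. LIMIT/OFFSET syntax is an assumption and differs between databases. */
  public static String toQuery(String columnsExpression, String tableName, String key, Range range)
  {
    // Ordering by the key column keeps row positions stable across partitions.
    return "SELECT " + columnsExpression + " FROM " + tableName
        + " ORDER BY " + key + " LIMIT " + range.limit + " OFFSET " + range.offset;
  }
}
```

Because the ranges depend only on the row count, partitions stay balanced even when the key values are sparse or unevenly distributed, which is presumably the motivation for the change.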



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
