apex-dev mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-2172) Update JDBC poll input operator to fix issues
Date Mon, 08 Aug 2016 06:21:20 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411374#comment-15411374 ]

ASF GitHub Bot commented on APEXMALHAR-2172:
--------------------------------------------

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/358#discussion_r73826686
  
    --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java ---
    @@ -565,67 +379,110 @@ public void emitReplayedTuples(PreparedStatement ps)
        */
       @Override
       public Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> definePartitions(
    -      Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> partitions,
    -      com.datatorrent.api.Partitioner.PartitioningContext context)
    +      Collection<Partition<AbstractJdbcPollInputOperator<T>>> partitions, PartitioningContext context)
       {
         List<Partition<AbstractJdbcPollInputOperator<T>>> newPartitions = new ArrayList<Partition<AbstractJdbcPollInputOperator<T>>>(
             getPartitionCount());
    -    JdbcStore jdbcStore = new JdbcStore();
    -    jdbcStore.setDatabaseDriver(store.getDatabaseDriver());
    -    jdbcStore.setDatabaseUrl(store.getDatabaseUrl());
    -    jdbcStore.setConnectionProperties(store.getConnectionProperties());
     
    -    jdbcStore.connect();
    -
    -    HashMap<Integer, KeyValPair<String, String>> partitionToRangeMap = null;
    +    HashMap<Integer, KeyValPair<Integer, Integer>> partitionToRangeMap = null;
         try {
    -      partitionToRangeMap = JdbcMetaDataUtility.getPartitionedQueryMap(getPartitionCount(),
    -          jdbcStore.getDatabaseDriver(), jdbcStore.getDatabaseUrl(), getTableName(), getKey(),
    -          store.getConnectionProperties().getProperty(user), store.getConnectionProperties().getProperty(password),
    -          whereCondition, emitColumnList);
    +      store.connect();
    +      intializeDSLContext();
    +      partitionToRangeMap = getPartitionedQueryRangeMap(getPartitionCount());
         } catch (SQLException e) {
           LOG.error("Exception in initializing the partition range", e);
    +      throw new RuntimeException(e);
    +    } finally {
    +      store.disconnect();
         }
     
         KryoCloneUtils<AbstractJdbcPollInputOperator<T>> cloneUtils = KryoCloneUtils.createCloneUtils(this);
     
    +    // The n given partitions are for range queries and n + 1 partition is for polling query
         for (int i = 0; i <= getPartitionCount(); i++) {
    -      AbstractJdbcPollInputOperator<T> jdbcPoller = null;
    -
    -      jdbcPoller = cloneUtils.getClone();
    -
    -      jdbcPoller.setStore(store);
    -      jdbcPoller.setKey(getKey());
    -      jdbcPoller.setPartitionCount(getPartitionCount());
    -      jdbcPoller.setPollInterval(getPollInterval());
    -      jdbcPoller.setTableName(getTableName());
    -      jdbcPoller.setBatchSize(getBatchSize());
    -      jdbcPoller.setEmitColumnList(getEmitColumnList());
    -
    -      store.connect();
    -      //The n given partitions are for range queries and n + 1 partition is for polling query
    -      //The upper bound for the n+1 partition is set to null since its a pollable partition
    +      AbstractJdbcPollInputOperator<T> jdbcPoller = cloneUtils.getClone();
           if (i < getPartitionCount()) {
    -        jdbcPoller.setRangeQueryPair(partitionToRangeMap.get(i));
    -        isPollable = false;
    +        jdbcPoller.rangeQueryPair = partitionToRangeMap.get(i);
    +        jdbcPoller.lastEmittedRecord = partitionToRangeMap.get(i).getKey();
    --- End diff --
    
    Should `jdbcPoller.isPollerPartition = false` be set explicitly here, for the range-query partitions?
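
    One possible reason for the suggestion, sketched below under assumptions: every partition is produced by cloning the current operator, so a clone would inherit whatever value the flag has on the prototype. The `Poller` class, its `copy()` method and `PollerFlagSketch` are hypothetical stand-ins for the operator and `KryoCloneUtils`; only the field name `isPollerPartition` comes from the comment above, and the truncated diff does not show how the PR actually handles the last partition.

```java
import java.util.ArrayList;
import java.util.List;

class PollerFlagSketch
{
  // Hypothetical stand-in for AbstractJdbcPollInputOperator; not the real class.
  static class Poller
  {
    boolean isPollerPartition;

    // Stand-in for a Kryo clone: every field value is copied from the prototype.
    Poller copy()
    {
      Poller p = new Poller();
      p.isPollerPartition = this.isPollerPartition;
      return p;
    }
  }

  // Creates partitionCount range partitions plus one poller partition.
  static List<Poller> definePartitions(Poller prototype, int partitionCount)
  {
    List<Poller> partitions = new ArrayList<>(partitionCount + 1);
    for (int i = 0; i <= partitionCount; i++) {
      Poller p = prototype.copy();
      // Set the flag in both branches so no clone keeps an inherited value.
      p.isPollerPartition = (i == partitionCount);
      partitions.add(p);
    }
    return partitions;
  }
}
```

    Setting the flag on every clone keeps the result independent of the prototype's state, for example when partitions are redefined from an operator that was itself the poller.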


> Update JDBC poll input operator to fix issues
> ---------------------------------------------
>
>                 Key: APEXMALHAR-2172
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
>             Project: Apache Apex Malhar
>          Issue Type: Improvement
>            Reporter: Priyanka Gugale
>            Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use the jOOQ query DSL library to construct SQL queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads
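
As an illustration of item 4, here is a minimal sketch of splitting a table into n row-offset ranges plus one open-ended poller range, matching the "n range partitions and an n + 1 polling partition" comment in the diff. The class `RowCountRanges`, the method `split`, the `UNBOUNDED` marker and the choice to leave remainder rows to the poller are all assumptions made for this example; the operator's real `getPartitionedQueryRangeMap` may compute the bounds differently.

```java
import java.util.ArrayList;
import java.util.List;

class RowCountRanges
{
  // Marker for the open-ended upper bound of the poller partition (assumption).
  static final int UNBOUNDED = -1;

  // Returns partitionCount half-open ranges [lower, upper) over row offsets,
  // followed by one poller range that starts where the last range ends.
  static List<int[]> split(int totalRows, int partitionCount)
  {
    if (totalRows < 0 || partitionCount <= 0) {
      throw new IllegalArgumentException("totalRows must be >= 0 and partitionCount > 0");
    }
    int chunk = totalRows / partitionCount; // rows per range partition
    List<int[]> ranges = new ArrayList<>(partitionCount + 1);
    for (int i = 0; i < partitionCount; i++) {
      ranges.add(new int[] {i * chunk, (i + 1) * chunk});
    }
    // Remainder rows and any rows added later belong to the poller partition.
    ranges.add(new int[] {partitionCount * chunk, UNBOUNDED});
    return ranges;
  }

  public static void main(String[] args)
  {
    for (int[] r : split(10, 3)) {
      System.out.println(r[0] + " .. " + (r[1] == UNBOUNDED ? "open" : String.valueOf(r[1])));
    }
  }
}
```

Each bounded range would translate into an offset and a row limit in the generated query, so the split does not depend on how key column values are distributed.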



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
