apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-2066) Add jdbc poller input operator
Date Thu, 09 Jun 2016 00:24:21 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15321694#comment-15321694 ]

ASF GitHub Bot commented on APEXMALHAR-2066:
--------------------------------------------

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/282#discussion_r66365162
  
    --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java ---
    @@ -0,0 +1,344 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.db.jdbc;
    +
    +import java.sql.Connection;
    +import java.sql.DriverManager;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.util.HashMap;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.datatorrent.lib.util.KeyValPair;
    +
    +/**
    + * A utility class used to retrieve the metadata for a given unique key of a SQL
    + * table. This class would emit range queries based on a primary index given
    + * 
    + * @Input - dbName,tableName, primaryKey
    + * @Output - map<operatorId,prepared statement>
    + *
    + */
    +public class JdbcMetaDataUtility
    +{
    +  private static String DB_DRIVER = "com.mysql.jdbc.Driver";
    +  private static String DB_CONNECTION = "";
    +  private static String DB_USER = "";
    +  private static String DB_PASSWORD = "";
    +  private static String TABLE_NAME = "";
    +  private static String KEY_COLUMN = "";
    +  private static String WHERE_CLAUSE = null;
    +  private static String COLUMN_LIST = null;
    +
    +  private static Logger LOG = LoggerFactory.getLogger(JdbcMetaDataUtility.class);
    +
    +  public JdbcMetaDataUtility()
    +  {
    +
    +  }
    +
    +  public JdbcMetaDataUtility(String dbConnection, String tableName, String key, String
userName, String password)
    +  {
    +    DB_CONNECTION = dbConnection;
    +    DB_USER = userName;
    +    DB_PASSWORD = password;
    +    TABLE_NAME = tableName;
    +    KEY_COLUMN = key;
    +  }
    +
    +  private static Connection getDBConnection()
    +  {
    +
    +    Connection dbConnection = null;
    +
    +    try {
    +      Class.forName(DB_DRIVER);
    +    } catch (ClassNotFoundException e) {
    +      LOG.error("Driver not found", e);
    +    }
    +
    +    try {
    +      dbConnection = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
    +      return dbConnection;
    +    } catch (SQLException e) {
    +      LOG.error("Exception in getting connection handle", e);
    +    }
    +
    +    return dbConnection;
    +
    +  }
    +
    +  private static String generateQueryString()
    +  {
    +    StringBuilder sb = new StringBuilder();
    +    sb.append("SELECT COUNT(*) as RowCount from " + TABLE_NAME);
    +
    +    if (WHERE_CLAUSE != null) {
    +      sb.append(" WHERE " + WHERE_CLAUSE);
    +    }
    +
    +    return sb.toString();
    +  }
    +
    +  /**
    +   * Finds the total number of rows in the table
    +   */
    +  private static long getRecordRange(String query) throws SQLException
    +  {
    +    long rowCount = 0;
    +    Connection dbConnection = null;
    +    PreparedStatement preparedStatement = null;
    +
    +    try {
    +      dbConnection = getDBConnection();
    +      preparedStatement = dbConnection.prepareStatement(query);
    +
    +      ResultSet rs = preparedStatement.executeQuery();
    +
    +      while (rs.next()) {
    +        rowCount = Long.parseLong(rs.getString("RowCount"));
    +        LOG.info("# Rows - " + rowCount);
    +      }
    +
    +    } catch (SQLException e) {
    +      LOG.error("Exception in retreiving result set", e);
    +    } finally {
    +      if (preparedStatement != null) {
    +        preparedStatement.close();
    +      }
    +      if (dbConnection != null) {
    +        dbConnection.close();
    +      }
    +    }
    +    return rowCount;
    +  }
    +
    +  /**
    +   * Returns a pair of <upper,lower> bounds for each partition of the
    +   * {@link JdbcPollInputOperator}}
    +   */
    +  private static KeyValPair<String, String> getQueryBounds(long lower, long upper)
throws SQLException
    +  {
    +    Connection dbConnectionLower = null;
    +    Connection dbConnectionUpper = null;
    +    PreparedStatement psLowerBound = null;
    +    PreparedStatement psUpperBound = null;
    +
    +    StringBuilder lowerBound = new StringBuilder();
    +    StringBuilder upperBound = new StringBuilder();
    +
    +    KeyValPair<String, String> boundedQUeryPair = null;
    +
    +    try {
    +      dbConnectionLower = getDBConnection();
    +      dbConnectionUpper = getDBConnection();
    +
    +      /*
    +       * Run this loop only for n-1 partitions.
    +       * By default the last partition will have fewer records, since we are rounding
off
    +       * */
    +
    +      lowerBound.append("SELECT " + KEY_COLUMN + " FROM " + TABLE_NAME);
    +      upperBound.append("SELECT " + KEY_COLUMN + " FROM " + TABLE_NAME);
    +
    +      if (WHERE_CLAUSE != null) {
    +        lowerBound.append(" WHERE " + WHERE_CLAUSE);
    +        upperBound.append(" WHERE " + WHERE_CLAUSE);
    +      }
    +
    +      lowerBound.append(" LIMIT " + (0 + lower) + ",1");
    --- End diff --
    
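    The LIMIT clause might not work with all databases: the `LIMIT offset,count` form used here is not standard SQL and is not accepted by every database. This limitation needs to be documented.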


> Add jdbc poller input operator
> ------------------------------
>
>                 Key: APEXMALHAR-2066
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2066
>             Project: Apache Apex Malhar
>          Issue Type: Task
>            Reporter: Ashwin Chandra Putta
>            Assignee: devendra tagare
>
> Create a JDBC poller input operator that has the following features.
> 1. Poll from an external JDBC store asynchronously in the input operator.
> 2. Polling frequency and batch size should be configurable.
> 3. Should be idempotent.
> 4. Should be partitionable.
> 5. Should be capable of both batch and polling reads.
> Assumptions for idempotency and partitioning:
> 1. The user needs to provide tableName, dbConnection, setEmitColumnList and the look-up key.
> 2. Optionally, batchSize, pollInterval, a look-up key and a WHERE clause can be given.
> 3. This operator uses static partitioning to arrive at range queries for exactly-once reads.
> 4. The assumption is that there is an ordered column with which range queries can be formed.
> 5. If an emitColumnList is provided, please ensure that the keyColumn is the first column in the list.
> 6. Range queries are formed using the JdbcMetaDataUtility. Output: a comma-separated list of the emit columns, e.g. columnA,columnB,columnC.
> Per window, the first and the last key processed are saved using the FSWindowDataManager as (<lowerBound,upperBound>,operatorId,windowId). This (lowerBound,upperBound) pair is then used for recovery. The queries are constructed using the JdbcMetaDataUtility.
> JdbcMetaDataUtility
> A utility class used to retrieve the metadata for a given unique key of a SQL table. This class emits range queries based on a given primary index.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message