apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-2066) Add jdbc poller input operator
Date Wed, 18 May 2016 23:05:12 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15290044#comment-15290044
] 

ASF GitHub Bot commented on APEXMALHAR-2066:
--------------------------------------------

Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/282#discussion_r63798454
  
    --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
---
    @@ -0,0 +1,592 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.db.jdbc;
    +
    +import java.io.IOException;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.ArrayBlockingQueue;
    +import java.util.concurrent.atomic.AtomicReference;
    +import javax.validation.constraints.Min;
    +import org.slf4j.LoggerFactory;
    +import org.apache.apex.malhar.lib.wal.WindowDataManager;
    +import org.apache.commons.lang3.tuple.MutablePair;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Context.OperatorContext;
    +import com.datatorrent.api.DefaultPartition;
    +import com.datatorrent.api.Operator.ActivationListener;
    +import com.datatorrent.api.Operator.IdleTimeHandler;
    +import com.datatorrent.api.Partitioner;
    +import com.datatorrent.api.annotation.Stateless;
    +import com.datatorrent.lib.db.AbstractStoreInputOperator;
    +import com.datatorrent.lib.util.KeyValPair;
    +import com.datatorrent.lib.util.KryoCloneUtils;
    +import com.datatorrent.netlet.util.DTThrowable;
    +
    +/**
    + * Abstract operator for for consuming data using JDBC interface<br>
    + * User needs 
    + * User needs to provide tableName,dbConnection,setEmitColumnList,look-up key
    + * <br>
    + * Optionally batchSize,pollInterval,Look-up key and a where clause can be given
    + * <br>
    + * This operator uses static partitioning to arrive at range queries for exactly
    + * once reads<br>
    + * Assumption is that there is an ordered column using which range queries can
    + * be formed<br>
    + * If an emitColumnList is provided, please ensure that the keyColumn is the
    + * first column in the list<br>
    + * Range queries are formed using the {@link JdbcMetaDataUtility}} Output -
    + * comma separated list of the emit columns eg columnA,columnB,columnC
    + * 
    + * @displayName Jdbc Polling Input Operator
    + * @category Input
    + * @tags database, sql, jdbc, partitionable,exactlyOnce
    + */
    +public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInputOperator<T,
JdbcStore>
    +    implements ActivationListener<Context>, IdleTimeHandler, Partitioner<AbstractJdbcPollInputOperator<T>>
    +{
    +  /*
    +   * poll interval in milliseconds
    +   */
    +  private int pollInterval;
    +
    +  @Min(1)
    +  private int partitionCount = 1;
    +  protected transient int operatorId;
    +  protected transient boolean isReplayed;
    +  protected transient boolean isPollable;
    +  protected int batchSize;
    +  protected int fetchSize;
    +  /**
    +   * Map of windowId to <lower bound,upper bound> of the range key
    +   */
    +  protected transient MutablePair<String, String> currentWindowRecoveryState;
    +
    +  /**
    +   * size of the emit queue used to hold polled records before emit
    +   */
    +  private int queueCapacity = 4 * 1024 * 1024;
    +  private transient volatile boolean execute;
    +  private transient AtomicReference<Throwable> cause;
    +  protected transient int spinMillis;
    +  private transient OperatorContext context = null;
    +  protected String tableName;
    +  protected String key;
    +  protected transient long currentWindowId;
    +  protected KeyValPair<String, String> rangeQueryPair;
    +  protected String lower;
    +  protected String upper;
    +  protected boolean recovered;
    +  protected boolean isPolled;
    +  protected String whereCondition = null;
    +  protected String previousUpperBound;
    +  protected String highestPolled;
    +  private static final String user = "user";
    +  private static final String password = "password";
    +  /**
    +   * thread to poll database
    +   */
    +  private transient Thread dbPoller;
    +  protected transient ArrayBlockingQueue<List<T>> emitQueue;
    +  protected transient PreparedStatement ps;
    +  protected WindowDataManager windowManager;
    +  private String emitColumnList;
    +
    +  public String getWhereCondition()
    +  {
    +    return whereCondition;
    +  }
    +
    +  public void setWhereCondition(String whereCondition)
    +  {
    +    this.whereCondition = whereCondition;
    +  }
    +
    +  public String getEmitColumnList()
    +  {
    +    return emitColumnList;
    +  }
    +
    +  public void setEmitColumnList(String emitColumnList)
    +  {
    +    this.emitColumnList = emitColumnList;
    +  }
    +
    +  public int getFetchSize()
    +  {
    +    return fetchSize;
    +  }
    +
    +  public void setFetchSize(int fetchSize)
    +  {
    +    this.fetchSize = fetchSize;
    +  }
    +
    +  protected abstract void pollRecords(PreparedStatement ps);
    +
    +  public int getPollInterval()
    +  {
    +    return pollInterval;
    +  }
    +
    +  public void setPollInterval(int pollInterval)
    +  {
    +    this.pollInterval = pollInterval;
    +  }
    +
    +  public int getQueueCapacity()
    +  {
    +    return queueCapacity;
    +  }
    +
    +  public void setQueueCapacity(int queueCapacity)
    +  {
    +    this.queueCapacity = queueCapacity;
    +  }
    +
    +  public String getKey()
    +  {
    +    return key;
    +  }
    +
    +  public void setKey(String key)
    +  {
    +    this.key = key;
    +  }
    +
    +  public String getTableName()
    +  {
    +    return tableName;
    +  }
    +
    +  public void setTableName(String tableName)
    +  {
    +    this.tableName = tableName;
    +  }
    +
    +  public KeyValPair<String, String> getRangeQueryPair()
    +  {
    +    return rangeQueryPair;
    +  }
    +
    +  public void setRangeQueryPair(KeyValPair<String, String> rangeQueryPair)
    +  {
    +    this.rangeQueryPair = rangeQueryPair;
    +  }
    +
    +  public int getBatchSize()
    +  {
    +    return batchSize;
    +  }
    +
    +  public void setBatchSize(int batchSize)
    +  {
    +    this.batchSize = batchSize;
    +  }
    +
    +  public AbstractJdbcPollInputOperator()
    +  {
    +    this.pollInterval = 10 * 1000;
    +    this.fetchSize = 10000;
    +    currentWindowRecoveryState = new MutablePair<>();
    +    windowManager = new WindowDataManager.FSWindowDataManager();
    +  }
    +
    +  @Override
    +  public void setup(OperatorContext context)
    +  {
    +    super.setup(context);
    +    spinMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
    +    execute = true;
    +    cause = new AtomicReference<Throwable>();
    +    emitQueue = new ArrayBlockingQueue<List<T>>(queueCapacity);
    +    this.context = context;
    +    operatorId = context.getId();
    +
    +    try {
    +      if (ps == null) {
    +        //If its a range query pass upper and lower bounds
    +        //If its a polling query pass only the lower bound
    +        if (getRangeQueryPair().getValue() != null) {
    +          ps = store.getConnection().prepareStatement(
    +              JdbcMetaDataUtility.buildRangeQuery(getTableName(), getKey(), rangeQueryPair.getKey(),
    +                  rangeQueryPair.getValue()),
    +              java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
    +        } else {
    +          ps = store.getConnection().prepareStatement(
    +              JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), rangeQueryPair.getKey()),
    +              java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
    +          isPollable = true;
    +        }
    +      }
    +    } catch (SQLException e) {
    +      LOG.error("Exception in initializing the range query for a given partition", e);
    +    }
    +
    +    windowManager.setup(context);
    +    LOG.debug("super setup done...");
    +  }
    +
    +  @Override
    +  public void beginWindow(long windowId)
    +  {
    +    currentWindowId = windowId;
    +
    +    isReplayed = false;
    +
    +    if (currentWindowId <= windowManager.getLargestRecoveryWindow()) {
    +      try {
    +        replay(currentWindowId);
    +      } catch (SQLException e) {
    +        LOG.error("Exception in replayed windows");
    +        throw new RuntimeException("saving recovery", e);
    --- End diff --
    
    Is it "saving recovery" or replaying old data?


> Add jdbc poller input operator
> ------------------------------
>
>                 Key: APEXMALHAR-2066
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2066
>             Project: Apache Apex Malhar
>          Issue Type: Task
>            Reporter: Ashwin Chandra Putta
>            Assignee: devendra tagare
>
> Create a JDBC poller input operator that has the following features.
> 1. poll from external jdbc store asynchronously in the input operator.
> 2. polling frequency and batch size should be configurable.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message