apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bhup...@apache.org
Subject [1/2] apex-malhar git commit: APEXMALHAR-2066 JdbcPolling, idempotent, partitionable
Date Fri, 15 Jul 2016 06:04:58 GMT
Repository: apex-malhar
Updated Branches:
  refs/heads/master 132012823 -> aaa4464f0


APEXMALHAR-2066 JdbcPolling,idempotent,partitionable


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/4c7d268d
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/4c7d268d
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/4c7d268d

Branch: refs/heads/master
Commit: 4c7d268dbcdb6adbe84daadac6787c4d2f6b203d
Parents: 9b62506
Author: devtagare <devtagare@gmail.com>
Authored: Wed May 18 15:25:56 2016 -0700
Committer: devtagare <devtagare@gmail.com>
Committed: Thu Jul 14 11:42:36 2016 -0700

----------------------------------------------------------------------
 .../db/jdbc/AbstractJdbcPollInputOperator.java  | 661 +++++++++++++++++++
 .../lib/db/jdbc/JdbcMetaDataUtility.java        | 353 ++++++++++
 .../lib/db/jdbc/JdbcPollInputOperator.java      | 231 +++++++
 .../datatorrent/lib/db/jdbc/JdbcPollerTest.java | 246 +++++++
 4 files changed, 1491 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4c7d268d/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
new file mode 100644
index 0000000..ab12ed3
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
@@ -0,0 +1,661 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.validation.constraints.Min;
+
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.Operator.IdleTimeHandler;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.lib.db.AbstractStoreInputOperator;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Abstract operator for consuming data using a JDBC interface<br>
+ * User needs to provide
+ * tableName,dbConnection,setEmitColumnList,look-up key <br>
+ * Optionally batchSize,pollInterval,Look-up key and a where clause can be given
+ * <br>
+ * This operator uses static partitioning to arrive at range queries for exactly
+ * once reads<br>
+ * This operator will create a configured number of non-polling static
+ * partitions for fetching the existing data in the table. And an additional
+ * single partition for polling additive data. Assumption is that there is an
+ * ordered column using which range queries can be formed<br>
+ * If an emitColumnList is provided, please ensure that the keyColumn is the
+ * first column in the list<br>
+ * Range queries are formed using the {@link JdbcMetaDataUtility}<br> Output -
+ * comma separated list of the emit columns eg columnA,columnB,columnC<br>
+ * Only newly added data which has increasing ids will be fetched by the polling
+ * jdbc partition
+ * 
+ * In the next iterations this operator would support an in-clause for
+ * idempotency instead of having only range query support to support non ordered
+ * key columns
+ * 
+ * 
+ * @displayName Jdbc Polling Input Operator
+ * @category Input
+ * @tags database, sql, jdbc, partitionable,exactlyOnce
+ */
+@Evolving
+public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInputOperator<T, JdbcStore>
+    implements ActivationListener<Context>, IdleTimeHandler, Partitioner<AbstractJdbcPollInputOperator<T>>
+{
+  /**
+   * Interval in milliseconds between two polls of the database. This is an
+   * instance field (not static) because it is configured through the instance
+   * setter; a static would be shared by every operator in the JVM.
+   */
+  private int pollInterval = 10000;
+
+  @Min(1)
+  private int partitionCount = 1;
+  protected transient int operatorId;
+  protected transient boolean isReplayed;
+  protected transient boolean isPollable;
+  protected int batchSize;
+  /**
+   * JDBC fetch size applied to statements before they are executed. Instance
+   * field for the same reason as {@code pollInterval}.
+   */
+  protected int fetchSize = 20000;
+  /**
+   * <lower bound,upper bound> of the range key emitted in the current window
+   */
+  protected transient MutablePair<String, String> currentWindowRecoveryState;
+
+  /**
+   * size of the emit queue used to hold polled records before emit. Instance
+   * field for the same reason as {@code pollInterval}.
+   */
+  private int queueCapacity = 4 * 1024 * 1024;
+  private transient volatile boolean execute;
+  private transient AtomicReference<Throwable> cause;
+  protected transient int spinMillis;
+  private transient OperatorContext context;
+  protected String tableName;
+  protected String key;
+  protected long currentWindowId;
+  protected KeyValPair<String, String> rangeQueryPair;
+  protected String lower;
+  protected String upper;
+  protected boolean recovered;
+  protected boolean isPolled;
+  protected String whereCondition = null;
+  protected String previousUpperBound;
+  protected String highestPolled;
+  /**
+   * Names of the connection properties holding the credentials. These are the
+   * standard JDBC property keys; an empty key would never match a configured
+   * property and the lookup would always yield null.
+   */
+  private static final String user = "user";
+  private static final String password = "password";
+  /**
+   * thread to poll database
+   */
+  private transient Thread dbPoller;
+  protected transient ArrayBlockingQueue<List<T>> emitQueue;
+  protected transient PreparedStatement ps;
+  protected WindowDataManager windowManager;
+  private String emitColumnList;
+
+  /**
+   * Returns the optional where clause that is handed to
+   * {@link JdbcMetaDataUtility} when the partition ranges are computed; null
+   * when no filter has been configured
+   */
+  public String getWhereCondition()
+  {
+    return whereCondition;
+  }
+
+  /**
+   * Sets the optional where clause (without the WHERE keyword) used to
+   * restrict the rows that are read
+   */
+  public void setWhereCondition(String whereCondition)
+  {
+    this.whereCondition = whereCondition;
+  }
+
+  /**
+   * Returns the comma separated list of columns to select from the table
+   */
+  public String getEmitColumnList()
+  {
+    return emitColumnList;
+  }
+
+  /**
+   * Comma separated list of columns to select from the given table
+   */
+  public void setEmitColumnList(String emitColumnList)
+  {
+    this.emitColumnList = emitColumnList;
+  }
+
+  /**
+   * Returns the JDBC fetch size that is set on a statement before it is
+   * executed
+   */
+  public int getFetchSize()
+  {
+    return fetchSize;
+  }
+
+  /**
+   * Sets the JDBC fetch size that is set on a statement before it is executed
+   */
+  public void setFetchSize(int fetchSize)
+  {
+    this.fetchSize = fetchSize;
+  }
+
+  /**
+   * Reads records using the given statement. Invoked from the poller thread
+   * ({@code DBPoller}), not from the operator thread.
+   * NOTE(review): implementations presumably place the fetched records on
+   * emitQueue - confirm against the concrete subclass.
+   */
+  protected abstract void pollRecords(PreparedStatement ps);
+
+  /**
+   * Returns the interval, in milliseconds, between two polls of the database
+   */
+  public int getPollInterval()
+  {
+    return pollInterval;
+  }
+
+  /**
+   * Sets the interval, in milliseconds, between two polls of the database
+   */
+  public void setPollInterval(int pollInterval)
+  {
+    this.pollInterval = pollInterval;
+  }
+
+  /**
+   * Returns the capacity of the emit queue
+   */
+  public int getQueueCapacity()
+  {
+    return queueCapacity;
+  }
+
+  /**
+   * Sets the capacity of the emit queue; the queue is created with this
+   * capacity in setup
+   */
+  public void setQueueCapacity(int queueCapacity)
+  {
+    this.queueCapacity = queueCapacity;
+  }
+
+  /**
+   * Returns the ordered key column used to generate the range queries
+   */
+  public String getKey()
+  {
+    return key;
+  }
+
+  /**
+   * Sets the ordered key column used to generate the range queries
+   */
+  public void setKey(String key)
+  {
+    this.key = key;
+  }
+
+  /**
+   * Returns the tableName which would be queried
+   */
+  public String getTableName()
+  {
+    return tableName;
+  }
+
+  /**
+   * Sets the tableName to query
+   */
+  public void setTableName(String tableName)
+  {
+    this.tableName = tableName;
+  }
+
+  /**
+   * Returns rangeQueryPair - <lowerBound,upperBound>. A null upper bound
+   * (value) marks the polling partition.
+   */
+  public KeyValPair<String, String> getRangeQueryPair()
+  {
+    return rangeQueryPair;
+  }
+
+  /**
+   * Sets the rangeQueryPair <lowerBound,upperBound>; pass a null upper bound
+   * for the polling partition
+   */
+  public void setRangeQueryPair(KeyValPair<String, String> rangeQueryPair)
+  {
+    this.rangeQueryPair = rangeQueryPair;
+  }
+
+  /**
+   * Returns batchSize.
+   * NOTE(review): presumably the number of records grouped into one emitQueue
+   * entry - this class never reads it, confirm against the concrete subclass.
+   */
+  public int getBatchSize()
+  {
+    return batchSize;
+  }
+
+  /**
+   * Sets batchSize (see {@link #getBatchSize()})
+   */
+  public void setBatchSize(int batchSize)
+  {
+    this.batchSize = batchSize;
+  }
+
+  /**
+   * Creates the operator with a file system backed window data manager and an
+   * empty recovery state for the current window.
+   */
+  public AbstractJdbcPollInputOperator()
+  {
+    this.windowManager = new FSWindowDataManager();
+    this.currentWindowRecoveryState = new MutablePair<>();
+  }
+
+  /**
+   * Initializes the transient state (poller flag, error holder, emit queue)
+   * and prepares the statement this partition reads with: a bounded range
+   * query when an upper bound is configured, otherwise an open ended polling
+   * query, in which case the partition is flagged as pollable.
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+    spinMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
+    execute = true;
+    cause = new AtomicReference<Throwable>();
+    emitQueue = new ArrayBlockingQueue<List<T>>(queueCapacity);
+    this.context = context;
+    operatorId = context.getId();
+
+    // a missing upper bound identifies the polling partition
+    final boolean bounded = getRangeQueryPair().getValue() != null;
+    final String query;
+    if (bounded) {
+      query = JdbcMetaDataUtility.buildRangeQuery(getTableName(), getKey(), rangeQueryPair.getKey(),
+          rangeQueryPair.getValue());
+    } else {
+      query = JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), rangeQueryPair.getKey());
+    }
+
+    try {
+      ps = store.getConnection().prepareStatement(query, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+      if (!bounded) {
+        isPollable = true;
+      }
+    } catch (SQLException e) {
+      LOG.error("Exception in initializing the range query for a given partition", e);
+      throw new RuntimeException(e);
+    }
+
+    windowManager.setup(context);
+    LOG.debug("super setup done...");
+  }
+
+  /**
+   * Starts a window. Windows that are not newer than the largest recovery
+   * window are replayed synchronously from the saved key bounds. On the last
+   * recovered window the statement is rebuilt so that reading resumes after
+   * the highest replayed key. For a pollable partition the polling statement
+   * is rebuilt on every window. The poller thread is started lazily on the
+   * first window that is not a replay.
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    currentWindowId = windowId;
+
+    isReplayed = false;
+
+    // replay() sets isReplayed to true and emits the recovered range, if any
+    if (currentWindowId <= windowManager.getLargestRecoveryWindow()) {
+      try {
+        replay(currentWindowId);
+      } catch (SQLException e) {
+        LOG.error("Exception in replayed windows", e);
+        throw new RuntimeException(e);
+      }
+    }
+
+    // Last recovered window: switch from replaying to live reading
+    if (isReplayed && currentWindowId == windowManager.getLargestRecoveryWindow()) {
+      try {
+        if (!isPollable && rangeQueryPair.getValue() != null) {
+
+          // bounded partition: continue strictly after the last replayed key
+          ps = store.getConnection().prepareStatement(
+              JdbcMetaDataUtility.buildGTRangeQuery(getTableName(), getKey(), previousUpperBound,
+                  rangeQueryPair.getValue()),
+              java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
+        } else {
+          // polling partition: continue after the last replayed key, or from
+          // the configured lower bound when nothing was replayed
+          String bound = null;
+          if (previousUpperBound == null) {
+            bound = getRangeQueryPair().getKey();
+          } else {
+            bound = previousUpperBound;
+          }
+          ps = store.getConnection().prepareStatement(
+              JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), bound),
+              java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
+          isPollable = true;
+        }
+        // clearing the flag lets emitTuples and the poller start-up below run
+        isReplayed = false;
+        LOG.debug("Prepared statement after re-initialization - {} ", ps.toString());
+      } catch (SQLException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    //Reset the pollable query with the updated upper and lower bounds
+    // NOTE(review): when previousUpperBound is set but highestPolled is still
+    // null the bound passed below is null - highestPolled is not assigned in
+    // this class, confirm the subclass always sets it.
+    // NOTE(review): the previously prepared statement is replaced without
+    // being closed here.
+    if (isPollable) {
+      try {
+        String bound = null;
+        if (previousUpperBound == null && highestPolled == null) {
+          bound = getRangeQueryPair().getKey();
+        } else {
+          bound = highestPolled;
+        }
+        ps = store.getConnection().prepareStatement(
+            JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), bound),
+            java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
+        LOG.debug("Polling query {} {}", ps.toString(), currentWindowId);
+        // allow the poller thread to poll again for this window
+        isPolled = false;
+      } catch (SQLException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    // bounds of the keys emitted in this window, filled in by emitTuples
+    lower = null;
+    upper = null;
+
+    //Check if a thread is already active and start only if its no
+    //Do not start the thread from setup, will conflict with the replay
+    if (dbPoller == null && !isReplayed) {
+      //If this is not a replayed state, reset the ps to highest read offset + 1,
+      //keep the upper bound as the one that was initialized after static partitioning
+      LOG.info("Statement when re-initialized {}", ps.toString());
+      dbPoller = new Thread(new DBPoller());
+      dbPoller.start();
+    }
+  }
+
+  /**
+   * Emits at most one batch from the emit queue and tracks the string form of
+   * the first and last tuple emitted in the window as the lower and upper
+   * bound. Nothing is emitted while a window is being replayed.
+   */
+  @Override
+  public void emitTuples()
+  {
+    if (isReplayed) {
+      return;
+    }
+
+    final List<T> batch = emitQueue.poll();
+    if (batch == null) {
+      return;
+    }
+
+    for (T tuple : batch) {
+      final String tupleKey = tuple.toString();
+      if (lower == null) {
+        lower = tupleKey;
+      }
+      upper = tupleKey;
+      outputPort.emit(tuple);
+    }
+  }
+
+  /**
+   * For a window newer than the largest recovery window, saves the
+   * &lt;lower,upper&gt; bounds emitted in it so the window can be replayed.
+   * When tuples were emitted, the upper bound is remembered and the window is
+   * marked as polled. The in-memory recovery state is reset afterwards.
+   */
+  @Override
+  public void endWindow()
+  {
+    try {
+      if (currentWindowId > windowManager.getLargestRecoveryWindow()) {
+        final boolean emittedInWindow = lower != null && upper != null;
+        if (emittedInWindow) {
+          previousUpperBound = upper;
+          isPolled = true;
+        }
+        currentWindowRecoveryState = new MutablePair<>(lower, upper);
+        windowManager.save(currentWindowRecoveryState, operatorId, currentWindowId);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("saving recovery", e);
+    }
+    currentWindowRecoveryState = new MutablePair<>();
+  }
+
+  /**
+   * Returns the number of static range partitions; definePartitions creates
+   * one additional polling partition on top of this count
+   */
+  public int getPartitionCount()
+  {
+    return partitionCount;
+  }
+
+  /**
+   * Sets the number of static range partitions (validated to be at least 1)
+   */
+  public void setPartitionCount(int partitionCount)
+  {
+    this.partitionCount = partitionCount;
+  }
+
+  /**
+   * Activation hook. Starts no threads: the poller is started lazily from
+   * beginWindow so that it cannot conflict with replayed windows.
+   * NOTE(review): the body only evaluates a condition and returns either way,
+   * and it reads the context stored in setup rather than the cntxt argument.
+   */
+  @Override
+  public void activate(Context cntxt)
+  {
+    if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID
+        && context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowManager.getLargestRecoveryWindow()) {
+      // If it is a replay state, don't start any threads here
+      return;
+    }
+  }
+
+  /**
+   * Stops the poller thread, if it is running, by interrupting it and waiting
+   * for it to finish. If this thread is itself interrupted while waiting, the
+   * interruption is logged and the interrupt status is restored so that the
+   * caller can still observe it.
+   */
+  @Override
+  public void deactivate()
+  {
+    try {
+      if (dbPoller != null && dbPoller.isAlive()) {
+        dbPoller.interrupt();
+        dbPoller.join();
+      }
+    } catch (InterruptedException ex) {
+      // log and ignore, ending execution anyway
+      LOG.error("exception in poller thread: ", ex);
+      // do not lose the interrupt: join() cleared the flag when it threw
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @Override
+  public void handleIdleTime()
+  {
+    if (execute) {
+      try {
+        Thread.sleep(spinMillis);
+      } catch (InterruptedException ie) {
+        throw new RuntimeException(ie);
+      }
+    } else {
+      LOG.error("Exception: ", cause);
+      DTThrowable.rethrow(cause.get());
+    }
+  }
+
+  /**
+   * Replays a recovered window: loads the &lt;lower,upper&gt; key bounds saved
+   * for the window and re-emits that key range synchronously. Marks the
+   * current window as replayed before doing anything else.
+   *
+   * @param windowId id of the window to replay
+   * @throws SQLException if the replay statement cannot be prepared
+   */
+  protected void replay(long windowId) throws SQLException
+  {
+    isReplayed = true;
+
+    MutablePair<String, String> recoveredData = new MutablePair<String, String>();
+    try {
+      recoveredData = (MutablePair<String, String>)windowManager.load(operatorId, windowId);
+
+      if (recoveredData != null) {
+        //skip the window and return if there was no incoming data in the window
+        if (recoveredData.left == null || recoveredData.right == null) {
+          return;
+        }
+
+        // nothing to re-emit when the saved upper bound equals this
+        // partition's upper bound or the last replayed key
+        if (recoveredData.right.equals(rangeQueryPair.getValue()) || recoveredData.right.equals(previousUpperBound)) {
+          LOG.info("Matched so returning");
+          return;
+        }
+
+        // A throw-away operator instance is used only as a holder for the
+        // settings and the statement of the replay query
+        JdbcPollInputOperator jdbcPoller = new JdbcPollInputOperator();
+        jdbcPoller.setStore(store);
+        jdbcPoller.setKey(getKey());
+        jdbcPoller.setPartitionCount(getPartitionCount());
+        jdbcPoller.setPollInterval(getPollInterval());
+        jdbcPoller.setTableName(getTableName());
+        jdbcPoller.setBatchSize(getBatchSize());
+        // note: this changes the flag of this operator, not of the holder
+        isPollable = false;
+
+        LOG.debug("[Window ID -" + windowId + "," + recoveredData.left + "," + recoveredData.right + "]");
+
+        jdbcPoller.setRangeQueryPair(new KeyValPair<String, String>(recoveredData.left, recoveredData.right));
+
+        jdbcPoller.ps = jdbcPoller.store.getConnection().prepareStatement(
+            JdbcMetaDataUtility.buildRangeQuery(jdbcPoller.getTableName(), jdbcPoller.getKey(),
+                jdbcPoller.getRangeQueryPair().getKey(), jdbcPoller.getRangeQueryPair().getValue()),
+            java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
+        LOG.info("Query formed for recovered data - {}", jdbcPoller.ps.toString());
+
+        // executes the statement, emits the rows and closes the statement
+        emitReplayedTuples(jdbcPoller.ps);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("replay", e);
+    }
+
+  }
+
+  /**
+   * Replays the tuples in sync mode for replayed windows: executes the given
+   * statement on the calling thread and emits the key column of every row,
+   * remembering the last key as previousUpperBound.
+   *
+   * The statement is closed when this method returns, and so is its result
+   * set.
+   *
+   * @param ps prepared range query for the window being replayed
+   * @throws RuntimeException wrapping any SQLException raised while reading
+   */
+  public void emitReplayedTuples(PreparedStatement ps)
+  {
+    LOG.debug("Emitting replayed statement is -{}", ps);
+    try (PreparedStatement pStat = ps) {
+      pStat.setFetchSize(getFetchSize());
+      LOG.debug("sql query = {}", pStat);
+      // close the result set explicitly instead of relying on the statement
+      try (ResultSet rs = pStat.executeQuery()) {
+        if (rs == null || rs.isClosed()) {
+          LOG.debug("Nothing to replay");
+          return;
+        }
+        while (rs.next()) {
+          // read the column once; the value is both the new bound and the tuple
+          final Object keyValue = rs.getObject(getKey());
+          previousUpperBound = keyValue.toString();
+          outputPort.emit((T)keyValue);
+        }
+      }
+    } catch (SQLException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * Uses a static partitioning scheme to initialize operator partitions with
+   * non-overlapping key ranges to read. In addition to the 'n' configured
+   * range partitions, an 'n+1'th polling partition is created which reads the
+   * records beyond the last range; its upper bound is null.
+   *
+   * @param partitions the current partitions (not used)
+   * @param context the partitioning context (not used)
+   * @return partitionCount range partitions followed by one polling partition
+   * @throws RuntimeException if the key ranges cannot be computed
+   */
+  @Override
+  public Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> definePartitions(
+      Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> partitions,
+      com.datatorrent.api.Partitioner.PartitioningContext context)
+  {
+    final int rangePartitions = getPartitionCount();
+    List<Partition<AbstractJdbcPollInputOperator<T>>> newPartitions = new ArrayList<Partition<AbstractJdbcPollInputOperator<T>>>(
+        rangePartitions + 1);
+
+    // The utility opens its own connection from driver/url/credentials, so no
+    // separate store copy has to be created and connected for this step.
+    final HashMap<Integer, KeyValPair<String, String>> partitionToRangeMap;
+    try {
+      partitionToRangeMap = JdbcMetaDataUtility.getPartitionedQueryMap(rangePartitions,
+          store.getDatabaseDriver(), store.getDatabaseUrl(), getTableName(), getKey(),
+          store.getConnectionProperties().getProperty(user), store.getConnectionProperties().getProperty(password),
+          whereCondition, emitColumnList);
+    } catch (SQLException e) {
+      // without ranges no partition can be initialized; fail here instead of
+      // hitting a NullPointerException on the missing map below
+      LOG.error("Exception in initializing the partition range", e);
+      throw new RuntimeException(e);
+    }
+
+    KryoCloneUtils<AbstractJdbcPollInputOperator<T>> cloneUtils = KryoCloneUtils.createCloneUtils(this);
+
+    // Connect once rather than once per partition.
+    // NOTE(review): presumably every connect() opens a new connection, so
+    // repeating it per partition would leak them - confirm against JdbcStore.
+    store.connect();
+
+    for (int i = 0; i <= rangePartitions; i++) {
+      AbstractJdbcPollInputOperator<T> jdbcPoller = cloneUtils.getClone();
+
+      jdbcPoller.setStore(store);
+      jdbcPoller.setKey(getKey());
+      jdbcPoller.setPartitionCount(rangePartitions);
+      jdbcPoller.setPollInterval(getPollInterval());
+      jdbcPoller.setTableName(getTableName());
+      jdbcPoller.setBatchSize(getBatchSize());
+      jdbcPoller.setEmitColumnList(getEmitColumnList());
+
+      //The n given partitions are for range queries and n + 1 partition is for polling query
+      //The upper bound for the n+1 partition is set to null since its a pollable partition
+      if (i < rangePartitions) {
+        jdbcPoller.setRangeQueryPair(partitionToRangeMap.get(i));
+        jdbcPoller.isPollable = false;
+      } else {
+        // polling starts after the upper bound of the last range partition
+        jdbcPoller.setRangeQueryPair(new KeyValPair<String, String>(partitionToRangeMap.get(i - 1).getValue(), null));
+        jdbcPoller.isPollable = true;
+      }
+      Partition<AbstractJdbcPollInputOperator<T>> po = new DefaultPartition<AbstractJdbcPollInputOperator<T>>(
+          jdbcPoller);
+      newPartitions.add(po);
+    }
+
+    previousUpperBound = null;
+    return newPartitions;
+  }
+
+  /**
+   * Partitioner callback receiving the partitions by id; this operator keeps
+   * no per-partition bookkeeping, so the callback is a no-op.
+   */
+  @Override
+  public void partitioned(
+      Map<Integer, com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> partitions)
+  {
+    //Nothing to implement here
+  }
+
+  /**
+   * Background task that reads from the store through the operator's current
+   * prepared statement. Each cycle calls {@code pollRecords} and then sleeps
+   * for whatever is left of the poll interval. Any exception, including an
+   * interrupt, is recorded in {@code cause} and ends the loop.
+   */
+  public class DBPoller implements Runnable
+  {
+    @Override
+    public void run()
+    {
+      while (execute) {
+        try {
+          final long startTs = System.currentTimeMillis();
+          // range partitions always poll; a pollable partition polls only
+          // while it has not been marked as polled
+          if (!isPollable || !isPolled) {
+            pollRecords(ps);
+          }
+          final long ioTime = System.currentTimeMillis() - startTs;
+          final long sleepTime = pollInterval - ioTime;
+          LOG.debug("pollInterval = {} , I/O time = {} , sleepTime = {}", pollInterval, ioTime, sleepTime);
+          Thread.sleep(Math.max(sleepTime, 0));
+        } catch (Exception ex) {
+          cause.set(ex);
+          execute = false;
+        }
+      }
+    }
+  }
+
+  private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(AbstractJdbcPollInputOperator.class);
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4c7d268d/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java
new file mode 100644
index 0000000..9ba353c
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * A utility class used to retrieve the metadata for a given unique key of a SQL
+ * table. This class would emit range queries based on a primary index given
+ * 
+ * LIMIT clause may not work with all databases or may not return the same
+ * results always<br>
+ * 
+ * This utility has been tested with MySQL and where clause is supported<br>
+ * 
+ * Input - database connection details, tableName and the key column<br>
+ * Output - map of partition id to the &lt;lower,upper&gt; key bounds of its range query
+ *
+ */
+@Evolving
+public class JdbcMetaDataUtility
+{
+  // Connection settings and query parts shared by all static helpers of this
+  // class. They are mutable, class-wide state: the five-argument constructor
+  // and getPartitionedQueryMap overwrite them.
+  private static String DB_DRIVER = "com.mysql.jdbc.Driver";
+  private static String DB_CONNECTION = "";
+  private static String DB_USER = "";
+  private static String DB_PASSWORD = "";
+  private static String TABLE_NAME = "";
+  private static String KEY_COLUMN = "";
+  // Optional filter and projection, also read by the public build*Query
+  // helpers; both stay null until getPartitionedQueryMap has run in this JVM.
+  private static String WHERE_CLAUSE = null;
+  private static String COLUMN_LIST = null;
+
+  private static Logger LOG = LoggerFactory.getLogger(JdbcMetaDataUtility.class);
+
+  /**
+   * Creates the utility without touching the shared static settings
+   */
+  public JdbcMetaDataUtility()
+  {
+
+  }
+
+  /**
+   * Stores the given connection url, credentials, table and key column.
+   * Note that the values are written to static fields, so they replace the
+   * settings used by every static helper of this class, not just this
+   * instance. The driver, where clause and column list are left unchanged.
+   */
+  public JdbcMetaDataUtility(String dbConnection, String tableName, String key, String userName, String password)
+  {
+    DB_CONNECTION = dbConnection;
+    DB_USER = userName;
+    DB_PASSWORD = password;
+    TABLE_NAME = tableName;
+    KEY_COLUMN = key;
+  }
+
+  /**
+   * Loads the configured JDBC driver and opens a new connection with the
+   * configured url and credentials.
+   *
+   * @return a new connection; the caller is responsible for closing it
+   * @throws RuntimeException if the driver class is missing or the connection
+   *           cannot be established
+   */
+  private static Connection getDBConnection()
+  {
+    try {
+      Class.forName(DB_DRIVER);
+    } catch (ClassNotFoundException e) {
+      LOG.error("Driver not found", e);
+      throw new RuntimeException(e);
+    }
+
+    try {
+      return DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
+    } catch (SQLException e) {
+      LOG.error("Exception in getting connection handle", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static String generateQueryString()
+  {
+    StringBuilder sb = new StringBuilder();
+    sb.append("SELECT COUNT(*) as RowCount from " + TABLE_NAME);
+
+    if (WHERE_CLAUSE != null) {
+      sb.append(" WHERE " + WHERE_CLAUSE);
+    }
+
+    return sb.toString();
+  }
+
+  /**
+   * Finds the total number of rows in the table by running the given count
+   * query. Connection, statement and result set are released in all cases.
+   *
+   * @param query count query whose first column holds the row count
+   * @return the row count, or 0 when the query returns no row
+   * @throws RuntimeException wrapping any SQLException raised while querying
+   *           or while releasing the JDBC resources
+   */
+  private static long getRecordRange(String query) throws SQLException
+  {
+    long rowCount = 0;
+
+    // try-with-resources closes every resource even if closing one of them
+    // fails, which the manual finally block did not guarantee
+    try (Connection dbConnection = getDBConnection();
+        PreparedStatement preparedStatement = dbConnection.prepareStatement(query);
+        ResultSet rs = preparedStatement.executeQuery()) {
+
+      while (rs.next()) {
+        rowCount = rs.getLong(1);
+        LOG.debug("# Rows - {}", rowCount);
+      }
+
+    } catch (SQLException e) {
+      LOG.error("Exception in retreiving result set", e);
+      throw new RuntimeException(e);
+    }
+    return rowCount;
+  }
+
+  /**
+   * Returns a pair of <upper,lower> bounds for each partition of the
+   * {@link JdbcPollInputOperator}}
+   */
+  private static KeyValPair<String, String> getQueryBounds(long lower, long upper) throws SQLException
+  {
+    Connection dbConnection = null;
+    PreparedStatement psLowerBound = null;
+    PreparedStatement psUpperBound = null;
+
+    StringBuilder lowerBound = new StringBuilder();
+    StringBuilder upperBound = new StringBuilder();
+
+    KeyValPair<String, String> boundedQUeryPair = null;
+
+    try {
+      dbConnection = getDBConnection();
+
+      /*
+       * Run this loop only for n-1 partitions.
+       * By default the last partition will have fewer records, since we are rounding off
+       * */
+
+      lowerBound.append("SELECT " + KEY_COLUMN + " FROM " + TABLE_NAME);
+      upperBound.append("SELECT " + KEY_COLUMN + " FROM " + TABLE_NAME);
+
+      if (WHERE_CLAUSE != null) {
+        lowerBound.append(" WHERE " + WHERE_CLAUSE);
+        upperBound.append(" WHERE " + WHERE_CLAUSE);
+      }
+
+      lowerBound.append(" LIMIT " + (0 + lower) + ",1");
+      upperBound.append(" LIMIT " + (upper - 1) + ",1");
+
+      psLowerBound = dbConnection.prepareStatement(lowerBound.toString());
+      psUpperBound = dbConnection.prepareStatement(upperBound.toString());
+
+      ResultSet rsLower = psLowerBound.executeQuery();
+      ResultSet rsUpper = psUpperBound.executeQuery();
+
+      String lowerVal = null;
+      String upperVal = null;
+
+      while (rsLower.next()) {
+        lowerVal = rsLower.getString(KEY_COLUMN);
+      }
+
+      while (rsUpper.next()) {
+        upperVal = rsUpper.getString(KEY_COLUMN);
+      }
+
+      boundedQUeryPair = new KeyValPair<String, String>(lowerVal, upperVal);
+
+    } catch (SQLException e) {
+      LOG.error("Exception in getting bounds for queries");
+      throw new RuntimeException(e);
+    } finally {
+      if (psLowerBound != null) {
+        psLowerBound.close();
+      }
+      if (psUpperBound != null) {
+        psUpperBound.close();
+      }
+      if (dbConnection != null) {
+        dbConnection.close();
+      }
+    }
+    return boundedQUeryPair;
+
+  }
+
+  /**
+   * Returns a map of partitionId to a KeyValPair of <lower,upper> of the query
+   * range<br>
+   * Ensures even distribution of records across partitions except the last
+   * partition By default the last partition for batch queries will have fewer
+   * records, since we are rounding off
+   * 
+   * @throws SQLException
+   * 
+   */
+  private static HashMap<Integer, KeyValPair<String, String>> getRangeQueries(int numberOfPartitions, int events,
+      long rowCount) throws SQLException
+  {
+    HashMap<Integer, KeyValPair<String, String>> partitionToQueryMap = new HashMap<Integer, KeyValPair<String, String>>();
+    for (int i = 0, lowerOffset = 0, upperOffset = events; i < numberOfPartitions
+        - 1; i++, lowerOffset += events, upperOffset += events) {
+
+      partitionToQueryMap.put(i, getQueryBounds(lowerOffset, upperOffset));
+    }
+
+    //Call to construct the lower and upper bounds for the last partition
+    partitionToQueryMap.put(numberOfPartitions - 1, getQueryBounds(events * (numberOfPartitions - 1), rowCount));
+
+    LOG.info("Partition map - " + partitionToQueryMap.toString());
+
+    return partitionToQueryMap;
+  }
+
+  /**
+   * Builds a range query selecting the rows whose key lies between the given
+   * bounds, both inclusive<br>
+   * Invoked from the setup method of {@link AbstractJdbcPollInputOperator} to
+   * initialize the preparedStatement in the given operator<br>
+   * The column list and where clause are taken from the static settings of
+   * this class when they are set; otherwise all columns are selected and no
+   * extra filter is applied
+   */
+  public static String buildRangeQuery(String tableName, String keyColumn, String lower, String upper)
+  {
+    final String projection = COLUMN_LIST != null ? COLUMN_LIST : "*";
+    final String filter = WHERE_CLAUSE != null ? WHERE_CLAUSE + " AND " : "";
+    return "SELECT " + projection + " from " + tableName + " WHERE " + filter
+        + keyColumn + " BETWEEN '" + lower + "' AND '" + upper + "'";
+  }
+
+  /**
+   * Builds a query selecting the rows whose key is strictly greater than
+   * {@code lower} and at most {@code upper}; used to continue from the next
+   * highest key after an operator is restarted<br>
+   * The column list and where clause are taken from the static settings of
+   * this class when they are set
+   */
+  public static String buildGTRangeQuery(String tableName, String keyColumn, String lower, String upper)
+  {
+    final String projection = COLUMN_LIST != null ? COLUMN_LIST : "*";
+    final String filter = WHERE_CLAUSE != null ? WHERE_CLAUSE + " AND " : "";
+    return "SELECT " + projection + " from " + tableName + " WHERE " + filter
+        + keyColumn + " > '" + lower + "' AND " + keyColumn + " <= '" + upper + "'";
+  }
+
+  /**
+   * Builds the open ended polling query selecting the rows whose key is
+   * strictly greater than {@code lower}<br>
+   * The column list and where clause are taken from the static settings of
+   * this class when they are set
+   */
+  public static String buildPollableQuery(String tableName, String keyColumn, String lower)
+  {
+    final String projection = COLUMN_LIST != null ? COLUMN_LIST : "*";
+    final String filter = WHERE_CLAUSE != null ? WHERE_CLAUSE + " AND " : "";
+    return "SELECT " + projection + " from " + tableName + " WHERE " + filter
+        + keyColumn + " > '" + lower + "' ";
+  }
+
+  /**
+   * Called by the partitioner from {@link AbstractJdbcPollInputOperator}<br>
+   * Stores the given settings in the static state of this class, counts the
+   * rows of the table and computes the key range of every partition<br>
+   * Returns a map of partitionId to the &lt;lower,upper&gt; key bounds of its
+   * range
+   *
+   * @param partitions number of range partitions, must be positive
+   * @throws IllegalArgumentException if partitions is not positive
+   * @throws SQLException if the row count cannot be determined; the ranges are
+   *           not computed from a row count that could not be read
+   */
+  public static HashMap<Integer, KeyValPair<String, String>> getPartitionedQueryMap(int partitions, String dbDriver,
+      String dbConnection, String tableName, String key, String userName, String password, String whereClause,
+      String columnList) throws SQLException
+  {
+    if (partitions <= 0) {
+      throw new IllegalArgumentException("partitions must be positive, got " + partitions);
+    }
+
+    DB_CONNECTION = dbConnection;
+    DB_USER = userName;
+    DB_PASSWORD = password;
+    TABLE_NAME = tableName;
+    KEY_COLUMN = key;
+    WHERE_CLAUSE = whereClause;
+    COLUMN_LIST = columnList;
+    DB_DRIVER = dbDriver;
+
+    // let a failure propagate: continuing with a row count of 0 would only
+    // fail later with a misleading error while computing the bounds
+    final long rowCount = getRecordRange(generateQueryString());
+    return getRangeQueries(partitions, getOffset(rowCount, partitions), rowCount);
+  }
+
+  /**
+   * Returns the rounded offset to arrive at a range query
+   */
+  private static int getOffset(long rowCount, int partitions)
+  {
+    if (rowCount % partitions == 0) {
+      return (int)(rowCount / partitions);
+    } else {
+      return (int)((rowCount - (rowCount % (partitions - 1))) / (partitions - 1));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4c7d268d/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java
new file mode 100644
index 0000000..45f96bf
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+
+/**
+ * A concrete implementation of {@link AbstractJdbcPollInputOperator} for
+ * consuming data from MySQL using JDBC interface <br>
+ * User needs to provide tableName,dbConnection,setEmitColumnList,look-up key
+ * <br>
+ * Optionally batchSize,pollInterval,Look-up key and a where clause can be given
+ * <br>
+ * This operator uses static partitioning to arrive at range queries for exactly
+ * once reads<br>
+ * Assumption is that there is an ordered column using which range queries can
+ * be formed<br>
+ * If an emitColumnList is provided, please ensure that the keyColumn is the
+ * first column in the list<br>
+ * Range queries are formed using the {@link JdbcMetaDataUtility}.<br>
+ * Output - comma separated list of the emit columns, e.g. columnA,columnB,columnC
+ * 
+ * @displayName Jdbc Polling Input Operator
+ * @category Input
+ * @tags database, sql, jdbc
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public class JdbcPollInputOperator extends AbstractJdbcPollInputOperator<Object>
+{
+  private long lastBatchWindowId;
+  private transient long currentWindowId;
+  private long lastCreationTsMillis;
+  private long fetchBackMillis = 0L;
+  private ArrayList<String> emitColumns;
+  private transient int count = 0;
+
+  /**
+   * Returns the names of the columns whose values make up each emitted tuple.
+   */
+  public List<String> getEmitColumns()
+  {
+    return emitColumns;
+  }
+
+  /**
+   * Sets the names of the columns whose values make up each emitted tuple.<br>
+   * Note that {@link #setup} replaces this list with the columns parsed from
+   * the emit column list.
+   */
+  public void setEmitColumns(ArrayList<String> emitColumns)
+  {
+    this.emitColumns = emitColumns;
+  }
+
+  /**
+   * Returns fetchBackMillis, the number of milliseconds that {@link #setup}
+   * subtracts from the current time when it initialises the last creation
+   * timestamp.
+   */
+  public long getFetchBackMillis()
+  {
+    return fetchBackMillis;
+  }
+
+  /**
+   * Sets fetchBackMillis, the number of milliseconds that {@link #setup}
+   * subtracts from the current time when it initialises the last creation
+   * timestamp.
+   */
+  public void setFetchBackMillis(long fetchBackMillis)
+  {
+    this.fetchBackMillis = fetchBackMillis;
+  }
+
+  /**
+   * Delegates to the parent setup, derives the emit columns from the configured
+   * emit column list and records the current time shifted back by
+   * {@code fetchBackMillis} as the last creation timestamp.
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+
+    final String configuredColumns = getEmitColumnList();
+    parseEmitColumnList(configuredColumns);
+
+    final long now = System.currentTimeMillis();
+    lastCreationTsMillis = now - fetchBackMillis;
+  }
+
+  /**
+   * Splits the comma separated emit column list into individual column names
+   * and stores them through {@link #setEmitColumns(ArrayList)}.<br>
+   * Whitespace around each name is removed, so a list written as "a, b"
+   * resolves to the columns "a" and "b" instead of "a" and " b".
+   *
+   * @param columnList
+   *          comma separated column names, must not be null
+   */
+  private void parseEmitColumnList(String columnList)
+  {
+    ArrayList<String> parsed = Lists.newArrayList();
+    for (String column : columnList.split(",")) {
+      parsed.add(column.trim());
+    }
+    setEmitColumns(parsed);
+  }
+
+  /**
+   * Forwards the window start to the parent operator and then remembers the id
+   * of the window, which {@link #emitTuples()} compares against the window of
+   * the last emitted batch.
+   */
+  @Override
+  public void beginWindow(long l)
+  {
+    super.beginWindow(l);
+    this.currentWindowId = l;
+  }
+
+  /**
+   * Executes the given statement and groups the returned rows into batches that
+   * are handed to the emit queue.<br>
+   * Each row is rendered as the comma separated values of the emit columns. A
+   * batch is queued once it holds {@code getBatchSize()} rows; whatever is left
+   * when the result set is exhausted is queued as a final batch, unless this is
+   * a pollable partition that has not polled anything yet. For pollable
+   * partitions the key of the last row read is kept in {@code highestPolled}.<br>
+   * Nothing is done for replayed windows or when the statement is already
+   * closed. The statement is closed before this method returns.
+   *
+   * @param ps
+   *          the statement to execute
+   * @throws RuntimeException
+   *           if the query fails, or if a pollable partition reads a row whose
+   *           key column is null
+   */
+  @Override
+  protected void pollRecords(PreparedStatement ps)
+  {
+    if (isReplayed) {
+      return;
+    }
+
+    try {
+      if (ps.isClosed()) {
+        LOG.debug("Returning due to closed ps for non-pollable partitions");
+        return;
+      }
+    } catch (SQLException e) {
+      LOG.error("Prepared statement is closed", e);
+      throw new RuntimeException(e);
+    }
+
+    List<Object> metaList = new ArrayList<>();
+
+    // Closing the statement also closes the result set it produced
+    try (PreparedStatement pStat = ps) {
+      pStat.setFetchSize(getFetchSize());
+      LOG.debug("sql query = {}", pStat);
+      ResultSet rs = pStat.executeQuery();
+
+      if (rs == null || rs.isClosed()) {
+        return;
+      }
+
+      while (rs.next()) {
+        if (count >= getBatchSize()) {
+          // The current batch is full: queue it and let this row open the next one
+          emitQueue.add(metaList);
+          metaList = new ArrayList<>();
+          count = 0;
+        }
+
+        Object key = rs.getObject(getKey());
+        StringBuilder resultTuple = new StringBuilder();
+        for (String column : emitColumns) {
+          resultTuple.append(rs.getObject(column)).append(',');
+        }
+        // Drop the trailing separator
+        metaList.add(resultTuple.substring(0, resultTuple.length() - 1));
+        // The row that opens a new batch is counted too, so every batch holds batchSize rows
+        count++;
+
+        if (isPollable) {
+          if (key == null) {
+            LOG.error("Key not found");
+            throw new RuntimeException("Key not found: column " + getKey() + " is null");
+          }
+          highestPolled = key.toString();
+          isPolled = true;
+        }
+      }
+
+      /*Flush the remaining records once the result set is over and batch-size is not reached,
+       * a pollable partition that has not polled anything yet does not flush*/
+      if (!isPollable || isPolled) {
+        emitQueue.offer(metaList);
+        count = 0;
+      }
+      isPolled = true;
+      LOG.debug("last creation time stamp = {}", lastCreationTsMillis);
+    } catch (SQLException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * Emits at most one queued batch per window.<br>
+   * Nothing is emitted for replayed windows, or when a batch has already been
+   * emitted in the current window. The first comma separated field of every
+   * emitted tuple is tracked in {@code lower} (first tuple seen) and
+   * {@code upper} (latest tuple seen).
+   */
+  @Override
+  public void emitTuples()
+  {
+    if (isReplayed) {
+      LOG.debug("Calling emit tuples during window - {}::{}", currentWindowId,
+          windowManager.getLargestRecoveryWindow());
+      LOG.debug("Returning for replayed window");
+      return;
+    }
+
+    if (lastBatchWindowId >= currentWindowId) {
+      return;
+    }
+
+    List<Object> tuples = emitQueue.poll();
+    if (tuples == null) {
+      return;
+    }
+
+    for (Object tuple : tuples) {
+      // A limit of 2 keeps the leading field even when every field is empty;
+      // without a limit split() drops trailing empty strings and may return an empty array
+      String firstField = tuple.toString().split(",", 2)[0];
+      if (lower == null) {
+        lower = firstField;
+      }
+      upper = firstField;
+      outputPort.emit(tuple);
+    }
+    lastBatchWindowId = currentWindowId;
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(JdbcPollInputOperator.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4c7d268d/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPollerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPollerTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPollerTest.java
new file mode 100644
index 0000000..573e45d
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPollerTest.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.commons.io.FileUtils;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+/**
+ * Tests for {@link AbstractJdbcPollInputOperator} and
+ * {@link JdbcPollInputOperator}
+ */
+public class JdbcPollerTest
+{
+  public static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
+  public static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
+
+  private static final String TABLE_NAME = "test_account_table";
+  private static String APP_ID = "JdbcPollingOperatorTest";
+  public String dir = null;
+
+  /**
+   * Removes output left over from earlier runs, loads the database driver and
+   * creates the test table if it does not exist yet.
+   */
+  @BeforeClass
+  public static void setup()
+  {
+    try {
+      cleanup();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    try {
+      Class.forName(DB_DRIVER).newInstance();
+
+      String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
+          + " (Account_No INTEGER, Name VARCHAR(255), Amount INTEGER)";
+
+      // Release the connection and statement once the table exists
+      try (Connection con = DriverManager.getConnection(URL);
+          Statement stmt = con.createStatement()) {
+        stmt.executeUpdate(createTable);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Deletes the directory of the test application under {@code target/}.
+   */
+  @AfterClass
+  public static void cleanup()
+  {
+    final File appDirectory = new File("target/" + APP_ID);
+    try {
+      FileUtils.deleteDirectory(appDirectory);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Deletes every row of the test table.
+   */
+  public static void cleanTable()
+  {
+    String cleanTable = "delete from " + TABLE_NAME;
+    // try-with-resources releases the connection and statement
+    try (Connection con = DriverManager.getConnection(URL);
+        Statement stmt = con.createStatement()) {
+      stmt.executeUpdate(cleanTable);
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Inserts {@code numEvents} rows into the test table with consecutive account
+   * numbers starting at {@code offset}.
+   *
+   * @param numEvents
+   *          number of rows to insert
+   * @param offset
+   *          account number of the first inserted row
+   */
+  public static void insertEventsInTable(int numEvents, int offset)
+  {
+    String insert = "insert into " + TABLE_NAME + " values (?,?,?)";
+    // try-with-resources releases the connection and statement
+    try (Connection con = DriverManager.getConnection(URL);
+        PreparedStatement stmt = con.prepareStatement(insert)) {
+      for (int i = 0; i < numEvents; i++) {
+        int accountNo = offset + i;
+        stmt.setInt(1, accountNo);
+        stmt.setString(2, "Account_Holder-" + accountNo);
+        stmt.setInt(3, (accountNo * 1000));
+        stmt.executeUpdate();
+      }
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Simulates the actual application flow: partitions the operator into a batch
+   * query partition and a pollable partition, checks the rows emitted by the
+   * batch partition and then checks incremental record polling on the pollable
+   * partition.
+   */
+  @Test
+  public void testJdbcPollingInputOperatorBatch() throws InterruptedException
+  {
+    cleanTable();
+    insertEventsInTable(10, 0);
+    JdbcStore store = new JdbcStore();
+    store.setDatabaseDriver(DB_DRIVER);
+    store.setDatabaseUrl(URL);
+
+    this.dir = "target/" + APP_ID + "/";
+
+    JdbcPollInputOperator inputOperator = new JdbcPollInputOperator();
+    inputOperator.setStore(store);
+    inputOperator.setBatchSize(100);
+    inputOperator.setPollInterval(1000);
+    inputOperator.setEmitColumnList("Account_No,Name,Amount");
+    inputOperator.setKey("Account_No");
+    inputOperator.setTableName(TABLE_NAME);
+    inputOperator.setFetchSize(100);
+    inputOperator.setPartitionCount(1);
+
+    CollectorTestSink<Object> sink = new CollectorTestSink<>();
+    inputOperator.outputPort.setSink(sink);
+
+    TestUtils.MockBatchedOperatorStats readerStats = new TestUtils.MockBatchedOperatorStats(2);
+
+    DefaultPartition<AbstractJdbcPollInputOperator<Object>> apartition = new DefaultPartition<AbstractJdbcPollInputOperator<Object>>(
+        inputOperator);
+
+    TestUtils.MockPartition<AbstractJdbcPollInputOperator<Object>> pseudoParttion = new TestUtils.MockPartition<>(
+        apartition, readerStats);
+
+    List<Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>> newMocks = Lists.newArrayList();
+
+    newMocks.add(pseudoParttion);
+
+    Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>> newPartitions = inputOperator
+        .definePartitions(newMocks, null);
+
+    Iterator<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>> itr = newPartitions
+        .iterator();
+
+    int operatorId = 0;
+    for (com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>> partition : newPartitions) {
+
+      Attribute.AttributeMap.DefaultAttributeMap partitionAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+      this.dir = "target/" + APP_ID + "/";
+      partitionAttributeMap.put(DAG.APPLICATION_ID, APP_ID);
+      partitionAttributeMap.put(Context.DAGContext.APPLICATION_PATH, dir);
+
+      OperatorContextTestHelper.TestIdOperatorContext partitioningContext = new OperatorContextTestHelper.TestIdOperatorContext(
+          operatorId++, partitionAttributeMap);
+
+      partition.getPartitionedInstance().setup(partitioningContext);
+      partition.getPartitionedInstance().activate(partitioningContext);
+    }
+
+    //First partition is for range queries,last is for polling queries
+    AbstractJdbcPollInputOperator<Object> newInstance = itr.next().getPartitionedInstance();
+    CollectorTestSink<Object> sink1 = new CollectorTestSink<>();
+    newInstance.outputPort.setSink(sink1);
+    newInstance.beginWindow(1);
+    Thread.sleep(50);
+    newInstance.emitTuples();
+    newInstance.endWindow();
+
+    Assert.assertEquals("rows from db", 10, sink1.collectedTuples.size());
+    assertConsecutiveAccountNumbers(sink1.collectedTuples, 0);
+    sink1.collectedTuples.clear();
+
+    insertEventsInTable(10, 10);
+
+    AbstractJdbcPollInputOperator<Object> pollableInstance = itr.next().getPartitionedInstance();
+
+    pollableInstance.outputPort.setSink(sink1);
+
+    pollableInstance.beginWindow(1);
+    Thread.sleep(pollableInstance.getPollInterval());
+    pollableInstance.emitTuples();
+    pollableInstance.endWindow();
+
+    Assert.assertEquals("rows from db", 10, sink1.collectedTuples.size());
+    assertConsecutiveAccountNumbers(sink1.collectedTuples, 10);
+
+    sink1.collectedTuples.clear();
+    insertEventsInTable(10, 20);
+
+    pollableInstance.beginWindow(2);
+    Thread.sleep(pollableInstance.getPollInterval());
+    pollableInstance.emitTuples();
+    pollableInstance.endWindow();
+
+    Assert.assertEquals("rows from db", 10, sink1.collectedTuples.size());
+    assertConsecutiveAccountNumbers(sink1.collectedTuples, 20);
+    sink1.collectedTuples.clear();
+  }
+
+  /**
+   * Asserts that the first comma separated field of the given tuples counts up
+   * one by one, starting at {@code firstAccountNo}.
+   */
+  private static void assertConsecutiveAccountNumbers(Collection<?> tuples, int firstAccountNo)
+  {
+    int i = firstAccountNo;
+    for (Object tuple : tuples) {
+      String[] pojoEvent = tuple.toString().split(",");
+      // assertEquals reports the account number actually found when the check fails
+      Assert.assertEquals("i=" + i, i, Integer.parseInt(pojoEvent[0]));
+      i++;
+    }
+  }
+
+}


Mime
View raw message