apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pra...@apache.org
Subject [1/2] incubator-apex-malhar git commit: JdbcPOJOInputOperator polling fix
Date Thu, 02 Jun 2016 08:44:32 GMT
Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master 78c5fad19 -> ef12eb0cf


JdbcPOJOInputOperator polling fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/4fbc0387
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/4fbc0387
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/4fbc0387

Branch: refs/heads/master
Commit: 4fbc0387d06e14977261590d295d720e67e83e7e
Parents: 9c11400
Author: Sandeep Deshmukh <sandeep@datatorrent.com>
Authored: Wed May 25 19:37:32 2016 +0530
Committer: Sandeep Deshmukh <sandeep@datatorrent.com>
Committed: Wed May 25 19:37:32 2016 +0530

----------------------------------------------------------------------
 .../lib/db/jdbc/JdbcPOJOInputOperator.java      |  8 ++----
 .../lib/db/jdbc/JdbcOperatorTest.java           | 29 ++++++++++++++++----
 2 files changed, 26 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4fbc0387/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
index db2d27a..2e0993f 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
@@ -95,8 +95,6 @@ public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object>
   private transient PreparedStatement preparedStatement;
   protected transient Class<?> pojoClass;
 
-  protected int pageNumber;
-
   @AutoMetric
   protected long tuplesRead;
 
@@ -188,7 +186,6 @@ public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object>
   public void beginWindow(long l)
   {
     windowDone = false;
-    tuplesRead = 0;
   }
 
   @Override
@@ -209,7 +206,6 @@ public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object>
           windowDone = true;
         }
         resultSet.close();
-        pageNumber++;
       } catch (SQLException ex) {
         store.disconnect();
         throw new RuntimeException(ex);
@@ -220,9 +216,9 @@ public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object>
   protected void setRuntimeParams() throws SQLException
   {
     if (mysqlSyntax) {
-      preparedStatement.setLong(1, pageNumber * fetchSize);
+      preparedStatement.setLong(1, tuplesRead);
     } else {
-      preparedStatement.setLong(1, pageNumber * fetchSize);
+      preparedStatement.setLong(1, tuplesRead);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4fbc0387/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
index 1fef903..6f2688f 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
@@ -411,7 +411,7 @@ public class JdbcOperatorTest
     OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(
         OPERATOR_ID, attributeMap);
     
-    cleanTableAndInsertEvents(10);
+    insertEvents(10,true, 0);
 
     JdbcPOJOInputOperator inputOperator = new JdbcPOJOInputOperator();
     inputOperator.setStore(store);
@@ -475,21 +475,40 @@ public class JdbcOperatorTest
     inputOperator.endWindow();
 
     Assert.assertEquals("rows from db", 0, sink.collectedTuples.size());
+    
+    // Insert 3 more tuples and check if they are read successfully.
+    insertEvents(3, false, 10);
+
+    inputOperator.beginWindow(3);
+    inputOperator.emitTuples();
+    inputOperator.endWindow();
+
+    Assert.assertEquals("rows from db", 3, sink.collectedTuples.size());
+    for (Object tuple : sink.collectedTuples) {
+      TestPOJOEvent pojoEvent = (TestPOJOEvent)tuple;
+      Assert.assertTrue("i=" + i, pojoEvent.getId() == i);
+      Assert.assertTrue("date", pojoEvent.getStartDate() instanceof Date);
+      Assert.assertTrue("time", pojoEvent.getStartTime() instanceof Time);
+      Assert.assertTrue("timestamp", pojoEvent.getStartTimestamp() instanceof Timestamp);
+      i++;
+    }
   }
   
 
-  private void cleanTableAndInsertEvents(int numEvents)
+  private void insertEvents(int numEvents, boolean cleanExistingRows, int startRowId)
   {
    try (Connection con = DriverManager.getConnection(URL); Statement stmt = con.createStatement()) {
-      String cleanTable = "delete from " + TABLE_POJO_NAME;
-      stmt.executeUpdate(cleanTable);
+      if (cleanExistingRows) {
+        String cleanTable = "delete from " + TABLE_POJO_NAME;
+        stmt.executeUpdate(cleanTable);
+      }
 
       String insert = "insert into " + TABLE_POJO_NAME + " values (?,?,?,?,?)";
       PreparedStatement pStmt = con.prepareStatement(insert);
       con.prepareStatement(insert);
 
       for (int i = 0; i < numEvents; i++) {
-        pStmt.setInt(1, i);
+        pStmt.setInt(1, startRowId + i);
         pStmt.setString(2, "name");
         pStmt.setDate(3, new Date(2016, 1, 1));
         pStmt.setTime(4, new Time(2016, 1, 1));


Mime
View raw message