geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aging...@apache.org
Subject [geode] branch feature/GEODE-3781 updated: Added support for upsert operation.
Date Thu, 26 Oct 2017 23:25:26 GMT
This is an automated email from the ASF dual-hosted git repository.

agingade pushed a commit to branch feature/GEODE-3781
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-3781 by this push:
     new c80d024  Added support for upsert operation.
c80d024 is described below

commit c80d024816046e3929bc316d6e007f607db36f07
Author: Anil <agingade@pivotal.io>
AuthorDate: Thu Oct 26 16:24:55 2017 -0700

    Added support for upsert operation.
---
 .../apache/geode/connectors/jdbc/JDBCManager.java  | 26 +++++++++++-
 .../jdbc/JDBCAsyncWriterIntegrationTest.java       | 49 ++++++++++++++++++++++
 2 files changed, 74 insertions(+), 1 deletion(-)

diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java
index 86980e0..1b98da6 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java
@@ -69,6 +69,26 @@ public class JDBCManager {
   public void write(Region region, Operation operation, Object key, PdxInstance value) {
     String tableName = getTableName(region);
     List<ColumnValue> columnList = getColumnToValueList(tableName, key, value, operation);
+    int updateCount = executeWrite(columnList, tableName, operation, false);
+    if (operation.isDestroy()) {
+      return;
+    }
+    if (updateCount <= 0) {
+      Operation upsertOp;
+      if (operation.isUpdate()) {
+        upsertOp = Operation.CREATE;
+      } else {
+        upsertOp = Operation.UPDATE;
+      }
+      updateCount = executeWrite(columnList, tableName, upsertOp, true);
+    }
+    if (updateCount != 1) {
+      throw new IllegalStateException("Unexpected updateCount " + updateCount);
+    }
+  }
+
+  private int executeWrite(List<ColumnValue> columnList, String tableName, Operation operation,
+      boolean handleException) {
     PreparedStatement pstmt = getQueryStatement(columnList, tableName, operation);
     try {
       int idx = 0;
@@ -77,8 +97,12 @@ public class JDBCManager {
         pstmt.setObject(idx, cv.getValue());
       }
       pstmt.execute();
+      return pstmt.getUpdateCount();
     } catch (SQLException e) {
-      handleSQLException(e);
+      if (handleException || operation.isDestroy()) {
+        handleSQLException(e);
+      }
+      return 0;
     } finally {
       clearStatement(pstmt);
     }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
index 42999a3..87b654b 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
@@ -231,6 +231,55 @@ public class JDBCAsyncWriterIntegrationTest {
     assertThat(rs.next()).isFalse();
   }
 
+  @Test
+  public void canUpdateBecomeInsert() throws Exception {
+    Region employees = createRegionWithJDBCAsyncWriter(regionTableName, getRequiredProperties());
+    PdxInstance pdx1 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
+        .writeInt("age", 55).create();
+    employees.put("1", pdx1);
+
+    Awaitility.await().atMost(30, TimeUnit.SECONDS)
+        .until(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
+
+    stmt.execute("delete from " + regionTableName + " where id = '1'");
+    validateTableRowCount(0);
+
+    PdxInstance pdx3 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
+        .writeInt("age", 72).create();
+    employees.put("1", pdx3);
+
+    Awaitility.await().atMost(10, TimeUnit.SECONDS)
+        .until(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
+
+    ResultSet rs = stmt.executeQuery("select * from " + regionTableName + " order by id asc");
+    assertThat(rs.next()).isTrue();
+    assertThat(rs.getString("id")).isEqualTo("1");
+    assertThat(rs.getString("name")).isEqualTo("Emp1");
+    assertThat(rs.getObject("age")).isEqualTo(72);
+    assertThat(rs.next()).isFalse();
+  }
+
+  @Test
+  public void canInsertBecomeUpdate() throws Exception {
+    stmt.execute("Insert into " + regionTableName + " values('1', 'bogus', 11)");
+    validateTableRowCount(1);
+
+    Region employees = createRegionWithJDBCAsyncWriter(regionTableName, getRequiredProperties());
+    PdxInstance pdx1 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
+        .writeInt("age", 55).create();
+    employees.put("1", pdx1);
+
+    Awaitility.await().atMost(30, TimeUnit.SECONDS)
+        .until(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
+
+    ResultSet rs = stmt.executeQuery("select * from " + regionTableName + " order by id asc");
+    assertThat(rs.next()).isTrue();
+    assertThat(rs.getString("id")).isEqualTo("1");
+    assertThat(rs.getString("name")).isEqualTo("Emp1");
+    assertThat(rs.getObject("age")).isEqualTo(55);
+    assertThat(rs.next()).isFalse();
+  }
+
   private Region createRegionWithJDBCAsyncWriter(String regionName, Properties props) {
     jdbcWriter = new JDBCAsyncWriter();
     jdbcWriter.init(props);

-- 
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <commits@geode.apache.org>'].

Mime
View raw message