geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aging...@apache.org
Subject [geode] branch feature/GEODE-3781 updated: Added prepared statement cache Debug logging added that should be cleaned up later
Date Fri, 27 Oct 2017 18:14:52 GMT
This is an automated email from the ASF dual-hosted git repository.

agingade pushed a commit to branch feature/GEODE-3781
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-3781 by this push:
     new fdbfb14  Added prepared statement cache Debug logging added that should be cleaned
up later
fdbfb14 is described below

commit fdbfb14bf96515d3d30cb8fc5725e57fff7d0601
Author: Anil <agingade@pivotal.io>
AuthorDate: Fri Oct 27 11:13:50 2017 -0700

    Added prepared statement cache
    Debug logging added that should be cleaned up later
---
 .../apache/geode/connectors/jdbc/ColumnValue.java  |   5 +
 .../geode/connectors/jdbc/JDBCAsyncWriter.java     |   1 +
 .../apache/geode/connectors/jdbc/JDBCManager.java  | 120 ++++++++++++++-------
 3 files changed, 86 insertions(+), 40 deletions(-)

diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/ColumnValue.java
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/ColumnValue.java
index de422d5..fa540c2 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/ColumnValue.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/ColumnValue.java
@@ -36,4 +36,9 @@ public class ColumnValue {
   public Object getValue() {
     return this.value;
   }
+
+  @Override
+  public String toString() {
+    return "ColumnValue [isKey=" + isKey + ", columnName=" + columnName + ", value=" + value
+ "]";
+  }
 }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java
index 804301b..c394d12 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java
@@ -56,6 +56,7 @@ public class JDBCAsyncWriter implements AsyncEventListener {
         // In that case need to serialize and deserialize.
         try {
           PdxInstance value = (PdxInstance) event.getDeserializedValue();
+          logger.info("AsyncEventListener event : " + event);
           this.manager.write(event.getRegion(), event.getOperation(), event.getKey(), value);
           successfulEvents += 1;
         } catch (RuntimeException ex) {
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java
index 0ec643b..7c7ecea 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.Region;
 import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.pdx.internal.PdxInstanceImpl;
 
 public class JDBCManager {
 
@@ -45,8 +46,12 @@ public class JDBCManager {
 
   public void write(Region region, Operation operation, Object key, PdxInstance value) {
     String tableName = getTableName(region);
+    int pdxTypeId = 0;
+    if (value != null) {
+      pdxTypeId = ((PdxInstanceImpl) value).getPdxType().getTypeId();
+    }
     List<ColumnValue> columnList = getColumnToValueList(tableName, key, value, operation);
-    int updateCount = executeWrite(columnList, tableName, operation, false);
+    int updateCount = executeWrite(columnList, tableName, operation, pdxTypeId, false);
     if (operation.isDestroy()) {
       return;
     }
@@ -57,7 +62,7 @@ public class JDBCManager {
       } else {
         upsertOp = Operation.UPDATE;
       }
-      updateCount = executeWrite(columnList, tableName, upsertOp, true);
+      updateCount = executeWrite(columnList, tableName, upsertOp, pdxTypeId, true);
     }
     if (updateCount != 1) {
       throw new IllegalStateException("Unexpected updateCount " + updateCount);
@@ -65,23 +70,25 @@ public class JDBCManager {
   }
 
   private int executeWrite(List<ColumnValue> columnList, String tableName, Operation
operation,
-      boolean handleException) {
-    PreparedStatement pstmt = getQueryStatement(columnList, tableName, operation);
-    try {
-      int idx = 0;
-      for (ColumnValue cv : columnList) {
-        idx++;
-        pstmt.setObject(idx, cv.getValue());
-      }
-      pstmt.execute();
-      return pstmt.getUpdateCount();
-    } catch (SQLException e) {
-      if (handleException || operation.isDestroy()) {
-        handleSQLException(e);
+      int pdxTypeId, boolean handleException) {
+    PreparedStatement pstmt = getPreparedStatement(columnList, tableName, operation, pdxTypeId);
+    synchronized (pstmt) {
+      try {
+        int idx = 0;
+        for (ColumnValue cv : columnList) {
+          idx++;
+          pstmt.setObject(idx, cv.getValue());
+        }
+        pstmt.execute();
+        return pstmt.getUpdateCount();
+      } catch (SQLException e) {
+        if (handleException || operation.isDestroy()) {
+          handleSQLException(e);
+        }
+        return 0;
+      } finally {
+        clearStatement(pstmt);
       }
-      return 0;
-    } finally {
-      clearStatement(pstmt);
     }
   }
 
@@ -179,30 +186,63 @@ public class JDBCManager {
     return result;
   }
 
-  // private final ConcurrentMap<String, PreparedStatement> preparedStatementCache
= new
-  // ConcurrentHashMap<>();
+  private final ConcurrentMap<StatementKey, PreparedStatement> preparedStatementCache
=
+      new ConcurrentHashMap<>();
 
-  private PreparedStatement getQueryStatement(List<ColumnValue> columnList, String
tableName,
-      Operation operation) {
-    // ConcurrentMap<String, PreparedStatement> cache = getPreparedStatementCache(operation);
-    // return cache.computeIfAbsent(query, k -> {
-    // String query = getQueryString(tableName, columnList, operation);
-    // Connection con = getConnection();
-    // try {
-    // return con.prepareStatement(k);
-    // } catch (SQLException e) {
-    // handleSQLException(e);
-    // }
-    // });
-    String query = getQueryString(tableName, columnList, operation);
-    System.out.println("query=" + query);
-    Connection con = getConnection();
-    try {
-      return con.prepareStatement(query);
-    } catch (SQLException e) {
-      handleSQLException(e);
-      return null; // this line is never reached
+  private static class StatementKey {
+    private final int pdxTypeId;
+    private final Operation operation;
+    private final String tableName;
+
+    public StatementKey(int pdxTypeId, Operation operation, String tableName) {
+      this.pdxTypeId = pdxTypeId;
+      this.operation = operation;
+      this.tableName = tableName;
     }
+
+    @Override
+    public int hashCode() {
+      return operation.hashCode() + pdxTypeId + tableName.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      StatementKey other = (StatementKey) obj;
+      if (!operation.equals(other.operation)) {
+        return false;
+      }
+      if (pdxTypeId != other.pdxTypeId) {
+        return false;
+      }
+      if (!tableName.equals(other.tableName)) {
+        return false;
+      }
+      return true;
+    }
+  }
+
+  private PreparedStatement getPreparedStatement(List<ColumnValue> columnList, String
tableName,
+      Operation operation, int pdxTypeId) {
+    System.out.println("getPreparedStatement : " + pdxTypeId + "operation: " + operation
+        + " columns: " + columnList);
+    StatementKey key = new StatementKey(pdxTypeId, operation, tableName);
+    return preparedStatementCache.computeIfAbsent(key, k -> {
+      String query = getQueryString(tableName, columnList, operation);
+      System.out.println("query=" + query);
+      Connection con = getConnection();
+      try {
+        return con.prepareStatement(query);
+      } catch (SQLException e) {
+        handleSQLException(e);
+        return null; // this line is never reached
+      }
+    });
   }
 
   private List<ColumnValue> getColumnToValueList(String tableName, Object key, PdxInstance
value,

-- 
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <commits@geode.apache.org>'].

Mime
View raw message