openjpa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fa...@apache.org
Subject svn commit: r654942 - /openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingPreparedStatementManagerImpl.java
Date Fri, 09 May 2008 21:34:29 GMT
Author: fancy
Date: Fri May  9 14:34:29 2008
New Revision: 654942

URL: http://svn.apache.org/viewvc?rev=654942&view=rev
Log:
OPENJPA-598 Make BatchingPreparedStatementManagerImpl more flexible and extensible, Sub-task
of OPENJPA-477
Committing patch provided by Fay Wang

Modified:
    openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingPreparedStatementManagerImpl.java

Modified: openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingPreparedStatementManagerImpl.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingPreparedStatementManagerImpl.java?rev=654942&r1=654941&r2=654942&view=diff
==============================================================================
--- openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingPreparedStatementManagerImpl.java (original)
+++ openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingPreparedStatementManagerImpl.java Fri May  9 14:34:29 2008
@@ -83,28 +83,32 @@
         } else {
             // process the SQL statement, either execute it immediately or
             // batch it for later execution.
-            String sql = row.getSQL(_dict);
-            if (_batchedSql == null) {
-                // brand new SQL
-                _batchedSql = sql;
-            } else if (!sql.equals(_batchedSql)) {
-                // SQL statements changed.
-                switch (_batchedRows.size()) {
-                case 0:
-                    break;
-                case 1:
-                    // single entry in cache, direct SQL execution. 
-                    super.flushAndUpdate((RowImpl) _batchedRows.get(0));
-                    _batchedRows.clear();
-                    break;
-                default:
-                    // flush all entries in cache in batch.
-                    flushBatch();
-                }
-                _batchedSql = sql;
+            batchOrExecuteRow(row);
+        }
+    }
+
+    protected void batchOrExecuteRow(RowImpl row) throws SQLException {
+        String sql = row.getSQL(_dict);
+        if (_batchedSql == null) {
+            // brand new SQL
+            _batchedSql = sql;
+        } else if (!sql.equals(_batchedSql)) {
+            // SQL statements changed.
+            switch (_batchedRows.size()) {
+            case 0:
+                break;
+            case 1:
+                // single entry in cache, direct SQL execution. 
+                super.flushAndUpdate((RowImpl) _batchedRows.get(0));
+                _batchedRows.clear();
+                break;
+            default:
+                // flush all entries in cache in batch.
+                flushBatch();
             }
-            _batchedRows.add(row);
+            _batchedSql = sql;
         }
+        _batchedRows.add(row);
     }
 
     /*
@@ -113,8 +117,7 @@
      */
     private boolean isBatchDisabled(RowImpl row) {
         boolean rtnVal = true;
-        if (_batchLimit != 0 && !_disableBatch) {
-            String sql = row.getSQL(_dict);
+        if (getBatchLimit() != 0 && !isBatchDisabled()) {
             OpenJPAStateManager sm = row.getPrimaryKey();
             ClassMapping cmd = null;
             if (sm != null)
@@ -123,9 +126,9 @@
             if (row.getAction() == Row.ACTION_INSERT)
                 autoAssign = row.getTable().getAutoAssignedColumns();
             // validate batch capability
-            _disableBatch = _dict
+            rtnVal = _dict
                 .validateBatchProcess(row, autoAssign, sm, cmd);
-            rtnVal = _disableBatch;
+            setBatchDisabled(rtnVal);
         }
         return rtnVal;
     }
@@ -135,45 +138,53 @@
      * prepared statements.
      */
     protected void flushBatch() {
-        if (_batchedSql != null && _batchedRows.size() > 0) {
+        List batchedRows = getBatchedRows();
+        String batchedSql = getBatchedSql();
+        if (batchedRows == null)
+            return;
+
+        int batchSize = batchedRows.size();
+        if (batchedSql != null &&  batchSize > 0) {
             PreparedStatement ps = null;
             try {
                 RowImpl onerow = null;
-                ps = _conn.prepareStatement(_batchedSql);
-                if (_batchedRows.size() == 1) {
+                ps = prepareStatement(batchedSql);
+                if (batchSize == 1) {
                     // execute a single row.
-                    onerow = (RowImpl) _batchedRows.get(0);
+                    onerow = (RowImpl) batchedRows.get(0);
                     flushSingleRow(onerow, ps);
                 } else {
                     // cache has more than one rows, execute as batch.
                     int count = 0;
                     int batchedRowsBaseIndex = 0;
-                    Iterator itr = _batchedRows.iterator();
+                    Iterator itr = batchedRows.iterator();
                     while (itr.hasNext()) {
                         onerow = (RowImpl) itr.next();
                         if (_batchLimit == 1) {
                             flushSingleRow(onerow, ps);
                         } else {
                             if (count < _batchLimit || _batchLimit == -1) {
-                                onerow.flush(ps, _dict, _store);
-                                ps.addBatch();
+                                if (ps != null)
+                                    onerow.flush(ps, _dict, _store);
+                                addBatch(ps, onerow, count);
                                 count++;
                             } else {
                                 // reach the batchLimit, execute the batch
-                                int[] rtn = ps.executeBatch();
+                                int[] rtn = executeBatch(ps);
                                 checkUpdateCount(rtn, batchedRowsBaseIndex);
 
                                 batchedRowsBaseIndex += _batchLimit;
 
-                                onerow.flush(ps, _dict, _store);
-                                ps.addBatch();
+                                if (ps != null)
+                                    onerow.flush(ps, _dict, _store);
+                                addBatch(ps, onerow, count);
                                 // reset the count to 1 for new batch
                                 count = 1;
                             }
                         }
                     }
                     // end of the loop, execute the batch
-                    int[] rtn = ps.executeBatch();
+                    int[] rtn = executeBatch(ps);
                     checkUpdateCount(rtn, batchedRowsBaseIndex);
                 }
             } catch (SQLException se) {
@@ -183,7 +194,7 @@
                 throw SQLExceptions.getStore(sqex, ps, _dict);
             } finally {
                 _batchedSql = null;
-                _batchedRows.clear();
+                batchedRows.clear();
                 if (ps != null) {
                     try {
                         ps.close();
@@ -200,8 +211,9 @@
      */
     private void flushSingleRow(RowImpl row, PreparedStatement ps)
         throws SQLException {
-        row.flush(ps, _dict, _store);
-        int count = ps.executeUpdate();
+        if (ps != null)
+            row.flush(ps, _dict, _store);
+        int count = executeUpdate(ps, row.getSQL(_dict), row);
         if (count != 1) {
             Object failed = row.getFailedObject();
             if (failed != null)
@@ -219,9 +231,10 @@
         throws SQLException {
         int cnt = 0;
         Object failed = null;
+        List batchedRows = getBatchedRows();
         for (int i = 0; i < count.length; i++) {
             cnt = count[i];
-            RowImpl row = (RowImpl) _batchedRows.get(batchedRowsBaseIndex + i);
+            RowImpl row = (RowImpl) batchedRows.get(batchedRowsBaseIndex + i);
             failed = row.getFailedObject();
             switch (cnt) {
             case Statement.EXECUTE_FAILED: // -3
@@ -230,14 +243,16 @@
                 else if (row.getAction() == Row.ACTION_INSERT)
                     throw new SQLException(_loc.get(
                         "update-failed-no-failed-obj",
-                        String.valueOf(count[i]), _batchedSql).getMessage());
+                        String.valueOf(count[i]), 
+                        row.getSQL(_dict)).getMessage());
                 break;
             case Statement.SUCCESS_NO_INFO: // -2
                 if (failed != null || row.getAction() == Row.ACTION_UPDATE)
                     _exceptions.add(new OptimisticException(failed));
                 else if (_log.isTraceEnabled())
                     _log.trace(_loc.get("batch_update_info",
-                        String.valueOf(cnt), _batchedSql).getMessage());
+                        String.valueOf(cnt), 
+                        row.getSQL(_dict)).getMessage());
                 break;
             case 0: // no row is inserted, treats it as failed
                 // case
@@ -246,8 +261,43 @@
                 else if (row.getAction() == Row.ACTION_INSERT)
                     throw new SQLException(_loc.get(
                         "update-failed-no-failed-obj",
-                        String.valueOf(count[i]), _batchedSql).getMessage());
+                        String.valueOf(count[i]), 
+                        row.getSQL(_dict)).getMessage());
             }
         }
     }
+
+    public boolean isBatchDisabled() {
+        return _disableBatch;
+    }
+
+    public void setBatchDisabled(boolean disableBatch) {
+        _disableBatch = disableBatch;
+    }
+
+    public int getBatchLimit() {
+        return _batchLimit;
+    }
+
+    public void setBatchLimit(int batchLimit) {
+        _batchLimit = batchLimit;
+    }
+
+    public List getBatchedRows() {
+        return _batchedRows;
+    }
+
+    public String getBatchedSql() {
+        return _batchedSql;
+    }
+
+    protected void addBatch(PreparedStatement ps, RowImpl row, 
+            int count) throws SQLException {
+        ps.addBatch();
+    }
+
+    protected int[] executeBatch(PreparedStatement ps) 
+    throws SQLException {
+        return ps.executeBatch();
+    }
 }



Mime
View raw message