cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject svn commit: r1346840 - in /cxf/trunk/rt/ws/rm/src: main/java/org/apache/cxf/ws/rm/persistence/jdbc/Messages.properties main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java
Date Wed, 06 Jun 2012 12:01:03 GMT
Author: ay
Date: Wed Jun  6 12:01:02 2012
New Revision: 1346840

URL: http://svn.apache.org/viewvc?rev=1346840&view=rev
Log:
[CXF-4362] Add a reconnect option in WS-RM RMTxStore

Modified:
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/Messages.properties
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/Messages.properties
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/Messages.properties?rev=1346840&r1=1346839&r2=1346840&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/Messages.properties (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/Messages.properties Wed Jun  6 12:01:02 2012
@@ -20,6 +20,8 @@
 #
 CONNECT_EXC = Failed to connect to store.
 ABORT_FAILED_MSG = Failed to abort transaction.
+CLOSE_FAILED_MSG = Failed to close connection.
+RECONNECT_WAIT_MSG = Waiting for the next reconnect attempt.
 SELECT_DEST_SEQ_FAILED_MSG = Failed to retrieve destination sequences from persistent store.
 SELECT_SRC_SEQ_FAILED_MSG = Failed to retrieve source sequences from persistent store.
 VERIFY_TABLE_FAILED_MSG = Failed to verify the table definition.
\ No newline at end of file

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java?rev=1346840&r1=1346839&r2=1346840&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java Wed Jun  6 12:01:02 2012
@@ -178,6 +178,14 @@ public class RMTxStore implements RMStor
     private String userName;
     private String password;
     private String schemaName;
+
+    private long initialReconnectDelay = 60000L;
+    private int useExponentialBackOff = 2;
+    private int maxReconnectAttempts = 10;
+
+    private long reconnectDelay;
+    private int reconnectAttempts;
+    private long nextReconnectAttempt;
     
     private String tableExistsState = DERBY_TABLE_EXISTS_STATE;
     private int tableExistsCode = ORACLE_TABLE_EXISTS_CODE;
@@ -192,6 +200,7 @@ public class RMTxStore implements RMStor
             } catch (SQLException e) {
                 //ignore
             }
+            connection = null;
         }
     }
     
@@ -265,6 +274,22 @@ public class RMTxStore implements RMStor
         this.tableExistsCode = tableExistsCode;
     }
 
+    public long getInitialReconnectDelay() {
+        return initialReconnectDelay;
+    }
+
+    public void setInitialReconnectDelay(long initialReconnectDelay) {
+        this.initialReconnectDelay = initialReconnectDelay;
+    }
+
+    public int getMaxReconnectAttempts() {
+        return maxReconnectAttempts;
+    }
+
+    public void setMaxReconnectAttempts(int maxReconnectAttempts) {
+        this.maxReconnectAttempts = maxReconnectAttempts;
+    }
+
     public void setConnection(Connection c) {
         connection = c;
         createdConnection = false; 
@@ -280,6 +305,8 @@ public class RMTxStore implements RMStor
             LOG.info("Creating destination sequence: " + sequenceIdentifier + ", (endpoint: "
                  + endpointIdentifier + ")");
         }
+        verifyConnection();
+        SQLException conex = null;
         try {
             beginTransaction();
             
@@ -291,10 +318,12 @@ public class RMTxStore implements RMStor
             createDestSequenceStmt.execute();
             
             commit();
-            
         } catch (SQLException ex) {
             abort();
+            conex = ex;
             throw new RMStoreException(ex);
+        } finally {
+            updateConnectionState(conex);
         }
     }
     
@@ -306,7 +335,8 @@ public class RMTxStore implements RMStor
             LOG.fine("Creating source sequence: " + sequenceIdentifier + ", (endpoint: "
                      + endpointIdentifier + ")"); 
         }
-        
+        verifyConnection();
+        SQLException conex = null;
         try {
             beginTransaction();
             
@@ -320,10 +350,12 @@ public class RMTxStore implements RMStor
             createSrcSequenceStmt.execute();    
             
             commit();
-            
         } catch (SQLException ex) {
+            conex = ex;
             abort();
             throw new RMStoreException(ex);
+        } finally {
+            updateConnectionState(conex);
         }
     }
 
@@ -331,6 +363,8 @@ public class RMTxStore implements RMStor
         if (LOG.isLoggable(Level.FINE)) {
             LOG.info("Getting destination sequence for id: " + sid);
         }
+        verifyConnection();
+        SQLException conex = null;
         ResultSet res = null;
         try {
             synchronized (selectDestSequenceStmt) {
@@ -351,6 +385,7 @@ public class RMTxStore implements RMStor
                 }
             }
         } catch (SQLException ex) {
+            conex = ex;
             LOG.log(Level.WARNING, new Message("SELECT_DEST_SEQ_FAILED_MSG", LOG).toString(), ex);
         } finally {
             if (res != null) {
@@ -360,6 +395,7 @@ public class RMTxStore implements RMStor
                     // ignore
                 }
             }
+            updateConnectionState(conex);
         }
         return null;
     }
@@ -368,6 +404,8 @@ public class RMTxStore implements RMStor
         if (LOG.isLoggable(Level.FINE)) {
             LOG.info("Getting source sequences for id: " + sid);
         }
+        verifyConnection();
+        SQLException conex = null;
         ResultSet res = null;
         try {
             synchronized (selectSrcSequenceStmt) {
@@ -390,6 +428,7 @@ public class RMTxStore implements RMStor
                 }
             }
         } catch (SQLException ex) {
+            conex = ex;
             // ignore
             LOG.log(Level.WARNING, new Message("SELECT_SRC_SEQ_FAILED_MSG", LOG).toString(), ex);
         } finally {
@@ -400,11 +439,14 @@ public class RMTxStore implements RMStor
                     // ignore
                 }
             }
+            updateConnectionState(conex);
         } 
         return null;
     }
 
     public void removeDestinationSequence(Identifier sid) {
+        verifyConnection();
+        SQLException conex = null;
         try {
             beginTransaction();
             
@@ -414,13 +456,18 @@ public class RMTxStore implements RMStor
             commit();
             
         } catch (SQLException ex) {
+            conex = ex;
             abort();
             throw new RMStoreException(ex);
-        }        
+        } finally {
+            updateConnectionState(conex);
+        }
     }
     
     
     public void removeSourceSequence(Identifier sid) {
+        verifyConnection();
+        SQLException conex = null;
         try {
             beginTransaction();
             
@@ -430,8 +477,11 @@ public class RMTxStore implements RMStor
             commit();
             
         } catch (SQLException ex) {
+            conex = ex;
             abort();
             throw new RMStoreException(ex);
+        } finally {
+            updateConnectionState(conex);
         }        
     }
     
@@ -439,6 +489,8 @@ public class RMTxStore implements RMStor
         if (LOG.isLoggable(Level.FINE)) {
             LOG.info("Getting destination sequences for endpoint: " + endpointIdentifier);
         }
+        verifyConnection();
+        SQLException conex = null;
         Collection<DestinationSequence> seqs = new ArrayList<DestinationSequence>();
         ResultSet res = null;
         try {
@@ -462,6 +514,7 @@ public class RMTxStore implements RMStor
                 }
             }
         } catch (SQLException ex) {
+            conex = ex;
             LOG.log(Level.WARNING, new Message("SELECT_DEST_SEQ_FAILED_MSG", LOG).toString(), ex);
         } finally {
             if (res != null) {
@@ -471,6 +524,7 @@ public class RMTxStore implements RMStor
                     // ignore
                 }
             }
+            updateConnectionState(conex);
         } 
         return seqs;
     }
@@ -479,6 +533,8 @@ public class RMTxStore implements RMStor
         if (LOG.isLoggable(Level.FINE)) {
             LOG.info("Getting source sequences for endpoint: " + endpointIdentifier);
         }
+        verifyConnection();
+        SQLException conex = null;
         Collection<SourceSequence> seqs = new ArrayList<SourceSequence>();
         ResultSet res = null;
         try {
@@ -504,6 +560,7 @@ public class RMTxStore implements RMStor
                 }
             }
         } catch (SQLException ex) {
+            conex = ex;
             // ignore
             LOG.log(Level.WARNING, new Message("SELECT_SRC_SEQ_FAILED_MSG", LOG).toString(), ex);
         } finally {
@@ -514,11 +571,14 @@ public class RMTxStore implements RMStor
                     // ignore
                 }
             }
+            updateConnectionState(conex);
         } 
         return seqs;
     }
     
     public Collection<RMMessage> getMessages(Identifier sid, boolean outbound) {
+        verifyConnection();
+        SQLException conex = null;
         Collection<RMMessage> msgs = new ArrayList<RMMessage>();
         ResultSet res = null;
         try {
@@ -537,7 +597,11 @@ public class RMTxStore implements RMStor
                     msgs.add(msg);
                 }
             }
-        } catch (Exception ex) {
+        } catch (SQLException ex) {
+            conex = ex;
+            LOG.log(Level.WARNING, new Message(outbound ? "SELECT_OUTBOUND_MSGS_FAILED_MSG"
+                : "SELECT_INBOUND_MSGS_FAILED_MSG", LOG).toString(), ex);
+        } catch (IOException ex) {
             LOG.log(Level.WARNING, new Message(outbound ? "SELECT_OUTBOUND_MSGS_FAILED_MSG"
                 : "SELECT_INBOUND_MSGS_FAILED_MSG", LOG).toString(), ex);
         } finally {
@@ -548,11 +612,14 @@ public class RMTxStore implements RMStor
                     // ignore
                 }
             }
+            updateConnectionState(conex);
         }
         return msgs;
     }
     
     public void persistIncoming(DestinationSequence seq, RMMessage msg) {        
+        verifyConnection();
+        SQLException conex = null;
         try {
             beginTransaction();
             
@@ -565,14 +632,19 @@ public class RMTxStore implements RMStor
             commit();
             
         } catch (SQLException ex) {
+            conex = ex;
             abort();
             throw new RMStoreException(ex);
         } catch (IOException ex) {
             abort();
             throw new RMStoreException(ex);        
-        }        
+        } finally {
+            updateConnectionState(conex);
+        }
     }
     public void persistOutgoing(SourceSequence seq, RMMessage msg) {
+        verifyConnection();
+        SQLException conex = null;
         try {
             beginTransaction();
             
@@ -585,15 +657,20 @@ public class RMTxStore implements RMStor
             commit();
             
         } catch (SQLException ex) {
+            conex = ex;
             abort();
             throw new RMStoreException(ex);
         } catch (IOException ex) {
             abort();
             throw new RMStoreException(ex);        
+        } finally {
+            updateConnectionState(conex);
         }        
     }
     
     public void removeMessages(Identifier sid, Collection<Long> messageNrs, boolean outbound) {
+        verifyConnection();
+        SQLException conex = null;
         try {
             PreparedStatement stmt = outbound ? deleteOutboundMessageStmt : deleteInboundMessageStmt;
             beginTransaction();
@@ -608,9 +685,12 @@ public class RMTxStore implements RMStor
             commit();
             
         } catch (SQLException ex) {
+            conex = ex;
             abort();
             throw new RMStoreException(ex);
-        }        
+        } finally {
+            updateConnectionState(conex);
+        }
     }
     
     // transaction demarcation
@@ -686,6 +766,7 @@ public class RMTxStore implements RMStor
             updateDestSequenceStmt.execute();
         }
     }
+
     protected void createTables() throws SQLException {
         
         Statement stmt = null;
@@ -902,7 +983,42 @@ public class RMTxStore implements RMStor
     Connection getConnection() {
         return connection;
     }
-    
+
+    private void verifyConnection() {
+        if (createdConnection && nextReconnectAttempt > 0
+            && (maxReconnectAttempts < 0 || maxReconnectAttempts > reconnectAttempts)) {
+            if (System.currentTimeMillis() > nextReconnectAttempt) {
+                // destroy the broken connection
+                destroy();
+                // try to reconnect
+                reconnectAttempts++;
+                init();
+                // reset the next reconnect attempt time
+                nextReconnectAttempt = 0;
+            } else {
+                LogUtils.log(LOG, Level.INFO, "WAIT_RECONNECT_MSG");
+            }
+        }
+    }
+
+    private synchronized void updateConnectionState(SQLException e) {
+        if (e == null) {
+            // reset the previous error status
+            reconnectDelay = 0;
+            reconnectAttempts = 0;
+            nextReconnectAttempt = 0;
+        } else if (createdConnection && isRecoverableError(e)) {
+            // update the next reconnect schedule 
+            if (reconnectDelay == 0) {
+                reconnectDelay = initialReconnectDelay;
+            }
+            if (nextReconnectAttempt < System.currentTimeMillis()) {
+                nextReconnectAttempt = System.currentTimeMillis() + reconnectDelay;
+                reconnectDelay = reconnectDelay * useExponentialBackOff;
+            }
+        }
+    }
+
     public static void deleteDatabaseFiles() {
         deleteDatabaseFiles(DEFAULT_DATABASE_NAME, true);
     }
@@ -990,4 +1106,9 @@ public class RMTxStore implements RMStor
         return (null != tableExistsState && tableExistsState.equals(ex.getSQLState()))
                 || tableExistsCode == ex.getErrorCode();
     }
+    
+    protected boolean isRecoverableError(SQLException ex) {
+        // check for a transient or non-transient connection exception
+        return ex.getSQLState() != null && ex.getSQLState().startsWith("08");
+    }
 }
\ No newline at end of file

Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java?rev=1346840&r1=1346839&r2=1346840&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTest.java Wed Jun  6 12:01:02 2012
@@ -674,6 +674,58 @@ public class RMTxStoreTest extends Asser
             store.removeMessages(sid1, msgNrs, false);
         }
     }
+
+    @Test
+    public void testReconnect() throws Exception {
+        // set the initial reconnect delay to 100 msec for testing
+        long ird = store.getInitialReconnectDelay();
+        store.setInitialReconnectDelay(100);
+        
+        SourceSequence seq = control.createMock(SourceSequence.class);
+        Identifier sid1 = RMUtils.getWSRMFactory().createIdentifier();
+        sid1.setValue("sequence1");
+        EasyMock.expect(seq.getIdentifier()).andReturn(sid1);
+        EasyMock.expect(seq.getExpires()).andReturn(null);
+        EasyMock.expect(seq.getOfferingSequenceIdentifier()).andReturn(null);
+        EasyMock.expect(seq.getEndpointIdentifier()).andReturn(CLIENT_ENDPOINT_ID);
+        EasyMock.expect(seq.getProtocol()).andReturn(ProtocolVariation.RM10WSA200408);
+        
+        // intentionally invalidate the connection
+        try {
+            store.getConnection().close();
+        } catch (SQLException ex) {
+            // ignore
+        }
+        
+        control.replay();
+        try {
+            store.createSourceSequence(seq);  
+            fail("Expected RMStoreException was not thrown.");
+        } catch (RMStoreException ex) {
+            SQLException se = (SQLException)ex.getCause();
+            // expects a transient or non-transient connection exception
+            assertTrue(se.getSQLState().startsWith("08"));
+        }
+        
+        // wait 200 msecs to make sure a reconnect is attempted
+        Thread.sleep(200);
+        
+        control.reset();
+        EasyMock.expect(seq.getIdentifier()).andReturn(sid1);
+        EasyMock.expect(seq.getExpires()).andReturn(null);
+        EasyMock.expect(seq.getOfferingSequenceIdentifier()).andReturn(null);
+        EasyMock.expect(seq.getEndpointIdentifier()).andReturn(CLIENT_ENDPOINT_ID);
+        EasyMock.expect(seq.getProtocol()).andReturn(ProtocolVariation.RM10WSA200408);
+        
+        control.replay();
+        store.createSourceSequence(seq);
+        control.verify();
+        
+        // revert to the old initial reconnect delay
+        store.setInitialReconnectDelay(ird);
+        
+        store.removeSourceSequence(sid1);
+    }
     
     private Identifier setupDestinationSequence(String s) throws IOException, SQLException {
         DestinationSequence seq = control.createMock(DestinationSequence.class);



Mime
View raw message