cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject svn commit: r1332129 - in /cxf/trunk/rt/ws/rm/src: main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreUpgradeTest.java
Date Mon, 30 Apr 2012 09:12:16 GMT
Author: ay
Date: Mon Apr 30 09:12:15 2012
New Revision: 1332129

URL: http://svn.apache.org/viewvc?rev=1332129&view=rev
Log:
[CXF-4276] Make WS-RM's default store RMTxStore to be shareable

Modified:
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreUpgradeTest.java

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java?rev=1332129&r1=1332128&r2=1332129&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
(original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
Mon Apr 30 09:12:15 2012
@@ -263,9 +263,6 @@ public class RMTxStore implements RMStor
         try {
             beginTransaction();
             
-            if (null == createDestSequenceStmt) {
-                createDestSequenceStmt = connection.prepareStatement(CREATE_DEST_SEQUENCE_STMT_STR);
-            }
             createDestSequenceStmt.setString(1, sequenceIdentifier);
             String addr = seq.getAcksTo().getAddress().getValue();
             createDestSequenceStmt.setString(2, addr);
@@ -293,10 +290,6 @@ public class RMTxStore implements RMStor
         try {
             beginTransaction();
             
-            if (null == createSrcSequenceStmt) {
-                createSrcSequenceStmt = connection.prepareStatement(CREATE_SRC_SEQUENCE_STMT_STR);
-            }
-            assert null != createSrcSequenceStmt;
             createSrcSequenceStmt.setString(1, sequenceIdentifier);
             Date expiry = seq.getExpires();
             createSrcSequenceStmt.setLong(2, expiry == null ? 0 : expiry.getTime());
@@ -319,23 +312,22 @@ public class RMTxStore implements RMStor
             LOG.info("Getting destination sequence for id: " + sid);
         }
         try {
-            if (null == selectDestSequenceStmt) {
-                selectDestSequenceStmt = 
-                    connection.prepareStatement(SELECT_DEST_SEQUENCE_STMT_STR);         
     
-            }
-            selectDestSequenceStmt.setString(1, sid.getValue());
-            ResultSet res = selectDestSequenceStmt.executeQuery(); 
-            if (res.next()) {
-                EndpointReferenceType acksTo = RMUtils.createReference(res.getString(1));
 
-                long lm = res.getLong(2);
-                ProtocolVariation pv = decodeProtocolVersion(res.getString(3));
-                InputStream is = res.getBinaryStream(4);
-                SequenceAcknowledgement ack = null;
-                if (null != is) {
-                    ack = PersistenceUtils.getInstance()
-                        .deserialiseAcknowledgment(is); 
+            synchronized (selectDestSequenceStmt) {
+                selectDestSequenceStmt.setString(1, sid.getValue());
+                ResultSet res = selectDestSequenceStmt.executeQuery();
+            
+                if (res.next()) {
+                    EndpointReferenceType acksTo = RMUtils.createReference(res.getString(1));
 
+                    long lm = res.getLong(2);
+                    ProtocolVariation pv = decodeProtocolVersion(res.getString(3));
+                    InputStream is = res.getBinaryStream(4);
+                    SequenceAcknowledgement ack = null;
+                    if (null != is) {
+                        ack = PersistenceUtils.getInstance()
+                            .deserialiseAcknowledgment(is); 
+                    }
+                    return new DestinationSequence(sid, acksTo, lm, ack, pv);
                 }
-                return new DestinationSequence(sid, acksTo, lm, ack, pv);
             }
         } catch (SQLException ex) {
             LOG.log(Level.WARNING, new Message("SELECT_DEST_SEQ_FAILED_MSG", LOG).toString(),
ex);
@@ -348,27 +340,24 @@ public class RMTxStore implements RMStor
             LOG.info("Getting source sequences for id: " + sid);
         }
         try {
-            if (null == selectSrcSequenceStmt) {
-                selectSrcSequenceStmt = 
-                    connection.prepareStatement(SELECT_SRC_SEQUENCE_STMT_STR);     
-            }
-            selectSrcSequenceStmt.setString(1, sid.getValue());
-            ResultSet res = selectSrcSequenceStmt.executeQuery();
-            
-            if (res.next()) {
-                long cmn = res.getLong(1);
-                boolean lm = res.getBoolean(2);
-                long lval = res.getLong(3);
-                Date expiry = 0 == lval ? null : new Date(lval);
-                String oidValue = res.getString(4);
-                Identifier oi = null;
-                if (null != oidValue) {
-                    oi = RMUtils.getWSRMFactory().createIdentifier();
-                    oi.setValue(oidValue);
-                }
-                ProtocolVariation pv = decodeProtocolVersion(res.getString(5));
-                return new SourceSequence(sid, expiry, oi, cmn, lm, pv);
-                          
+            synchronized (selectSrcSequenceStmt) {
+                selectSrcSequenceStmt.setString(1, sid.getValue());
+                ResultSet res = selectSrcSequenceStmt.executeQuery();
+            
+                if (res.next()) {
+                    long cmn = res.getLong(1);
+                    boolean lm = res.getBoolean(2);
+                    long lval = res.getLong(3);
+                    Date expiry = 0 == lval ? null : new Date(lval);
+                    String oidValue = res.getString(4);
+                    Identifier oi = null;
+                    if (null != oidValue) {
+                        oi = RMUtils.getWSRMFactory().createIdentifier();
+                        oi.setValue(oidValue);
+                    }
+                    ProtocolVariation pv = decodeProtocolVersion(res.getString(5));
+                    return new SourceSequence(sid, expiry, oi, cmn, lm, pv);
+                }
             }
         } catch (SQLException ex) {
             // ignore
@@ -381,9 +370,6 @@ public class RMTxStore implements RMStor
         try {
             beginTransaction();
             
-            if (null == deleteDestSequenceStmt) {
-                deleteDestSequenceStmt = connection.prepareStatement(DELETE_DEST_SEQUENCE_STMT_STR);
-            }
             deleteDestSequenceStmt.setString(1, sid.getValue());
             deleteDestSequenceStmt.execute();
             
@@ -400,9 +386,6 @@ public class RMTxStore implements RMStor
         try {
             beginTransaction();
             
-            if (null == deleteSrcSequenceStmt) {
-                deleteSrcSequenceStmt = connection.prepareStatement(DELETE_SRC_SEQUENCE_STMT_STR);
-            }
             deleteSrcSequenceStmt.setString(1, sid.getValue());
             deleteSrcSequenceStmt.execute();
             
@@ -420,27 +403,24 @@ public class RMTxStore implements RMStor
         }
         Collection<DestinationSequence> seqs = new ArrayList<DestinationSequence>();
         try {
-            if (null == selectDestSequencesStmt) {
-                selectDestSequencesStmt = 
-                    connection.prepareStatement(SELECT_DEST_SEQUENCES_STMT_STR);        
      
-            }
-            selectDestSequencesStmt.setString(1, endpointIdentifier);
-            
-            ResultSet res = selectDestSequencesStmt.executeQuery(); 
-            while (res.next()) {
-                Identifier sid = new Identifier();                
-                sid.setValue(res.getString(1));
-                EndpointReferenceType acksTo = RMUtils.createReference(res.getString(2));
 
-                long lm = res.getLong(3);
-                ProtocolVariation pv = decodeProtocolVersion(res.getString(4));
-                InputStream is = res.getBinaryStream(5);
-                SequenceAcknowledgement ack = null;
-                if (null != is) {
-                    ack = PersistenceUtils.getInstance()
-                        .deserialiseAcknowledgment(is); 
+            synchronized (selectDestSequencesStmt) {
+                selectDestSequencesStmt.setString(1, endpointIdentifier);
+                ResultSet res = selectDestSequencesStmt.executeQuery(); 
+                while (res.next()) {
+                    Identifier sid = new Identifier();                
+                    sid.setValue(res.getString(1));
+                    EndpointReferenceType acksTo = RMUtils.createReference(res.getString(2));
 
+                    long lm = res.getLong(3);
+                    ProtocolVariation pv = decodeProtocolVersion(res.getString(4));
+                    InputStream is = res.getBinaryStream(5);
+                    SequenceAcknowledgement ack = null;
+                    if (null != is) {
+                        ack = PersistenceUtils.getInstance()
+                            .deserialiseAcknowledgment(is); 
+                    }
+                    DestinationSequence seq = new DestinationSequence(sid, acksTo, lm, ack,
pv);
+                    seqs.add(seq);                                                 
                 }
-                DestinationSequence seq = new DestinationSequence(sid, acksTo, lm, ack, pv);
-                seqs.add(seq);                                                 
             }
         } catch (SQLException ex) {
             LOG.log(Level.WARNING, new Message("SELECT_DEST_SEQ_FAILED_MSG", LOG).toString(),
ex);
@@ -454,29 +434,26 @@ public class RMTxStore implements RMStor
         }
         Collection<SourceSequence> seqs = new ArrayList<SourceSequence>();
         try {
-            if (null == selectSrcSequencesStmt) {
-                selectSrcSequencesStmt = 
-                    connection.prepareStatement(SELECT_SRC_SEQUENCES_STMT_STR);     
-            }
-            selectSrcSequencesStmt.setString(1, endpointIdentifier);
-            ResultSet res = selectSrcSequencesStmt.executeQuery();
-            
-            while (res.next()) {
-                Identifier sid = new Identifier();
-                sid.setValue(res.getString(1));
-                long cmn = res.getLong(2);
-                boolean lm = res.getBoolean(3);
-                long lval = res.getLong(4);
-                Date expiry = 0 == lval ? null : new Date(lval);
-                String oidValue = res.getString(5);
-                Identifier oi = null;
-                if (null != oidValue) {
-                    oi = new Identifier();
-                    oi.setValue(oidValue);
-                }
-                ProtocolVariation pv = decodeProtocolVersion(res.getString(6));
-                SourceSequence seq = new SourceSequence(sid, expiry, oi, cmn, lm, pv);
-                seqs.add(seq);                          
+            synchronized (selectSrcSequencesStmt) {
+                selectSrcSequencesStmt.setString(1, endpointIdentifier);
+                ResultSet res = selectSrcSequencesStmt.executeQuery();
+                while (res.next()) {
+                    Identifier sid = new Identifier();
+                    sid.setValue(res.getString(1));
+                    long cmn = res.getLong(2);
+                    boolean lm = res.getBoolean(3);
+                    long lval = res.getLong(4);
+                    Date expiry = 0 == lval ? null : new Date(lval);
+                    String oidValue = res.getString(5);
+                    Identifier oi = null;
+                    if (null != oidValue) {
+                        oi = new Identifier();
+                        oi.setValue(oidValue);
+                    }
+                    ProtocolVariation pv = decodeProtocolVersion(res.getString(6));
+                    SourceSequence seq = new SourceSequence(sid, expiry, oi, cmn, lm, pv);
+                    seqs.add(seq);                          
+                }
             }
         } catch (SQLException ex) {
             // ignore
@@ -489,27 +466,20 @@ public class RMTxStore implements RMStor
         Collection<RMMessage> msgs = new ArrayList<RMMessage>();
         try {
             PreparedStatement stmt = outbound ? selectOutboundMessagesStmt : selectInboundMessagesStmt;
-            if (null == stmt) {
-                stmt = connection.prepareStatement(MessageFormat.format(SELECT_MESSAGES_STMT_STR,
-                    outbound ? OUTBOUND_MSGS_TABLE_NAME : INBOUND_MSGS_TABLE_NAME));
-                if (outbound) {
-                    selectOutboundMessagesStmt = stmt;                    
-                } else {
-                    selectInboundMessagesStmt = stmt;
+            synchronized (stmt) {
+                stmt.setString(1, sid.getValue());
+                ResultSet res = stmt.executeQuery();
+                while (res.next()) {
+                    long mn = res.getLong(1);
+                    String to = res.getString(2);
+                    Blob blob = res.getBlob(3);
+                    RMMessage msg = new RMMessage();
+                    msg.setMessageNumber(mn);
+                    msg.setTo(to);
+                    msg.setContent(blob.getBinaryStream());
+                    msgs.add(msg);
                 }
             }
-            stmt.setString(1, sid.getValue());
-            ResultSet res = stmt.executeQuery();
-            while (res.next()) {
-                long mn = res.getLong(1);
-                String to = res.getString(2);
-                Blob blob = res.getBlob(3);
-                RMMessage msg = new RMMessage();
-                msg.setMessageNumber(mn);
-                msg.setTo(to);
-                msg.setContent(blob.getBinaryStream());
-                msgs.add(msg);                
-            }            
         } catch (Exception ex) {
             LOG.log(Level.WARNING, new Message(outbound ? "SELECT_OUTBOUND_MSGS_FAILED_MSG"
                 : "SELECT_INBOUND_MSGS_FAILED_MSG", LOG).toString(), ex);
@@ -560,18 +530,9 @@ public class RMTxStore implements RMStor
     
     public void removeMessages(Identifier sid, Collection<Long> messageNrs, boolean
outbound) {
         try {
-            beginTransaction();
             PreparedStatement stmt = outbound ? deleteOutboundMessageStmt : deleteInboundMessageStmt;
-            if (null == stmt) {
-                stmt = connection.prepareStatement(MessageFormat.format(DELETE_MESSAGE_STMT_STR,
-                    outbound ? OUTBOUND_MSGS_TABLE_NAME : INBOUND_MSGS_TABLE_NAME));
-                if (outbound) {
-                    deleteOutboundMessageStmt = stmt;                    
-                } else {
-                    deleteInboundMessageStmt = stmt;
-                }
-            }
-    
+            beginTransaction();
+
             stmt.setString(1, sid.getValue());
                         
             for (Long messageNr : messageNrs) {
@@ -788,6 +749,33 @@ public class RMTxStore implements RMStor
         stmt.close();
     }
 
+    private void createStatements() throws SQLException {
+        // create the statements in advance to avoid synchronization later 
+        createDestSequenceStmt = connection.prepareStatement(CREATE_DEST_SEQUENCE_STMT_STR);
+        createSrcSequenceStmt = connection.prepareStatement(CREATE_SRC_SEQUENCE_STMT_STR);
+        deleteDestSequenceStmt = connection.prepareStatement(DELETE_DEST_SEQUENCE_STMT_STR);
+        deleteSrcSequenceStmt = connection.prepareStatement(DELETE_SRC_SEQUENCE_STMT_STR);
+        updateDestSequenceStmt = connection.prepareStatement(UPDATE_DEST_SEQUENCE_STMT_STR);
+        updateSrcSequenceStmt = connection.prepareStatement(UPDATE_SRC_SEQUENCE_STMT_STR);
+        selectDestSequencesStmt = connection.prepareStatement(SELECT_DEST_SEQUENCES_STMT_STR);
+        selectSrcSequencesStmt = connection.prepareStatement(SELECT_SRC_SEQUENCES_STMT_STR);
+        selectDestSequenceStmt = connection.prepareStatement(SELECT_DEST_SEQUENCE_STMT_STR);
+        selectSrcSequenceStmt = connection.prepareStatement(SELECT_SRC_SEQUENCE_STMT_STR);
+        createInboundMessageStmt = connection.prepareStatement(
+            MessageFormat.format(CREATE_MESSAGE_STMT_STR, INBOUND_MSGS_TABLE_NAME));
+        createOutboundMessageStmt = connection.prepareStatement(
+            MessageFormat.format(CREATE_MESSAGE_STMT_STR, OUTBOUND_MSGS_TABLE_NAME));
+        deleteInboundMessageStmt = connection.prepareStatement(
+            MessageFormat.format(DELETE_MESSAGE_STMT_STR, INBOUND_MSGS_TABLE_NAME));
+        deleteOutboundMessageStmt = connection.prepareStatement(
+            MessageFormat.format(DELETE_MESSAGE_STMT_STR, OUTBOUND_MSGS_TABLE_NAME));
+        selectInboundMessagesStmt = connection.prepareStatement(
+            MessageFormat.format(SELECT_MESSAGES_STMT_STR, INBOUND_MSGS_TABLE_NAME));
+        selectOutboundMessagesStmt = connection.prepareStatement(
+            MessageFormat.format(SELECT_MESSAGES_STMT_STR, OUTBOUND_MSGS_TABLE_NAME));
+    }
+
+
     @PostConstruct     
     public synchronized void init() {
         
@@ -826,6 +814,7 @@ public class RMTxStore implements RMStor
             connection.setAutoCommit(true);
             setCurrentSchema();
             createTables();
+            createStatements();
         } catch (SQLException ex) {
             LogUtils.log(LOG, Level.SEVERE, "CONNECT_EXC", ex);
             SQLException se = ex;

Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreUpgradeTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreUpgradeTest.java?rev=1332129&r1=1332128&r2=1332129&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreUpgradeTest.java
(original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreUpgradeTest.java
Mon Apr 30 09:12:15 2012
@@ -20,6 +20,7 @@
 package org.apache.cxf.ws.rm.persistence.jdbc;
 
 import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -135,6 +136,24 @@ public class RMTxStoreUpgradeTest extend
         }
         
         @Override
+        public synchronized void init() {
+            if (upgraded) {
+                super.init();
+            } else {
+                // just create the old tables
+                try {
+                    setConnection(DriverManager.getConnection(getUrl()));
+                    createTables();
+                } catch (SQLException e) {
+                    // ignore this error
+                }
+                
+            }
+        }
+
+
+
+        @Override
         protected void createTables() throws SQLException {
             if (upgraded) {
                 super.createTables();



Mime
View raw message