cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject svn commit: r1332140 - in /cxf/branches/2.5.x-fixes: ./ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
Date Mon, 30 Apr 2012 09:47:25 GMT
Author: ay
Date: Mon Apr 30 09:47:25 2012
New Revision: 1332140

URL: http://svn.apache.org/viewvc?rev=1332140&view=rev
Log:
Merged revisions 1332129 via  svn merge from
https://svn.apache.org/repos/asf/cxf/trunk

........
  r1332129 | ay | 2012-04-30 11:12:15 +0200 (Mon, 30 Apr 2012) | 1 line
  
  [CXF-4276] Make WS-RM's default store RMTxStore to be shareable
........

Modified:
    cxf/branches/2.5.x-fixes/   (props changed)
    cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java

Propchange: cxf/branches/2.5.x-fixes/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java?rev=1332140&r1=1332139&r2=1332140&view=diff
==============================================================================
--- cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java (original)
+++ cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java Mon Apr 30 09:47:25 2012
@@ -243,9 +243,6 @@ public class RMTxStore implements RMStor
         try {
             beginTransaction();
             
-            if (null == createDestSequenceStmt) {
-                createDestSequenceStmt = connection.prepareStatement(CREATE_DEST_SEQUENCE_STMT_STR);
-            }
             createDestSequenceStmt.setString(1, sequenceIdentifier);
             String addr = seq.getAcksTo().getAddress().getValue();
             createDestSequenceStmt.setString(2, addr);
@@ -272,10 +269,6 @@ public class RMTxStore implements RMStor
         try {
             beginTransaction();
             
-            if (null == createSrcSequenceStmt) {
-                createSrcSequenceStmt = connection.prepareStatement(CREATE_SRC_SEQUENCE_STMT_STR);
-            }
-            assert null != createSrcSequenceStmt;
             createSrcSequenceStmt.setString(1, sequenceIdentifier);
             Date expiry = seq.getExpires();
             createSrcSequenceStmt.setLong(2, expiry == null ? 0 : expiry.getTime());
@@ -297,23 +290,21 @@ public class RMTxStore implements RMStor
             LOG.info("Getting destination sequence for id: " + sid);
         }
         try {
-            if (null == selectDestSequenceStmt) {
-                selectDestSequenceStmt = 
-                    connection.prepareStatement(SELECT_DEST_SEQUENCE_STMT_STR);         
     
-            }
-            selectDestSequenceStmt.setString(1, sid.getValue());
-            
-            ResultSet res = selectDestSequenceStmt.executeQuery(); 
-            if (res.next()) {
-                EndpointReferenceType acksTo = RMUtils.createReference(res.getString(1));
 
-                long lm = res.getLong(2);
-                InputStream is = res.getBinaryStream(3); 
-                SequenceAcknowledgement ack = null;
-                if (null != is) {
-                    ack = PersistenceUtils.getInstance()
-                        .deserialiseAcknowledgment(is); 
+            synchronized (selectDestSequenceStmt) {
+                selectDestSequenceStmt.setString(1, sid.getValue());
+                ResultSet res = selectDestSequenceStmt.executeQuery();
+            
+                if (res.next()) {
+                    EndpointReferenceType acksTo = RMUtils.createReference(res.getString(1));
 
+                    long lm = res.getLong(2);
+                    InputStream is = res.getBinaryStream(3);
+                    SequenceAcknowledgement ack = null;
+                    if (null != is) {
+                        ack = PersistenceUtils.getInstance()
+                            .deserialiseAcknowledgment(is); 
+                    }
+                    return new DestinationSequence(sid, acksTo, lm, ack, protocol);
                 }
-                return new DestinationSequence(sid, acksTo, lm, ack, protocol);
             }
         } catch (SQLException ex) {
             LOG.log(Level.WARNING, new Message("SELECT_DEST_SEQ_FAILED_MSG", LOG).toString(), ex);
@@ -326,26 +317,23 @@ public class RMTxStore implements RMStor
             LOG.info("Getting source sequences for id: " + sid);
         }
         try {
-            if (null == selectSrcSequenceStmt) {
-                selectSrcSequenceStmt = 
-                    connection.prepareStatement(SELECT_SRC_SEQUENCE_STMT_STR);     
-            }
-            selectSrcSequenceStmt.setString(1, sid.getValue());
-            ResultSet res = selectSrcSequenceStmt.executeQuery();
-            
-            if (res.next()) {
-                long cmn = res.getLong(1);
-                boolean lm = res.getBoolean(2);
-                long lval = res.getLong(3);
-                Date expiry = 0 == lval ? null : new Date(lval);
-                String oidValue = res.getString(4);
-                Identifier oi = null;
-                if (null != oidValue) {
-                    oi = RMUtils.getWSRMFactory().createIdentifier();
-                    oi.setValue(oidValue);
+            synchronized (selectSrcSequenceStmt) {
+                selectSrcSequenceStmt.setString(1, sid.getValue());
+                ResultSet res = selectSrcSequenceStmt.executeQuery();
+            
+                if (res.next()) {
+                    long cmn = res.getLong(1);
+                    boolean lm = res.getBoolean(2);
+                    long lval = res.getLong(3);
+                    Date expiry = 0 == lval ? null : new Date(lval);
+                    String oidValue = res.getString(4);
+                    Identifier oi = null;
+                    if (null != oidValue) {
+                        oi = RMUtils.getWSRMFactory().createIdentifier();
+                        oi.setValue(oidValue);
+                    }
+                    return new SourceSequence(sid, expiry, oi, cmn, lm, protocol);
                 }                            
-                return new SourceSequence(sid, expiry, oi, cmn, lm, protocol);
-                          
             }
         } catch (SQLException ex) {
             // ignore
@@ -358,9 +346,6 @@ public class RMTxStore implements RMStor
         try {
             beginTransaction();
             
-            if (null == deleteDestSequenceStmt) {
-                deleteDestSequenceStmt = connection.prepareStatement(DELETE_DEST_SEQUENCE_STMT_STR);
-            }
             deleteDestSequenceStmt.setString(1, sid.getValue());
             deleteDestSequenceStmt.execute();
             
@@ -377,9 +362,6 @@ public class RMTxStore implements RMStor
         try {
             beginTransaction();
             
-            if (null == deleteSrcSequenceStmt) {
-                deleteSrcSequenceStmt = connection.prepareStatement(DELETE_SRC_SEQUENCE_STMT_STR);
-            }
             deleteSrcSequenceStmt.setString(1, sid.getValue());
             deleteSrcSequenceStmt.execute();
             
@@ -398,26 +380,23 @@ public class RMTxStore implements RMStor
         }
         Collection<DestinationSequence> seqs = new ArrayList<DestinationSequence>();
         try {
-            if (null == selectDestSequencesStmt) {
-                selectDestSequencesStmt = 
-                    connection.prepareStatement(SELECT_DEST_SEQUENCES_STMT_STR);        
      
-            }
-            selectDestSequencesStmt.setString(1, endpointIdentifier);
-            
-            ResultSet res = selectDestSequencesStmt.executeQuery(); 
-            while (res.next()) {
-                Identifier sid = new Identifier();                
-                sid.setValue(res.getString(1));
-                EndpointReferenceType acksTo = RMUtils.createReference(res.getString(2));
 
-                long lm = res.getLong(3);
-                InputStream is = res.getBinaryStream(4); 
-                SequenceAcknowledgement ack = null;
-                if (null != is) {
-                    ack = PersistenceUtils.getInstance()
-                        .deserialiseAcknowledgment(is); 
+            synchronized (selectDestSequencesStmt) {
+                selectDestSequencesStmt.setString(1, endpointIdentifier);
+                ResultSet res = selectDestSequencesStmt.executeQuery(); 
+                while (res.next()) {
+                    Identifier sid = new Identifier();                
+                    sid.setValue(res.getString(1));
+                    EndpointReferenceType acksTo = RMUtils.createReference(res.getString(2));
 
+                    long lm = res.getLong(3);
+                    InputStream is = res.getBinaryStream(4);
+                    SequenceAcknowledgement ack = null;
+                    if (null != is) {
+                        ack = PersistenceUtils.getInstance()
+                            .deserialiseAcknowledgment(is); 
+                    }
+                    DestinationSequence seq = new DestinationSequence(sid, acksTo, lm, ack, protocol);
+                    seqs.add(seq);                                                 
                 }
-                DestinationSequence seq = new DestinationSequence(sid, acksTo, lm, ack, protocol);
-                seqs.add(seq);                                                 
             }
         } catch (SQLException ex) {
             LOG.log(Level.WARNING, new Message("SELECT_DEST_SEQ_FAILED_MSG", LOG).toString(), ex);
@@ -432,28 +411,25 @@ public class RMTxStore implements RMStor
         }
         Collection<SourceSequence> seqs = new ArrayList<SourceSequence>();
         try {
-            if (null == selectSrcSequencesStmt) {
-                selectSrcSequencesStmt = 
-                    connection.prepareStatement(SELECT_SRC_SEQUENCES_STMT_STR);     
-            }
-            selectSrcSequencesStmt.setString(1, endpointIdentifier);
-            ResultSet res = selectSrcSequencesStmt.executeQuery();
-            
-            while (res.next()) {
-                Identifier sid = new Identifier();
-                sid.setValue(res.getString(1));
-                long cmn = res.getLong(2);
-                boolean lm = res.getBoolean(3);
-                long lval = res.getLong(4);
-                Date expiry = 0 == lval ? null : new Date(lval);
-                String oidValue = res.getString(5);
-                Identifier oi = null;
-                if (null != oidValue) {
-                    oi = new Identifier();
-                    oi.setValue(oidValue);
+            synchronized (selectSrcSequencesStmt) {
+                selectSrcSequencesStmt.setString(1, endpointIdentifier);
+                ResultSet res = selectSrcSequencesStmt.executeQuery();
+                while (res.next()) {
+                    Identifier sid = new Identifier();
+                    sid.setValue(res.getString(1));
+                    long cmn = res.getLong(2);
+                    boolean lm = res.getBoolean(3);
+                    long lval = res.getLong(4);
+                    Date expiry = 0 == lval ? null : new Date(lval);
+                    String oidValue = res.getString(5);
+                    Identifier oi = null;
+                    if (null != oidValue) {
+                        oi = new Identifier();
+                        oi.setValue(oidValue);
+                    }
+                    SourceSequence seq = new SourceSequence(sid, expiry, oi, cmn, lm, protocol);
+                    seqs.add(seq);                          
                 }                            
-                SourceSequence seq = new SourceSequence(sid, expiry, oi, cmn, lm, protocol);
-                seqs.add(seq);                          
             }
         } catch (SQLException ex) {
             // ignore
@@ -466,27 +442,20 @@ public class RMTxStore implements RMStor
         Collection<RMMessage> msgs = new ArrayList<RMMessage>();
         try {
             PreparedStatement stmt = outbound ? selectOutboundMessagesStmt : selectInboundMessagesStmt;
-            if (null == stmt) {
-                stmt = connection.prepareStatement(MessageFormat.format(SELECT_MESSAGES_STMT_STR,
-                    outbound ? OUTBOUND_MSGS_TABLE_NAME : INBOUND_MSGS_TABLE_NAME));
-                if (outbound) {
-                    selectOutboundMessagesStmt = stmt;                    
-                } else {
-                    selectInboundMessagesStmt = stmt;
+            synchronized (stmt) {
+                stmt.setString(1, sid.getValue());
+                ResultSet res = stmt.executeQuery();
+                while (res.next()) {
+                    long mn = res.getLong(1);
+                    String to = res.getString(2);
+                    Blob blob = res.getBlob(3);
+                    RMMessage msg = new RMMessage();
+                    msg.setMessageNumber(mn);
+                    msg.setTo(to);
+                    msg.setContent(blob.getBinaryStream());
+                    msgs.add(msg);
                 }
             }
-            stmt.setString(1, sid.getValue());
-            ResultSet res = stmt.executeQuery();
-            while (res.next()) {
-                long mn = res.getLong(1);
-                String to = res.getString(2);
-                Blob blob = res.getBlob(3);
-                RMMessage msg = new RMMessage();
-                msg.setMessageNumber(mn);
-                msg.setTo(to);
-                msg.setContent(blob.getBinaryStream());
-                msgs.add(msg);                
-            }            
         } catch (Exception ex) {
             LOG.log(Level.WARNING, new Message(outbound ? "SELECT_OUTBOUND_MSGS_FAILED_MSG"
                 : "SELECT_INBOUND_MSGS_FAILED_MSG", LOG).toString(), ex);
@@ -537,18 +506,9 @@ public class RMTxStore implements RMStor
     
     public void removeMessages(Identifier sid, Collection<Long> messageNrs, boolean outbound) {
         try {
-            beginTransaction();
             PreparedStatement stmt = outbound ? deleteOutboundMessageStmt : deleteInboundMessageStmt;
-            if (null == stmt) {
-                stmt = connection.prepareStatement(MessageFormat.format(DELETE_MESSAGE_STMT_STR,
-                    outbound ? OUTBOUND_MSGS_TABLE_NAME : INBOUND_MSGS_TABLE_NAME));
-                if (outbound) {
-                    deleteOutboundMessageStmt = stmt;                    
-                } else {
-                    deleteInboundMessageStmt = stmt;
-                }
-            }
-    
+            beginTransaction();
+
             stmt.setString(1, sid.getValue());
                         
             for (Long messageNr : messageNrs) {
@@ -717,6 +677,33 @@ public class RMTxStore implements RMStor
         stmt.close();
     }
 
+    private void createStatements() throws SQLException {
+        // create the statements in advance to avoid synchronization later 
+        createDestSequenceStmt = connection.prepareStatement(CREATE_DEST_SEQUENCE_STMT_STR);
+        createSrcSequenceStmt = connection.prepareStatement(CREATE_SRC_SEQUENCE_STMT_STR);
+        deleteDestSequenceStmt = connection.prepareStatement(DELETE_DEST_SEQUENCE_STMT_STR);
+        deleteSrcSequenceStmt = connection.prepareStatement(DELETE_SRC_SEQUENCE_STMT_STR);
+        updateDestSequenceStmt = connection.prepareStatement(UPDATE_DEST_SEQUENCE_STMT_STR);
+        updateSrcSequenceStmt = connection.prepareStatement(UPDATE_SRC_SEQUENCE_STMT_STR);
+        selectDestSequencesStmt = connection.prepareStatement(SELECT_DEST_SEQUENCES_STMT_STR);
+        selectSrcSequencesStmt = connection.prepareStatement(SELECT_SRC_SEQUENCES_STMT_STR);
+        selectDestSequenceStmt = connection.prepareStatement(SELECT_DEST_SEQUENCE_STMT_STR);
+        selectSrcSequenceStmt = connection.prepareStatement(SELECT_SRC_SEQUENCE_STMT_STR);
+        createInboundMessageStmt = connection.prepareStatement(
+            MessageFormat.format(CREATE_MESSAGE_STMT_STR, INBOUND_MSGS_TABLE_NAME));
+        createOutboundMessageStmt = connection.prepareStatement(
+            MessageFormat.format(CREATE_MESSAGE_STMT_STR, OUTBOUND_MSGS_TABLE_NAME));
+        deleteInboundMessageStmt = connection.prepareStatement(
+            MessageFormat.format(DELETE_MESSAGE_STMT_STR, INBOUND_MSGS_TABLE_NAME));
+        deleteOutboundMessageStmt = connection.prepareStatement(
+            MessageFormat.format(DELETE_MESSAGE_STMT_STR, OUTBOUND_MSGS_TABLE_NAME));
+        selectInboundMessagesStmt = connection.prepareStatement(
+            MessageFormat.format(SELECT_MESSAGES_STMT_STR, INBOUND_MSGS_TABLE_NAME));
+        selectOutboundMessagesStmt = connection.prepareStatement(
+            MessageFormat.format(SELECT_MESSAGES_STMT_STR, OUTBOUND_MSGS_TABLE_NAME));
+    }
+
+
     @PostConstruct     
     public synchronized void init() {
         
@@ -755,6 +742,7 @@ public class RMTxStore implements RMStor
             connection.setAutoCommit(true);
             setCurrentSchema();
             createTables();
+            createStatements();
         } catch (SQLException ex) {
             LogUtils.log(LOG, Level.SEVERE, "CONNECT_EXC", ex);
             SQLException se = ex;



Mime
View raw message