Author: ay
Date: Mon Apr 30 09:12:15 2012
New Revision: 1332129
URL: http://svn.apache.org/viewvc?rev=1332129&view=rev
Log:
[CXF-4276] Make WS-RM's default store RMTxStore to be shareable
Modified:
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreUpgradeTest.java
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java?rev=1332129&r1=1332128&r2=1332129&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
(original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
Mon Apr 30 09:12:15 2012
@@ -263,9 +263,6 @@ public class RMTxStore implements RMStor
try {
beginTransaction();
- if (null == createDestSequenceStmt) {
- createDestSequenceStmt = connection.prepareStatement(CREATE_DEST_SEQUENCE_STMT_STR);
- }
createDestSequenceStmt.setString(1, sequenceIdentifier);
String addr = seq.getAcksTo().getAddress().getValue();
createDestSequenceStmt.setString(2, addr);
@@ -293,10 +290,6 @@ public class RMTxStore implements RMStor
try {
beginTransaction();
- if (null == createSrcSequenceStmt) {
- createSrcSequenceStmt = connection.prepareStatement(CREATE_SRC_SEQUENCE_STMT_STR);
- }
- assert null != createSrcSequenceStmt;
createSrcSequenceStmt.setString(1, sequenceIdentifier);
Date expiry = seq.getExpires();
createSrcSequenceStmt.setLong(2, expiry == null ? 0 : expiry.getTime());
@@ -319,23 +312,22 @@ public class RMTxStore implements RMStor
LOG.info("Getting destination sequence for id: " + sid);
}
try {
- if (null == selectDestSequenceStmt) {
- selectDestSequenceStmt =
- connection.prepareStatement(SELECT_DEST_SEQUENCE_STMT_STR);
- }
- selectDestSequenceStmt.setString(1, sid.getValue());
- ResultSet res = selectDestSequenceStmt.executeQuery();
- if (res.next()) {
- EndpointReferenceType acksTo = RMUtils.createReference(res.getString(1));
- long lm = res.getLong(2);
- ProtocolVariation pv = decodeProtocolVersion(res.getString(3));
- InputStream is = res.getBinaryStream(4);
- SequenceAcknowledgement ack = null;
- if (null != is) {
- ack = PersistenceUtils.getInstance()
- .deserialiseAcknowledgment(is);
+ synchronized (selectDestSequenceStmt) {
+ selectDestSequenceStmt.setString(1, sid.getValue());
+ ResultSet res = selectDestSequenceStmt.executeQuery();
+
+ if (res.next()) {
+ EndpointReferenceType acksTo = RMUtils.createReference(res.getString(1));
+ long lm = res.getLong(2);
+ ProtocolVariation pv = decodeProtocolVersion(res.getString(3));
+ InputStream is = res.getBinaryStream(4);
+ SequenceAcknowledgement ack = null;
+ if (null != is) {
+ ack = PersistenceUtils.getInstance()
+ .deserialiseAcknowledgment(is);
+ }
+ return new DestinationSequence(sid, acksTo, lm, ack, pv);
}
- return new DestinationSequence(sid, acksTo, lm, ack, pv);
}
} catch (SQLException ex) {
LOG.log(Level.WARNING, new Message("SELECT_DEST_SEQ_FAILED_MSG", LOG).toString(),
ex);
@@ -348,27 +340,24 @@ public class RMTxStore implements RMStor
LOG.info("Getting source sequences for id: " + sid);
}
try {
- if (null == selectSrcSequenceStmt) {
- selectSrcSequenceStmt =
- connection.prepareStatement(SELECT_SRC_SEQUENCE_STMT_STR);
- }
- selectSrcSequenceStmt.setString(1, sid.getValue());
- ResultSet res = selectSrcSequenceStmt.executeQuery();
-
- if (res.next()) {
- long cmn = res.getLong(1);
- boolean lm = res.getBoolean(2);
- long lval = res.getLong(3);
- Date expiry = 0 == lval ? null : new Date(lval);
- String oidValue = res.getString(4);
- Identifier oi = null;
- if (null != oidValue) {
- oi = RMUtils.getWSRMFactory().createIdentifier();
- oi.setValue(oidValue);
- }
- ProtocolVariation pv = decodeProtocolVersion(res.getString(5));
- return new SourceSequence(sid, expiry, oi, cmn, lm, pv);
-
+ synchronized (selectSrcSequenceStmt) {
+ selectSrcSequenceStmt.setString(1, sid.getValue());
+ ResultSet res = selectSrcSequenceStmt.executeQuery();
+
+ if (res.next()) {
+ long cmn = res.getLong(1);
+ boolean lm = res.getBoolean(2);
+ long lval = res.getLong(3);
+ Date expiry = 0 == lval ? null : new Date(lval);
+ String oidValue = res.getString(4);
+ Identifier oi = null;
+ if (null != oidValue) {
+ oi = RMUtils.getWSRMFactory().createIdentifier();
+ oi.setValue(oidValue);
+ }
+ ProtocolVariation pv = decodeProtocolVersion(res.getString(5));
+ return new SourceSequence(sid, expiry, oi, cmn, lm, pv);
+ }
}
} catch (SQLException ex) {
// ignore
@@ -381,9 +370,6 @@ public class RMTxStore implements RMStor
try {
beginTransaction();
- if (null == deleteDestSequenceStmt) {
- deleteDestSequenceStmt = connection.prepareStatement(DELETE_DEST_SEQUENCE_STMT_STR);
- }
deleteDestSequenceStmt.setString(1, sid.getValue());
deleteDestSequenceStmt.execute();
@@ -400,9 +386,6 @@ public class RMTxStore implements RMStor
try {
beginTransaction();
- if (null == deleteSrcSequenceStmt) {
- deleteSrcSequenceStmt = connection.prepareStatement(DELETE_SRC_SEQUENCE_STMT_STR);
- }
deleteSrcSequenceStmt.setString(1, sid.getValue());
deleteSrcSequenceStmt.execute();
@@ -420,27 +403,24 @@ public class RMTxStore implements RMStor
}
Collection<DestinationSequence> seqs = new ArrayList<DestinationSequence>();
try {
- if (null == selectDestSequencesStmt) {
- selectDestSequencesStmt =
- connection.prepareStatement(SELECT_DEST_SEQUENCES_STMT_STR);
- }
- selectDestSequencesStmt.setString(1, endpointIdentifier);
-
- ResultSet res = selectDestSequencesStmt.executeQuery();
- while (res.next()) {
- Identifier sid = new Identifier();
- sid.setValue(res.getString(1));
- EndpointReferenceType acksTo = RMUtils.createReference(res.getString(2));
- long lm = res.getLong(3);
- ProtocolVariation pv = decodeProtocolVersion(res.getString(4));
- InputStream is = res.getBinaryStream(5);
- SequenceAcknowledgement ack = null;
- if (null != is) {
- ack = PersistenceUtils.getInstance()
- .deserialiseAcknowledgment(is);
+ synchronized (selectDestSequencesStmt) {
+ selectDestSequencesStmt.setString(1, endpointIdentifier);
+ ResultSet res = selectDestSequencesStmt.executeQuery();
+ while (res.next()) {
+ Identifier sid = new Identifier();
+ sid.setValue(res.getString(1));
+ EndpointReferenceType acksTo = RMUtils.createReference(res.getString(2));
+ long lm = res.getLong(3);
+ ProtocolVariation pv = decodeProtocolVersion(res.getString(4));
+ InputStream is = res.getBinaryStream(5);
+ SequenceAcknowledgement ack = null;
+ if (null != is) {
+ ack = PersistenceUtils.getInstance()
+ .deserialiseAcknowledgment(is);
+ }
+ DestinationSequence seq = new DestinationSequence(sid, acksTo, lm, ack,
pv);
+ seqs.add(seq);
}
- DestinationSequence seq = new DestinationSequence(sid, acksTo, lm, ack, pv);
- seqs.add(seq);
}
} catch (SQLException ex) {
LOG.log(Level.WARNING, new Message("SELECT_DEST_SEQ_FAILED_MSG", LOG).toString(),
ex);
@@ -454,29 +434,26 @@ public class RMTxStore implements RMStor
}
Collection<SourceSequence> seqs = new ArrayList<SourceSequence>();
try {
- if (null == selectSrcSequencesStmt) {
- selectSrcSequencesStmt =
- connection.prepareStatement(SELECT_SRC_SEQUENCES_STMT_STR);
- }
- selectSrcSequencesStmt.setString(1, endpointIdentifier);
- ResultSet res = selectSrcSequencesStmt.executeQuery();
-
- while (res.next()) {
- Identifier sid = new Identifier();
- sid.setValue(res.getString(1));
- long cmn = res.getLong(2);
- boolean lm = res.getBoolean(3);
- long lval = res.getLong(4);
- Date expiry = 0 == lval ? null : new Date(lval);
- String oidValue = res.getString(5);
- Identifier oi = null;
- if (null != oidValue) {
- oi = new Identifier();
- oi.setValue(oidValue);
- }
- ProtocolVariation pv = decodeProtocolVersion(res.getString(6));
- SourceSequence seq = new SourceSequence(sid, expiry, oi, cmn, lm, pv);
- seqs.add(seq);
+ synchronized (selectSrcSequencesStmt) {
+ selectSrcSequencesStmt.setString(1, endpointIdentifier);
+ ResultSet res = selectSrcSequencesStmt.executeQuery();
+ while (res.next()) {
+ Identifier sid = new Identifier();
+ sid.setValue(res.getString(1));
+ long cmn = res.getLong(2);
+ boolean lm = res.getBoolean(3);
+ long lval = res.getLong(4);
+ Date expiry = 0 == lval ? null : new Date(lval);
+ String oidValue = res.getString(5);
+ Identifier oi = null;
+ if (null != oidValue) {
+ oi = new Identifier();
+ oi.setValue(oidValue);
+ }
+ ProtocolVariation pv = decodeProtocolVersion(res.getString(6));
+ SourceSequence seq = new SourceSequence(sid, expiry, oi, cmn, lm, pv);
+ seqs.add(seq);
+ }
}
} catch (SQLException ex) {
// ignore
@@ -489,27 +466,20 @@ public class RMTxStore implements RMStor
Collection<RMMessage> msgs = new ArrayList<RMMessage>();
try {
PreparedStatement stmt = outbound ? selectOutboundMessagesStmt : selectInboundMessagesStmt;
- if (null == stmt) {
- stmt = connection.prepareStatement(MessageFormat.format(SELECT_MESSAGES_STMT_STR,
- outbound ? OUTBOUND_MSGS_TABLE_NAME : INBOUND_MSGS_TABLE_NAME));
- if (outbound) {
- selectOutboundMessagesStmt = stmt;
- } else {
- selectInboundMessagesStmt = stmt;
+ synchronized (stmt) {
+ stmt.setString(1, sid.getValue());
+ ResultSet res = stmt.executeQuery();
+ while (res.next()) {
+ long mn = res.getLong(1);
+ String to = res.getString(2);
+ Blob blob = res.getBlob(3);
+ RMMessage msg = new RMMessage();
+ msg.setMessageNumber(mn);
+ msg.setTo(to);
+ msg.setContent(blob.getBinaryStream());
+ msgs.add(msg);
}
}
- stmt.setString(1, sid.getValue());
- ResultSet res = stmt.executeQuery();
- while (res.next()) {
- long mn = res.getLong(1);
- String to = res.getString(2);
- Blob blob = res.getBlob(3);
- RMMessage msg = new RMMessage();
- msg.setMessageNumber(mn);
- msg.setTo(to);
- msg.setContent(blob.getBinaryStream());
- msgs.add(msg);
- }
} catch (Exception ex) {
LOG.log(Level.WARNING, new Message(outbound ? "SELECT_OUTBOUND_MSGS_FAILED_MSG"
: "SELECT_INBOUND_MSGS_FAILED_MSG", LOG).toString(), ex);
@@ -560,18 +530,9 @@ public class RMTxStore implements RMStor
public void removeMessages(Identifier sid, Collection<Long> messageNrs, boolean
outbound) {
try {
- beginTransaction();
PreparedStatement stmt = outbound ? deleteOutboundMessageStmt : deleteInboundMessageStmt;
- if (null == stmt) {
- stmt = connection.prepareStatement(MessageFormat.format(DELETE_MESSAGE_STMT_STR,
- outbound ? OUTBOUND_MSGS_TABLE_NAME : INBOUND_MSGS_TABLE_NAME));
- if (outbound) {
- deleteOutboundMessageStmt = stmt;
- } else {
- deleteInboundMessageStmt = stmt;
- }
- }
-
+ beginTransaction();
+
stmt.setString(1, sid.getValue());
for (Long messageNr : messageNrs) {
@@ -788,6 +749,33 @@ public class RMTxStore implements RMStor
stmt.close();
}
+ private void createStatements() throws SQLException {
+ // create the statements in advance to avoid synchronization later
+ createDestSequenceStmt = connection.prepareStatement(CREATE_DEST_SEQUENCE_STMT_STR);
+ createSrcSequenceStmt = connection.prepareStatement(CREATE_SRC_SEQUENCE_STMT_STR);
+ deleteDestSequenceStmt = connection.prepareStatement(DELETE_DEST_SEQUENCE_STMT_STR);
+ deleteSrcSequenceStmt = connection.prepareStatement(DELETE_SRC_SEQUENCE_STMT_STR);
+ updateDestSequenceStmt = connection.prepareStatement(UPDATE_DEST_SEQUENCE_STMT_STR);
+ updateSrcSequenceStmt = connection.prepareStatement(UPDATE_SRC_SEQUENCE_STMT_STR);
+ selectDestSequencesStmt = connection.prepareStatement(SELECT_DEST_SEQUENCES_STMT_STR);
+ selectSrcSequencesStmt = connection.prepareStatement(SELECT_SRC_SEQUENCES_STMT_STR);
+ selectDestSequenceStmt = connection.prepareStatement(SELECT_DEST_SEQUENCE_STMT_STR);
+ selectSrcSequenceStmt = connection.prepareStatement(SELECT_SRC_SEQUENCE_STMT_STR);
+ createInboundMessageStmt = connection.prepareStatement(
+ MessageFormat.format(CREATE_MESSAGE_STMT_STR, INBOUND_MSGS_TABLE_NAME));
+ createOutboundMessageStmt = connection.prepareStatement(
+ MessageFormat.format(CREATE_MESSAGE_STMT_STR, OUTBOUND_MSGS_TABLE_NAME));
+ deleteInboundMessageStmt = connection.prepareStatement(
+ MessageFormat.format(DELETE_MESSAGE_STMT_STR, INBOUND_MSGS_TABLE_NAME));
+ deleteOutboundMessageStmt = connection.prepareStatement(
+ MessageFormat.format(DELETE_MESSAGE_STMT_STR, OUTBOUND_MSGS_TABLE_NAME));
+ selectInboundMessagesStmt = connection.prepareStatement(
+ MessageFormat.format(SELECT_MESSAGES_STMT_STR, INBOUND_MSGS_TABLE_NAME));
+ selectOutboundMessagesStmt = connection.prepareStatement(
+ MessageFormat.format(SELECT_MESSAGES_STMT_STR, OUTBOUND_MSGS_TABLE_NAME));
+ }
+
+
@PostConstruct
public synchronized void init() {
@@ -826,6 +814,7 @@ public class RMTxStore implements RMStor
connection.setAutoCommit(true);
setCurrentSchema();
createTables();
+ createStatements();
} catch (SQLException ex) {
LogUtils.log(LOG, Level.SEVERE, "CONNECT_EXC", ex);
SQLException se = ex;
Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreUpgradeTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreUpgradeTest.java?rev=1332129&r1=1332128&r2=1332129&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreUpgradeTest.java
(original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreUpgradeTest.java
Mon Apr 30 09:12:15 2012
@@ -20,6 +20,7 @@
package org.apache.cxf.ws.rm.persistence.jdbc;
import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -135,6 +136,24 @@ public class RMTxStoreUpgradeTest extend
}
@Override
+ public synchronized void init() {
+ if (upgraded) {
+ super.init();
+ } else {
+ // just create the old tables
+ try {
+ setConnection(DriverManager.getConnection(getUrl()));
+ createTables();
+ } catch (SQLException e) {
+ // ignore this error
+ }
+
+ }
+ }
+
+
+
+ @Override
protected void createTables() throws SQLException {
if (upgraded) {
super.createTables();
|