ws-sandesha-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gatf...@apache.org
Subject svn commit: r594098 - in /webservices/sandesha/trunk/java/modules: core/src/main/java/org/apache/sandesha2/msgprocessors/ core/src/main/java/org/apache/sandesha2/storage/inmemory/ tests/src/test/java/org/apache/sandesha2/storage/
Date Mon, 12 Nov 2007 13:06:49 GMT
Author: gatfora
Date: Mon Nov 12 05:06:48 2007
New Revision: 594098

URL: http://svn.apache.org/viewvc?rev=594098&view=rev
Log:
When the same SandeshaStorageManager is used in an environment where there are multiple jvms,
it is possible to insert multiple sequences with the same identifier.  The changes here are
for the ApplicationMsgProcessor to take note of the response returned when inserting the RMSBean
and if a false is returned to reissue the getRMSBean request

Modified:
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryBeanMgr.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryRMSBeanMgr.java
    webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/storage/RMSBeanMgrTest.java

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=594098&r1=594097&r2=594098&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
(original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
Mon Nov 12 05:06:48 2007
@@ -43,7 +43,6 @@
 import org.apache.sandesha2.security.SecurityToken;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
-import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beans.RMDBean;
 import org.apache.sandesha2.storage.beans.RMSBean;
@@ -249,189 +248,209 @@
 		// to give any storage manager interface a chance to setup any transactional state
 		boolean hasUserTransaction = storageManager.hasUserTransaction(msgContext);
 		
-		if (rmsBean == null) { 
-			// SENDING THE CREATE SEQUENCE.
-			synchronized (RMSBeanMgr.class) {
-				// There is a timing window where 2 sending threads can hit this point
-				// at the same time and both will create an RMSBean to the same endpoint
-				// with the same internal sequenceid
-				// Check that someone hasn't created the bean
-				rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId);
-				
-				// if first message - setup the sending side sequence - both for the
-				// server and the client sides.
-				if (rmsBean == null) {
-					rmsBean = SequenceManager.setupNewClientSequence(msgContext, internalSequenceId, storageManager);
-					rmsBean = addCreateSequenceMessage(rmMsgCtx, rmsBean, storageManager, tran);
+		try {
+			
+			if (rmsBean == null) { 
+				// SENDING THE CREATE SEQUENCE.
+				while (rmsBean == null) {
+					// There is a timing window where 2 sending threads can hit this point
+					// at the same time and both will create an RMSBean to the same endpoint
+					// with the same internal sequenceid
+					// Check that someone hasn't created the bean
+					rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId);
+	
+					// if first message - setup the sending side sequence - both for the
+					// server and the client sides.
+					if (rmsBean == null) {
+						rmsBean = SequenceManager.setupNewClientSequence(msgContext, internalSequenceId, storageManager);
+						rmsBean = addCreateSequenceMessage(rmMsgCtx, rmsBean, storageManager);
+	
+						if (rmsBean == null && tran != null && tran.isActive()) {
+							// Rollback the current locks.
+							tran.rollback();
+	
+							// Create a new tran.  This avoids a potential deadlock where the RMS/RMDBeans
+							// are taken in reverse order.
+							tran = storageManager.getTransaction();
+						}
+					}
 				}
-			}
-		
-		} else {
-			outSequenceID = rmsBean.getSequenceID();
-		}
-		
-		// the message number that was last used.
-		long systemMessageNumber = rmsBean.getNextMessageNumber();
-
-		// The number given by the user has to be larger than the last stored
-		// number.
-		if (givenMessageNumber > 0 && givenMessageNumber <= systemMessageNumber)
{
-			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.msgNumberNotLargerThanLastMsg,
Long
-					.toString(givenMessageNumber));
-			throw new SandeshaException(message);
-		}
-
-		// Finding the correct message number.
-		long messageNumber = -1;
-		if (givenMessageNumber > 0) // if given message number is valid use it.
-									// (this is larger than the last stored due
-									// to the last check)
-			messageNumber = givenMessageNumber;
-		else if (systemMessageNumber > 0) { // if system message number is valid
-											// use it.
-			messageNumber = systemMessageNumber + 1;
-		} else { // This is the first message (systemMessageNumber = -1)
-			messageNumber = 1;
-		}
-
-		if (serverSide) {
-			// Deciding whether this is the last message. We assume it is if it relates to
-			// a message which arrived with the LastMessage flag on it. 
-			RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, inboundSequence);
		
-			// Get the last in message
-			String lastRequestId = rmdBean.getLastInMessageId();
-			RelatesTo relatesTo = msgContext.getRelatesTo();
-			if(relatesTo != null && lastRequestId != null &&
-					lastRequestId.equals(relatesTo.getValue())) {
-				lastMessage = true;
+	
+			} else {
+				outSequenceID = rmsBean.getSequenceID();
 			}
 			
-			//or a constant property may call it as the last msg
-			Boolean inboundLast = (Boolean) msgContext.getProperty(Sandesha2Constants.MessageContextProperties.INBOUND_LAST_MESSAGE);

-			if (inboundLast!=null && inboundLast.booleanValue())
-				lastMessage = true;
-		}
-		
-		if (lastMessage) 
-			rmsBean.setLastOutMessage(messageNumber);		
-
-		// set this as the response highest message.
-		rmsBean.setHighestOutMessageNumber(messageNumber);
-		
-		// saving the used message number, and the expected reply count
-		boolean startPolling = false;
-		if (!dummyMessage) {
-			rmsBean.setNextMessageNumber(messageNumber);
-
-			// Identify the MEP associated with the message.
-			AxisOperation op = msgContext.getAxisOperation();
-			int mep = WSDLConstants.MEP_CONSTANT_INVALID;
-			if(op != null) {
-				mep = op.getAxisSpecificMEPConstant();
+			// the message number that was last used.
+			long systemMessageNumber = rmsBean.getNextMessageNumber();
+	
+			// The number given by the user has to be larger than the last stored
+			// number.
+			if (givenMessageNumber > 0 && givenMessageNumber <= systemMessageNumber)
{
+				String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.msgNumberNotLargerThanLastMsg,
Long
+						.toString(givenMessageNumber));
+				throw new SandeshaException(message);
 			}
-
-			if(mep == WSDLConstants.MEP_CONSTANT_OUT_IN) {
-				// We only match up requests and replies when we are doing sync interactions
-				if (log.isDebugEnabled()) log.debug("MEP OUT_IN");
-				EndpointReference replyTo = msgContext.getReplyTo();
-				if(replyTo == null || replyTo.hasAnonymousAddress()) {
-					long expectedReplies = rmsBean.getExpectedReplies();
-					rmsBean.setExpectedReplies(expectedReplies + 1);
+	
+			// Finding the correct message number.
+			long messageNumber = -1;
+			if (givenMessageNumber > 0) // if given message number is valid use it.
+										// (this is larger than the last stored due
+										// to the last check)
+				messageNumber = givenMessageNumber;
+			else if (systemMessageNumber > 0) { // if system message number is valid
+												// use it.
+				messageNumber = systemMessageNumber + 1;
+			} else { // This is the first message (systemMessageNumber = -1)
+				messageNumber = 1;
+			}
+	
+			if (serverSide) {
+				// Deciding whether this is the last message. We assume it is if it relates to
+				// a message which arrived with the LastMessage flag on it. 
+				RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, inboundSequence);
		
+				// Get the last in message
+				String lastRequestId = rmdBean.getLastInMessageId();
+				RelatesTo relatesTo = msgContext.getRelatesTo();
+				if(relatesTo != null && lastRequestId != null &&
+						lastRequestId.equals(relatesTo.getValue())) {
+					lastMessage = true;
 				}
-
-				// If we support the RM anonymous URI then rewrite the ws-a anon to use the RM equivalent.
-				//(do should be done only for WSRM 1.1)
 				
-				String specVersion = SequenceManager.getSpecVersion(rmMsgCtx.getMessageContext(), storageManager);
-				if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion)) {
-					if (log.isDebugEnabled()) log.debug("SPEC_1_1");
-					String oldAddress = (replyTo == null) ? null : replyTo.getAddress();
-					EndpointReference newReplyTo = SandeshaUtil.rewriteEPR(rmsBean, msgContext
-							.getReplyTo(), configContext);
-					String newAddress = (newReplyTo == null) ? null : newReplyTo.getAddress();
-					if (newAddress != null && !newAddress.equals(oldAddress)) {
-						// We have rewritten the replyTo. If this is the first message that we have needed
to
-						// rewrite then we should set the sequence up for polling, and once we have saved the
-						// changes to the sequence then we can start the polling thread.
-						msgContext.setReplyTo(newReplyTo);
-						if (!rmsBean.isPollingMode()) {
-							rmsBean.setPollingMode(true);
-							startPolling = true;
+				//or a constant property may call it as the last msg
+				Boolean inboundLast = (Boolean) msgContext.getProperty(Sandesha2Constants.MessageContextProperties.INBOUND_LAST_MESSAGE);

+				if (inboundLast!=null && inboundLast.booleanValue())
+					lastMessage = true;
+			}
+			
+			if (lastMessage) 
+				rmsBean.setLastOutMessage(messageNumber);		
+	
+			// set this as the response highest message.
+			rmsBean.setHighestOutMessageNumber(messageNumber);
+			
+			// saving the used message number, and the expected reply count
+			boolean startPolling = false;
+			if (!dummyMessage) {
+				rmsBean.setNextMessageNumber(messageNumber);
+	
+				// Identify the MEP associated with the message.
+				AxisOperation op = msgContext.getAxisOperation();
+				int mep = WSDLConstants.MEP_CONSTANT_INVALID;
+				if(op != null) {
+					mep = op.getAxisSpecificMEPConstant();
+				}
+	
+				if(mep == WSDLConstants.MEP_CONSTANT_OUT_IN) {
+					// We only match up requests and replies when we are doing sync interactions
+					if (log.isDebugEnabled()) log.debug("MEP OUT_IN");
+					EndpointReference replyTo = msgContext.getReplyTo();
+					if(replyTo == null || replyTo.hasAnonymousAddress()) {
+						long expectedReplies = rmsBean.getExpectedReplies();
+						rmsBean.setExpectedReplies(expectedReplies + 1);
+					}
+	
+					// If we support the RM anonymous URI then rewrite the ws-a anon to use the RM equivalent.
+					//(do should be done only for WSRM 1.1)
+					
+					String specVersion = SequenceManager.getSpecVersion(rmMsgCtx.getMessageContext(), storageManager);
+					if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion)) {
+						if (log.isDebugEnabled()) log.debug("SPEC_1_1");
+						String oldAddress = (replyTo == null) ? null : replyTo.getAddress();
+						EndpointReference newReplyTo = SandeshaUtil.rewriteEPR(rmsBean, msgContext
+								.getReplyTo(), configContext);
+						String newAddress = (newReplyTo == null) ? null : newReplyTo.getAddress();
+						if (newAddress != null && !newAddress.equals(oldAddress)) {
+							// We have rewritten the replyTo. If this is the first message that we have needed
to
+							// rewrite then we should set the sequence up for polling, and once we have saved
the
+							// changes to the sequence then we can start the polling thread.
+							msgContext.setReplyTo(newReplyTo);
+							if (!rmsBean.isPollingMode()) {
+								rmsBean.setPollingMode(true);
+								startPolling = true;
+							}
 						}
 					}
 				}
 			}
-		}
-		if (log.isDebugEnabled()) log.debug("App msg using replyTo EPR as " + msgContext.getReplyTo());
-		
-		RelatesTo relatesTo = msgContext.getRelatesTo();
-		if(relatesTo != null) {
-			rmsBean.setHighestOutRelatesTo(relatesTo.getValue());
-		}
-
-		// setting async ack endpoint for the server side. (if present)
-		if (serverSide) {
-			if (rmsBean.getToEndpointReference() != null) {
-				msgContext.setProperty(SandeshaClientConstants.AcksTo, rmsBean.getToEndpointReference().getAddress());
+			if (log.isDebugEnabled()) log.debug("App msg using replyTo EPR as " + msgContext.getReplyTo());
+			
+			RelatesTo relatesTo = msgContext.getRelatesTo();
+			if(relatesTo != null) {
+				rmsBean.setHighestOutRelatesTo(relatesTo.getValue());
 			}
-		}
-
-		// Update the rmsBean
-		storageManager.getRMSBeanMgr().update(rmsBean);
-		
-		if(startPolling) {
-			SandeshaUtil.startWorkersForSequence(msgContext.getConfigurationContext(), rmsBean);
-		}
-		
-		SOAPEnvelope env = rmMsgCtx.getSOAPEnvelope();
-		if (env == null) {
-			SOAPEnvelope envelope = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil.getSOAPVersion(env))
-					.getDefaultEnvelope();
-			rmMsgCtx.setSOAPEnvelop(envelope);
-		}
-
-		SOAPBody soapBody = rmMsgCtx.getSOAPEnvelope().getBody();
-		if (soapBody == null) {
-			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.soapBodyNotPresent);
-			log.debug(message);
-			throw new SandeshaException(message);
-		}
-
-		String messageId1 = SandeshaUtil.getUUID();
-		if (rmMsgCtx.getMessageId() == null) {
-			rmMsgCtx.setMessageId(messageId1);
-		}
-
-		EndpointReference toEPR = msgContext.getTo();
-
-		
-		if (toEPR != null) {
-			// setting default actions.
-			String to = toEPR.getAddress();
-			String operationName = msgContext.getOperationContext().getAxisOperation().getName().getLocalPart();
-			if (msgContext.getWSAAction() == null) {
-				msgContext.setWSAAction(to + "/" + operationName);
+	
+			// setting async ack endpoint for the server side. (if present)
+			if (serverSide) {
+				if (rmsBean.getToEndpointReference() != null) {
+					msgContext.setProperty(SandeshaClientConstants.AcksTo, rmsBean.getToEndpointReference().getAddress());
+				}
+			}
+	
+			// Update the rmsBean
+			storageManager.getRMSBeanMgr().update(rmsBean);
+			
+			if(startPolling) {
+				SandeshaUtil.startWorkersForSequence(msgContext.getConfigurationContext(), rmsBean);
+			}
+			
+			SOAPEnvelope env = rmMsgCtx.getSOAPEnvelope();
+			if (env == null) {
+				SOAPEnvelope envelope = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil.getSOAPVersion(env))
+						.getDefaultEnvelope();
+				rmMsgCtx.setSOAPEnvelop(envelope);
+			}
+	
+			SOAPBody soapBody = rmMsgCtx.getSOAPEnvelope().getBody();
+			if (soapBody == null) {
+				String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.soapBodyNotPresent);
+				log.debug(message);
+				throw new SandeshaException(message);
 			}
-			if (msgContext.getSoapAction() == null) {
-				msgContext.setSoapAction("\"" + to + "/" + operationName + "\"");
+	
+			String messageId1 = SandeshaUtil.getUUID();
+			if (rmMsgCtx.getMessageId() == null) {
+				rmMsgCtx.setMessageId(messageId1);
+			}
+	
+			EndpointReference toEPR = msgContext.getTo();
+	
+			
+			if (toEPR != null) {
+				// setting default actions.
+				String to = toEPR.getAddress();
+				String operationName = msgContext.getOperationContext().getAxisOperation().getName().getLocalPart();
+				if (msgContext.getWSAAction() == null) {
+					msgContext.setWSAAction(to + "/" + operationName);
+				}
+				if (msgContext.getSoapAction() == null) {
+					msgContext.setSoapAction("\"" + to + "/" + operationName + "\"");
+				}
+			}
+			
+			// processing the response if not an dummy.
+			if (!dummyMessage)
+				processResponseMessage(rmMsgCtx, rmsBean, internalSequenceId, outSequenceID, messageNumber,
storageKey, storageManager, tran, hasUserTransaction);
+			
+			//Users wont be able to get reliable response msgs in the back channel in the back channel
of a 
+			//reliable message. If he doesn't have a endpoint he should use polling mechanisms.
+			msgContext.pause();
+			
+			if (tran != null && tran.isActive()) {
+				tran.commit();
+				tran = null;
 			}
 		}
-		
-		// processing the response if not an dummy.
-		if (!dummyMessage)
-			processResponseMessage(rmMsgCtx, rmsBean, internalSequenceId, outSequenceID, messageNumber,
storageKey, storageManager, tran, hasUserTransaction);
-		
-		//Users wont be able to get reliable response msgs in the back channel in the back channel
of a 
-		//reliable message. If he doesn't have a endpoint he should use polling mechanisms.
-		msgContext.pause();
-		
+		finally {
+			if (tran != null && tran.isActive())
+				tran.rollback();
+		}
 		if (log.isDebugEnabled())
 			log.debug("Exit: ApplicationMsgProcessor::processOutMessage " + Boolean.TRUE);
 		return true;
 	}
 
 	private RMSBean addCreateSequenceMessage(RMMsgContext applicationRMMsg, RMSBean rmsBean,
-			StorageManager storageManager, Transaction tran) throws AxisFault {
+			StorageManager storageManager) throws AxisFault {
 
 		if (log.isDebugEnabled())
 			log.debug("Enter: ApplicationMsgProcessor::addCreateSequenceMessage, " + rmsBean);
@@ -446,8 +465,6 @@
 		CreateSequence createSequencePart = (CreateSequence) createSeqRMMessage
 				.getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ);
 
-		SenderBeanMgr retransmitterMgr = storageManager.getSenderBeanMgr();
-
 		SequenceOffer offer = createSequencePart.getSequenceOffer();
 		if (offer != null) {
 			String offeredSequenceId = offer.getIdentifer().getIdentifier();
@@ -465,45 +482,47 @@
 		rmsBean.setCreateSeqMsgID(createSeqMsg.getMessageID());
 		rmsBean.setCreateSequenceMsgStoreKey(createSequenceMessageStoreKey);
 		
-		//cloning the message and storing it as a reference.
-		MessageContext clonedMessage = SandeshaUtil.cloneMessageContext(createSeqMsg);
-		String clonedMsgStoreKey = SandeshaUtil.getUUID();
-		storageManager.storeMessageContext(clonedMsgStoreKey, clonedMessage);
-		rmsBean.setReferenceMessageStoreKey(clonedMsgStoreKey);
-		
-		SecurityToken token = (SecurityToken) createSeqRMMessage.getProperty(Sandesha2Constants.MessageContextProperties.SECURITY_TOKEN);
-		if(token != null) {
-			SecurityManager secManager = SandeshaUtil.getSecurityManager(configCtx);
-			rmsBean.setSecurityTokenData(secManager.getTokenRecoveryData(token));
-		}
-		
-		storageManager.getRMSBeanMgr().insert(rmsBean);
-
-		SenderBean createSeqEntry = new SenderBean();
-		createSeqEntry.setMessageContextRefKey(createSequenceMessageStoreKey);
-		createSeqEntry.setTimeToSend(System.currentTimeMillis());
-		createSeqEntry.setMessageID(createSeqRMMessage.getMessageId());
-		createSeqEntry.setInternalSequenceID(rmsBean.getInternalSequenceID());
-		// this will be set to true in the sender
-		createSeqEntry.setSend(true);
-		// Indicate that this message is a create sequence
-		createSeqEntry.setMessageType(Sandesha2Constants.MessageTypes.CREATE_SEQ);
-		EndpointReference to = createSeqRMMessage.getTo();
-		if (to!=null)
-			createSeqEntry.setToAddress(to.getAddress());
-		// If this message is targetted at an anonymous address then we must not have a transport
-		// ready for it, as the create sequence is not a reply.
-		if(to == null || to.hasAnonymousAddress())
-			createSeqEntry.setTransportAvailable(false);
-
-		createSeqMsg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
-		
-		SandeshaUtil.executeAndStore(createSeqRMMessage, createSequenceMessageStoreKey, storageManager);
-
-		retransmitterMgr.insert(createSeqEntry);
-
-		// Setup enough of the workers to get this create sequence off the box.
-		SandeshaUtil.startWorkersForSequence(configCtx, rmsBean);
+		if (storageManager.getRMSBeanMgr().insert(rmsBean)) {
+			//cloning the message and storing it as a reference.
+			MessageContext clonedMessage = SandeshaUtil.cloneMessageContext(createSeqMsg);
+			String clonedMsgStoreKey = SandeshaUtil.getUUID();
+			storageManager.storeMessageContext(clonedMsgStoreKey, clonedMessage);
+			rmsBean.setReferenceMessageStoreKey(clonedMsgStoreKey);
+			
+			SecurityToken token = (SecurityToken) createSeqRMMessage.getProperty(Sandesha2Constants.MessageContextProperties.SECURITY_TOKEN);
+			if(token != null) {
+				SecurityManager secManager = SandeshaUtil.getSecurityManager(configCtx);
+				rmsBean.setSecurityTokenData(secManager.getTokenRecoveryData(token));
+			}
+	
+			SenderBean createSeqEntry = new SenderBean();
+			createSeqEntry.setMessageContextRefKey(createSequenceMessageStoreKey);
+			createSeqEntry.setTimeToSend(System.currentTimeMillis());
+			createSeqEntry.setMessageID(createSeqRMMessage.getMessageId());
+			createSeqEntry.setInternalSequenceID(rmsBean.getInternalSequenceID());
+			// this will be set to true in the sender
+			createSeqEntry.setSend(true);
+			// Indicate that this message is a create sequence
+			createSeqEntry.setMessageType(Sandesha2Constants.MessageTypes.CREATE_SEQ);
+			EndpointReference to = createSeqRMMessage.getTo();
+			if (to!=null)
+				createSeqEntry.setToAddress(to.getAddress());
+			// If this message is targetted at an anonymous address then we must not have a transport
+			// ready for it, as the create sequence is not a reply.
+			if(to == null || to.hasAnonymousAddress())
+				createSeqEntry.setTransportAvailable(false);
+	
+			createSeqMsg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
+			
+			SandeshaUtil.executeAndStore(createSeqRMMessage, createSequenceMessageStoreKey, storageManager);
+	
+			storageManager.getSenderBeanMgr().insert(createSeqEntry);
+	
+			// Setup enough of the workers to get this create sequence off the box.
+			SandeshaUtil.startWorkersForSequence(configCtx, rmsBean);
+		} else {
+			rmsBean = null;
+		}
 		
 		if (log.isDebugEnabled())
 			log.debug("Exit: ApplicationMsgProcessor::addCreateSequenceMessage, " + rmsBean);
@@ -582,8 +601,12 @@
 		// increasing the current handler index, so that the message will not be
 		// going throught the SandeshaOutHandler again.
 		msg.setCurrentHandlerIndex(msg.getCurrentHandlerIndex() + 1);
+
 		SandeshaUtil.executeAndStore(rmMsg, storageKey, storageManager);
 		
+		// Insert the SenderBean
+		retransmitterMgr.insert(appMsgEntry);
+
 		// Lock the sender bean before we insert it, if we are planning to send it ourselves
 		SenderWorker worker = null;
 		if(sendingNow) {
@@ -598,9 +621,7 @@
 		  // Actually take the lock
 		  lock.addWork(workId, worker);
 		}
-
-		retransmitterMgr.insert(appMsgEntry);
-
+		
 		// Commit the transaction, so that the sender worker starts with a clean slate.
 		if(tran != null && tran.isActive()) tran.commit();
 		 

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=594098&r1=594097&r2=594098&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
(original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
Mon Nov 12 05:06:48 2007
@@ -158,8 +158,11 @@
 			}
 		}
 
+    // Get the CreateSeqBean based on the message id to take a lock on the bean
+    SenderBean createSeqBean = retransmitterMgr.retrieve(createSeqMsgId);
+    
 		// deleting the create sequence sender bean entry.
-		retransmitterMgr.delete(createSeqMsgId);
+		retransmitterMgr.delete(createSeqBean.getMessageID());
 		
 		// Remove the create sequence message
 		storageManager.removeMessageContext(rmsBean.getCreateSequenceMsgStoreKey());

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryBeanMgr.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryBeanMgr.java?rev=594098&r1=594097&r2=594098&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryBeanMgr.java
(original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryBeanMgr.java
Mon Nov 12 05:06:48 2007
@@ -36,7 +36,7 @@
 abstract class InMemoryBeanMgr {
 
 	private static final Log log = LogFactory.getLog(InMemoryBeanMgr.class);
-	private Hashtable table;
+	protected Hashtable table;
 	protected InMemoryStorageManager mgr;
 
 	protected InMemoryBeanMgr(InMemoryStorageManager mgr, AbstractContext context, String key)
{
@@ -145,29 +145,9 @@
 		return beans;
 	}
 
-	protected RMBean findUnique (RMBean matchInfo) throws SandeshaException {
+	protected RMBean findUnique (RMBean matchInfo) throws SandeshaStorageException {
 		if(log.isDebugEnabled()) log.debug("Entry: InMemoryBeanMgr " + this.getClass() + " findUnique
" + matchInfo);
-		RMBean result = null;
-		synchronized (table) {
-			Iterator i = table.values().iterator();
-			while(i.hasNext()) {
-				RMBean candidate = (RMBean)i.next();
-				if(candidate.match(matchInfo)) {
-					if(result == null) {
-						result = candidate;
-					} else {
-						String message = SandeshaMessageHelper.getMessage(
-								SandeshaMessageKeys.nonUniqueResult,
-								result.toString(),
-								candidate.toString());
-						SandeshaException e = new SandeshaException(message);
-						log.error(message, e);
-						throw e;
-					}
-				}
-			}
-		}
-		
+		RMBean result = findUniqueNoLock(matchInfo);		
 		// Now we have a point-in-time view of the bean, lock it, and double
 		// check that it is still in the table 
 		if(result != null) {
@@ -180,5 +160,30 @@
 		if(log.isDebugEnabled()) log.debug("Exit: InMemoryBeanMgr " + this.getClass() + " findUnique
" + result);
 		return result;
 	}
+  
+  protected RMBean findUniqueNoLock (RMBean matchInfo) throws SandeshaStorageException {
+    RMBean result = null;
+    synchronized (table) {
+      Iterator i = table.values().iterator();
+      while(i.hasNext()) {
+        RMBean candidate = (RMBean)i.next();
+        if(candidate.match(matchInfo)) {
+          if(result == null) {
+            result = candidate;
+          } else {
+            String message = SandeshaMessageHelper.getMessage(
+                SandeshaMessageKeys.nonUniqueResult,
+                result.toString(),
+                candidate.toString());
+            SandeshaStorageException e = new SandeshaStorageException(message);
+            log.error(message, e);
+            throw e;
+          }
+        }
+      }
+    }
+
+    return result;
+  }
 
 }

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryRMSBeanMgr.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryRMSBeanMgr.java?rev=594098&r1=594097&r2=594098&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryRMSBeanMgr.java
(original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryRMSBeanMgr.java
Mon Nov 12 05:06:48 2007
@@ -23,7 +23,6 @@
 
 import org.apache.axis2.context.AbstractContext;
 import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.storage.SandeshaStorageException;
 import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
 import org.apache.sandesha2.storage.beans.RMSBean;
@@ -35,7 +34,14 @@
 	}
 
 	public boolean insert(RMSBean bean) throws SandeshaStorageException {
-		return super.insert(bean.getCreateSeqMsgID(), bean);
+    boolean res = false;
+    synchronized (table) {
+      RMSBean findBean = new RMSBean();
+      findBean.setInternalSequenceID(bean.getInternalSequenceID());
+      if (findUniqueNoLock(findBean) == null)
+        res = super.insert(bean.getCreateSeqMsgID(), bean);
+    }
+		return res;
 	}
 
 	public boolean delete(String msgId) throws SandeshaStorageException {
@@ -54,7 +60,7 @@
 		return super.find(bean);
 	}
 	
-	public RMSBean findUnique (RMSBean bean) throws SandeshaException {
+	public RMSBean findUnique (RMSBean bean) throws SandeshaStorageException {
 		return (RMSBean) super.findUnique(bean);
 	}
 

Modified: webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/storage/RMSBeanMgrTest.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/storage/RMSBeanMgrTest.java?rev=594098&r1=594097&r2=594098&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/storage/RMSBeanMgrTest.java
(original)
+++ webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/storage/RMSBeanMgrTest.java
Mon Nov 12 05:06:48 2007
@@ -78,15 +78,15 @@
     	createSeqBean1.setSequenceID("SeqId1");
     	
     	RMSBean createSeqBean2 = new RMSBean ();
-    	createSeqBean2.setInternalSequenceID("TmpSeqId1");
+    	createSeqBean2.setInternalSequenceID("TmpSeqId2");
     	createSeqBean2.setCreateSeqMsgID("CreateSeqMsgId2");
-    	createSeqBean2.setSequenceID("SeqId2");
+    	createSeqBean2.setSequenceID("SeqId1");
     	
         mgr.insert(createSeqBean1);
         mgr.insert(createSeqBean2);
 
         RMSBean target = new RMSBean();
-        target.setInternalSequenceID("TmpSeqId1");
+        target.setSequenceID("SeqId1");
 
         Iterator iter = mgr.find(target).iterator();
         RMSBean tmp = (RMSBean) iter.next();



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org


Mime
View raw message