Return-Path: Delivered-To: apmail-incubator-ode-commits-archive@locus.apache.org Received: (qmail 10276 invoked from network); 25 Jun 2007 22:46:41 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 25 Jun 2007 22:46:41 -0000 Received: (qmail 19455 invoked by uid 500); 25 Jun 2007 22:46:44 -0000 Delivered-To: apmail-incubator-ode-commits-archive@incubator.apache.org Received: (qmail 19408 invoked by uid 500); 25 Jun 2007 22:46:44 -0000 Mailing-List: contact ode-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ode-dev@incubator.apache.org Delivered-To: mailing list ode-commits@incubator.apache.org Received: (qmail 19398 invoked by uid 99); 25 Jun 2007 22:46:44 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 25 Jun 2007 15:46:44 -0700 X-ASF-Spam-Status: No, hits=-99.5 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 25 Jun 2007 15:46:39 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id AAB761A981A; Mon, 25 Jun 2007 15:46:19 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r550631 - in /incubator/ode/branches/bart: bpel-api/src/main/java/org/apache/ode/bpel/iapi/ bpel-dao/src/main/java/org/apache/ode/bpel/dao/ bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ Date: Mon, 25 Jun 2007 22:46:19 -0000 To: ode-commits@incubator.apache.org From: mszefler@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070625224619.AAB761A981A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: mszefler Date: Mon Jun 25 15:46:18 2007 New Revision: 550631 URL: http://svn.apache.org/viewvc?view=rev&rev=550631 Log: BART Modified: incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchange.java incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleChannel.java incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleMessageExchange.java incubator/ode/branches/bart/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java Modified: incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchange.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchange.java?view=diff&rev=550631&r1=550630&r2=550631 ============================================================================== --- incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchange.java (original) +++ incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchange.java Mon Jun 25 15:46:18 2007 @@ -113,6 +113,13 @@ String getMessageExchangeId() throws BpelEngineException; + + /** + * Get the invocation style for this message exchange. + * @return + */ + InvocationStyle getInvocationStyle(); + /** * Get the name of the operation (WSDL 1.1) / message exchange (WSDL 1.2?). * Modified: incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleChannel.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleChannel.java?view=diff&rev=550631&r1=550630&r2=550631 ============================================================================== --- incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleChannel.java (original) +++ incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleChannel.java Mon Jun 25 15:46:18 2007 @@ -31,39 +31,13 @@ public interface PartnerRoleChannel { /** - * Style of invocation supported on the given channel. + * Get the supported invocation styles (see {@link InvocationStyle}) for invoking the EPR provided + * via this channel. * - * @author Maciej Szefler + * @param partnerEpr partner's EPR + * @return supported invocation styles */ - public enum InvocationStyle { - /** - * The very ordinary blocking IO style --- the IL will block until the operation is complete, or until - * a timeout is reached. - */ - BLOCKING, - - /** - * Asynchrnous style -- the IL will "queue" the invocation, and call-back asynchrnously when the response - * is available. - */ - ASYNC, - - /** - * Reliable style -- the IL will queue the invocation using the current transaction. The response will be - * delivered when available using a separate transaction. - */ - RELIABLE, - - - /** - * Transacted style -- the IL will enroll the operation with the current transaction. The IL will block until the - * operation completes. - */ - TRANSACTED - } - - - Set getSupportedInvocationStyle(); + Set getSupportedInvocationStyle(EndpointReference partnerEpr); /** * Return the endpoint reference to the endpoint with which the Modified: incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleMessageExchange.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleMessageExchange.java?view=diff&rev=550631&r1=550630&r2=550631 ============================================================================== --- incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleMessageExchange.java (original) +++ incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/PartnerRoleMessageExchange.java Mon Jun 25 15:46:18 2007 @@ -31,11 +31,6 @@ */ public interface PartnerRoleMessageExchange extends MessageExchange { - /** - * Get the invocation style for this message exchange. - * @return - */ - PartnerRoleChannel.InvocationStyle getInvocationStyle(); /** * Get the identifier of the process that created this message exchange. Modified: incubator/ode/branches/bart/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java?view=diff&rev=550631&r1=550630&r2=550631 ============================================================================== --- incubator/ode/branches/bart/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java (original) +++ incubator/ode/branches/bart/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java Mon Jun 25 15:46:18 2007 @@ -18,12 +18,13 @@ */ package org.apache.ode.bpel.dao; -import org.w3c.dom.Element; - -import javax.xml.namespace.QName; import java.util.Date; import java.util.Set; +import javax.xml.namespace.QName; + +import org.w3c.dom.Element; + /** * Data access object for a message exchange. */ @@ -41,6 +42,19 @@ String getMessageExchangeId(); /** + * Get the invocation style. + * @return + */ + String getInvocationStyle(); + + /** + * Set the invocation style. + * @param invocationStyle + */ + void setInvocationStyle(String invocationStyle); + + + /** * Get output message (could be fault message) * * @return output message DAO @@ -234,13 +248,16 @@ PartnerLinkDAO getPartnerLink(); /** - * Gets the mex id for the message exchange that has been piped with - * this one in a process to process interaction. - * @return - */ - String getPipedMessageExchangeId(); - void setPipedMessageExchangeId(String mexId); + * Gets the message exchange that has been piped with this one in a process to process interaction. + * + * @return other side of the message pipe + */ + MessageExchangeDAO getPipedMessageExchange(); + + void setPipedMessageExchange(MessageExchangeDAO mexId); void release(); + + void setFailureType(String failureType); } Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=550631&r1=550630&r2=550631 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Mon Jun 25 15:46:18 2007 @@ -337,7 +337,7 @@ if (__log.isDebugEnabled()) { __log.debug("InvokeResponse event for iid " + we.getIID()); } - processInstance.invocationResponse(we.getMexId(), we.getChannel()); + processInstance.injectPartnerResponse(we.getMexId(), we.getChannel()); processInstance.execute(); break; case MATCHER: Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=550631&r1=550630&r2=550631 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Mon Jun 25 15:46:18 2007 @@ -43,13 +43,14 @@ import org.apache.ode.bpel.iapi.ContextException; import org.apache.ode.bpel.iapi.Endpoint; import org.apache.ode.bpel.iapi.EndpointReference; +import org.apache.ode.bpel.iapi.InvocationStyle; import org.apache.ode.bpel.iapi.Message; import org.apache.ode.bpel.iapi.MessageExchange; import org.apache.ode.bpel.iapi.MessageExchangeContext; import org.apache.ode.bpel.iapi.PartnerRoleChannel; import org.apache.ode.bpel.iapi.MessageExchange.FailureType; import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern; -import org.apache.ode.bpel.iapi.PartnerRoleChannel.InvocationStyle; +import org.apache.ode.bpel.iapi.MessageExchange.Status; import org.apache.ode.bpel.iapi.MyRoleMessageExchange; import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange; import org.apache.ode.bpel.memdao.ProcessInstanceDaoImpl; @@ -109,8 +110,9 @@ /** JACOB ExecutionQueue (state) */ protected ExecutionQueueImpl _soup; - private MyRoleMessageExchangeImpl _instantiatingMessageExchange; + private MessageExchangeDAO _instantiatingMessageExchange; + /** Object for keeping track of all the outstanding s. */ private OutstandingRequestManager _outstandingRequests; /** List of BLOCKING invocations that need to be deferred until the end of the current TX */ @@ -127,7 +129,7 @@ public BpelRuntimeContextImpl(BpelProcess bpelProcess, ProcessInstanceDAO dao, PROCESS PROCESS, - MyRoleMessageExchangeImpl instantiatingMessageExchange) { + MessageExchangeDAO instantiatingMessageExchange) { _bpelProcess = bpelProcess; _dao = dao; _iid = dao.getInstanceId(); @@ -544,61 +546,63 @@ evt.setOperation(opName); evt.setPortType(plinkInstnace.partnerLink.myRolePortType.getQName()); - MessageExchangeDAO mex = _dao.getConnection().getMessageExchange(mexRef); + // Get the "my-role" mex from the DB. + MessageExchangeDAO myrolemex = _dao.getConnection().getMessageExchange(mexRef); - MessageDAO message = mex.createMessage(plinkInstnace.partnerLink.getMyRoleOperation(opName).getOutput() + // TODO: add some checks here/could get npe + MessageDAO message = myrolemex.createMessage(plinkInstnace.partnerLink.getMyRoleOperation(opName).getOutput() .getMessage().getQName()); message.setData(msg); - MyRoleMessageExchangeImpl m = new MyRoleMessageExchangeImpl(_bpelProcess._engine, mex); - _bpelProcess.initMyRoleMex(m); - m.setResponse(new MessageImpl(message)); + myrolemex.setResponse(message); + Status status; + if (fault != null) { - mex.setStatus(MessageExchange.Status.FAULT.toString()); - mex.setFault(fault); + status = Status.FAULT; + myrolemex.setFault(fault); evt.setAspect(ProcessMessageExchangeEvent.PROCESS_FAULT); } else { - mex.setStatus(MessageExchange.Status.RESPONSE.toString()); + status = Status.RESPONSE; evt.setAspect(ProcessMessageExchangeEvent.PROCESS_OUTPUT); } + + myrolemex.setStatus(status.toString()); + + + if (myrolemex.getPipedMessageExchange() != null) /* p2p case */ { + MessageExchangeDAO pmex = myrolemex.getPipedMessageExchange(); - if (mex.getPipedMessageExchangeId() != null) { - PartnerRoleMessageExchange pmex = (PartnerRoleMessageExchange) _bpelProcess - .getEngine().getMessageExchange(mex.getPipedMessageExchangeId()); if (BpelProcess.__log.isDebugEnabled()) { - __log.debug("Replying to a p2p mex, myrole " + m + " - partnerole " + pmex); - } - try { - switch (m.getStatus()) { - case FAILURE: - // We can't seem to get the failure out of the myrole mex? - pmex.replyWithFailure(MessageExchange.FailureType.OTHER, "operation failed", null); - break; - case FAULT: - Message faultRes = pmex.createMessage(pmex.getOperation().getFault(m.getFault().getLocalPart()) - .getMessage().getQName()); - faultRes.setMessage(m.getResponse().getMessage()); - pmex.replyWithFault(m.getFault(), faultRes); - break; - case RESPONSE: - Message response = pmex.createMessage(pmex.getOperation().getOutput().getMessage().getQName()); - response.setMessage(m.getResponse().getMessage()); - pmex.reply(response); - break; - default: - __log.warn("Unexpected state: " + m.getStatus()); - break; - } - } finally { - mex.release(); + __log.debug("Replying to a p2p mex, myrole " + myrolemex + " - partnerole " + pmex); } - } else _bpelProcess._engine._contexts.mexContext.onAsyncReply(m); - // send event + // In the p2p case we copy the status/response from one mex object into the other. + pmex.setResponse(myrolemex.getResponse()); + pmex.setStatus(myrolemex.getStatus()); + continuePartnerReplied(pmex); + myrolemex.release(); + + } else /* IL-mediated communication */ { + // TODO: distinguish between different kinds of my-role mexss + MyRoleMessageExchangeImpl myRoleMex = new MyRoleMessageExchangeImpl(); + _bpelProcess._engine._contexts.mexContext.onAsyncReply(myRoleMex); + } + + sendEvent(evt); } + + /** + * Continue a process due to a reply being received from a partner. + * + * @param pmex partner-role message exchange where the response was received + */ + private void continuePartnerReplied(MessageExchangeDAO pmex) { + + } + /** * @see BpelRuntimeContext#writeCorrelation(org.apache.ode.bpel.runtime.CorrelationSetInstance, * org.apache.ode.bpel.common.CorrelationKey) @@ -699,16 +703,18 @@ public String invoke(PartnerLinkInstance partnerLink, Operation operation, Element outgoingMessage, InvokeResponseChannel channel) throws FaultException { - + + // Get the Integration Layer's communication channel for the partnerlink. + PartnerRoleChannel partnerRoleChannel = _bpelProcess.getPartnerRoleChannel(partnerLink.partnerLink); + PartnerLinkDAO plinkDAO = fetchPartnerLinkDAO(partnerLink); - // The target (partner endpoint) -- if it has not been explicitly - // initialized - // then use the value from bthe deployment descriptor .. + Element partnerEPR = plinkDAO.getPartnerEPR(); + EndpointReference partnerEpr; if (partnerEPR == null) { - partnerEpr = _bpelProcess.getInitialPartnerRoleEPR(partnerLink.partnerLink); + partnerEpr = partnerRoleChannel.getInitialEndpointReference(); // In this case, the partner link has not been initialized. if (partnerEpr == null) throw new FaultException(partnerLink.partnerLink.getOwner().constants.qnUninitializedPartnerRole); @@ -740,127 +746,135 @@ : MessageExchangePattern.REQUEST_ONLY).toString()); mexDao.setChannel(channel == null ? null : channel.export()); - PartnerRoleChannel partnerRoleChannel = _bpelProcess.getPartnerRoleChannel(partnerLink.partnerLink); -// PartnerRoleMessageExchangeImpl mex = new PartnerRoleMessageExchangeImpl(_bpelProcess._engine, mexDao, -// partnerLink.partnerLink.partnerRolePortType, operation, partnerEpr, myRoleEndpoint, ); - - // Properties used by stateful-exchange protocol. - String mySessionId = plinkDAO.getMySessionId(); - String partnerSessionId = plinkDAO.getPartnerSessionId(); - - if ( mySessionId != null ) - mexDao.setProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID, mySessionId); - if ( partnerSessionId != null ) - mexDao.setProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID, partnerSessionId); - - if (__log.isDebugEnabled()) - __log.debug("INVOKE PARTNER (SEP): sessionId=" + mySessionId + " partnerSessionId=" + partnerSessionId); - MessageDAO message = mexDao.createMessage(operation.getInput().getMessage().getQName()); mexDao.setRequest(message); message.setData(outgoingMessage); message.setType(operation.getInput().getMessage().getQName()); - // Get he my-role EPR (if myrole exists) for optional use by partner - // (for callback mechanism). - EndpointReference myRoleEndpoint = partnerLink.partnerLink.hasMyRole() ? _bpelProcess - .getInitialMyRoleEPR(partnerLink.partnerLink) : null; - + BpelProcess p2pProcess = null; Endpoint partnerEndpoint = _bpelProcess.getInitialPartnerRoleEndpoint(partnerLink.partnerLink); if (partnerEndpoint != null) p2pProcess = _bpelProcess.getEngine().route(partnerEndpoint.serviceName, new MemBackedMessageImpl(message.getData(),message.getType(), true)); - if (p2pProcess != null) { - // Creating a my mex using the same message id as partner mex to "pipe" them - MyRoleMessageExchange myRoleMex = _bpelProcess.getEngine().createMessageExchange( - mexDao.getMessageExchangeId(), partnerEndpoint.serviceName, - operation.getName(), mexDao.getMessageExchangeId()); - - if (BpelProcess.__log.isDebugEnabled()) { - __log.debug("Invoking in a p2p interaction, partnerrole " + mexDao.getMessageExchangeId() + " - myrole " + myRoleMex); - } - - Message odeRequest = myRoleMex.createMessage(operation.getInput().getMessage().getQName()); - odeRequest.setMessage(outgoingMessage); - - if (BpelProcess.__log.isDebugEnabled()) { - __log.debug("Setting myRoleMex session ids for p2p interaction, mySession " - + partnerSessionId + " - partnerSess " + mySessionId); - } - if ( partnerSessionId != null ) - myRoleMex.setProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID, partnerSessionId); - if ( mySessionId != null ) - myRoleMex.setProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID, mySessionId); - - mexDao.setStatus(MessageExchange.Status.ASYNC.toString()); - myRoleMex.invoke(odeRequest); - } else /* NOT p2p, need to call out to IL */ { - if (partnerEpr != null) { - // If we couldn't find the endpoint, then there is no sense - // in asking the IL to invoke. - mexDao.setEPR(partnerEpr.toXML().getDocumentElement()); - mexDao.setStatus(MessageExchange.Status.REQUEST.toString()); - Set supportedStyles = partnerRoleChannel.getSupportedInvocationStyle(); - if (!_bpelProcess.isInMemory()) { - if (supportedStyles.contains(InvocationStyle.RELIABLE)) { - // If RELIABLE is supported, this is easy, we just do it in-line. - _bpelProcess._engine._contexts.mexContext.invokePartner(); - } else if (supportedStyles.contains(InvocationStyle.TRANSACTED)){ - // If TRANSACTED is supported, this is again easy, do it in-line. - _bpelProcess._engine._contexts.mexContext.invokePartner(mex, MessageExchangeContext.InvocationStyle.TRANSACTED); - } else if (supportedStyles.contains(InvocationStyle.BLOCKING)) { - // For BLOCKING invocation, we defer the call until after commit (unless idempotent). - _todoBlockingCalls.add(mex); - } else if (supportedStyles.contains(InvocationStyle.ASYNC)) { - // For ASYNC style, we defer the call until after commit (unless idempotent). - _todoAsyncCalls.add(mex); - } else { - // This really should not happen, indicates IL is screwy. - __log.error("Integration layer did not agree to any known invocation style for EPR " + DOMUtils.domToString(partnerEPR)); - mex.setFailure(FailureType.COMMUNICATION_ERROR, "NoMatchingStyle",partnerEPR); - } - } else /* in-memory */ { - - } - } else { - __log.error("Couldn't find endpoint for partner EPR " + DOMUtils.domToString(partnerEPR)); - mex.setFailure(FailureType.UNKNOWN_ENDPOINT, "UnknownEndpoint", partnerEPR); - } - } - evt.setMexId(mexDao.getMessageExchangeId()); sendEvent(evt); - // MEX pattern is request only, at this point the status can only be a one way - if (mexDao.getPattern().equals(MessageExchangePattern.REQUEST_ONLY.toString())) { - mexDao.setStatus(MessageExchange.Status.ASYNC.toString()); - // This mex can now be released - mexDao.release(); - } - // Check if there is a synchronous response, if so, we need to inject the - // message on the response channel. - switch (mex.getStatus()) { - case NEW: - throw new AssertionError("Impossible!"); + if (p2pProcess != null) { + /* P2P (process-to-process) invocation, special logic */ + invokeP2P(operation, outgoingMessage, mexDao); + } else { + /* NOT p2p, need to call out to IL */ + invokeIL(partnerRoleChannel, partnerEpr, mexDao); + } + + // In case a response/fault was available right away, which will happen for BLOCKING/TRANSACTED invocations, + // we need to inject a message on the response channel, so that the process continues. + switch (Status.valueOf(mexDao.getStatus())){ case ASYNC: break; case RESPONSE: case FAULT: case FAILURE: - invocationResponse(mex); + injectPartnerResponse(mexDao.getMessageExchangeId(), mexDao.getChannel()); break; default: - __log.error("Partner did not acknowledge message exchange: " + mex); - mex.setFailure(FailureType.NO_RESPONSE, "Partner did not acknowledge.", null); - invocationResponse(mex); + __log.error("Partner did not acknowledge message exchange: " + mexDao.getMessageExchangeId()); + mexDao.setStatus(Status.FAILURE.toString()); + mexDao.setFailureType(FailureType.NO_RESPONSE.toString()); + injectPartnerResponse(mexDao.getMessageExchangeId(), mexDao.getChannel()); } + return mexDao.getMessageExchangeId(); } + private void invokeIL( + PartnerLinkInstance partnerLink, + Operation operation, + Element outgoingMessage, + PartnerRoleChannel partnerRoleChannel, + EndpointReference partnerEpr, + MessageExchangeDAO mexDao) { + if (partnerEpr != null) { + // If we couldn't find the endpoint, then there is no sense + // in asking the IL to invoke. + mexDao.setEPR(partnerEpr.toXML().getDocumentElement()); + mexDao.setStatus(MessageExchange.Status.REQUEST.toString()); + Set supportedStyles = partnerRoleChannel.getSupportedInvocationStyle(partnerEpr); + if (supportedStyles.contains(InvocationStyle.RELIABLE)) { + // If RELIABLE is supported, this is easy, we just do it in-line. + throw new UnsupportedOperationException(); // TODO + ReliablePartnerRoleMessageExchangeImpl reliableMex = new ReliablePartnerRoleMessageExchangeImpl(); + _bpelProcess._engine._contexts.mexContext.invokePartner(reliableMex); + } else if (supportedStyles.contains(InvocationStyle.TRANSACTED)){ + // If TRANSACTED is supported, this is again easy, do it in-line. + throw new UnsupportedOperationException(); // TODO + TransactedPartnerRoleMessageExchangeImpl transactedMex = new TransactedPartnerRoleMessageExchangeImpl(); + _bpelProcess._engine._contexts.mexContext.invokePartner(transactedMex); + } else if (supportedStyles.contains(InvocationStyle.BLOCKING)) { + // For BLOCKING invocation, we defer the call until after commit (unless idempotent). + BlockingPartnerRoleMessageExchangeImpl blockingMex = new BlockingPartnerRoleMessageExchangeImpl(); + _todoBlockingCalls.add(blockingMex); + } else if (supportedStyles.contains(InvocationStyle.ASYNC)) { + // For ASYNC style, we defer the call until after commit (unless idempotent). + AsyncPartnerRoleMessageExchangeImpl asyncMex = new AsyncPartnerRoleMessageExchangeImpl(); + _todoAsyncCalls.add(asyncMex); + } else { + // This really should not happen, indicates IL is screwy. + __log.error("Integration layer did not agree to any known invocation style for EPR " + DOMUtils.domToString(partnerEPR)); + mexDao.setFailureType(FailureType.COMMUNICATION_ERROR.toString()); + mexDao.setStatus(Status.FAILURE.toString()); + mexDao.setFaultExplanation("NoMatchingStyle"); + } + + } else { + __log.error("Couldn't find endpoint for partner EPR " + DOMUtils.domToString(partnerEPR)); + mexDao.setFailureType(FailureType.UNKNOWN_ENDPOINT.toString()); + mexDao.setFaultExplanation("UnknownEndpoint"); + mexDao.setStatus(Status.FAILURE.toString()); + //, partnerEPR); + } + } + + private void invokeP2P(Operation operation, Element outgoingMessage, MessageExchangeDAO mexDao) { + if (BpelProcess.__log.isDebugEnabled()) { + __log.debug("Invoking in a p2p interaction, partnerrole " + mexDao.getMessageExchangeId() + " - myrole " + myRoleMex); + } + + // Properties used by stateful-exchange protocol. + String mySessionId = mexDao.getPartnerLink().getMySessionId(); + String partnerSessionId = mexDao.getPartnerLink().getPartnerSessionId(); + + + if ( mySessionId != null ) + mexDao.setProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID, mySessionId); + if ( partnerSessionId != null ) + mexDao.setProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID, partnerSessionId); + + if (__log.isDebugEnabled()) + __log.debug("INVOKE PARTNER (SEP): sessionId=" + mySessionId + " partnerSessionId=" + partnerSessionId); + + + + Message odeRequest = myRoleMex.createMessage(operation.getInput().getMessage().getQName()); + odeRequest.setMessage(outgoingMessage); + + if (BpelProcess.__log.isDebugEnabled()) { + __log.debug("Setting myRoleMex session ids for p2p interaction, mySession " + + partnerSessionId + " - partnerSess " + mySessionId); + } + if ( partnerSessionId != null ) + myRoleMex.setProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID, partnerSessionId); + if ( mySessionId != null ) + myRoleMex.setProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID, mySessionId); + + mexDao.setStatus(MessageExchange.Status.ASYNC.toString()); + myRoleMex.invoke(odeRequest); + } + void execute() { long maxTime = System.currentTimeMillis() + _maxReductionTimeMs; @@ -911,7 +925,7 @@ } } - void inputMsgMatch(final String responsechannel, final int idx, MyRoleMessageExchangeImpl mex) { + void inputMsgMatch(final String responsechannel, final int idx, MessageExchangeDAO mexdao) { // if we have a message match, this instance should be marked // active if it isn't already if (_dao.getState() == ProcessState.STATE_READY) { @@ -928,9 +942,9 @@ sendEvent(evt); } - _outstandingRequests.associate(responsechannel, mex.getMessageExchangeId()); + _outstandingRequests.associate(responsechannel, mexdao.getMessageExchangeId()); - final String mexId = mex.getMessageExchangeId(); + final String mexId = mexdao.getMessageExchangeId(); _vpu.inject(new JacobRunnable() { private static final long serialVersionUID = 3168964409165899533L; @@ -980,11 +994,7 @@ }); } - void invocationResponse(PartnerRoleMessageExchangeImpl mex) { - invocationResponse(mex.getDAO().getMessageExchangeId(), mex.getDAO().getChannel()); - } - - void invocationResponse(final String mexid, final String responseChannelId) { + void injectPartnerResponse(final String mexid, final String responseChannelId) { if (responseChannelId == null) throw new NullPointerException("Null responseChannelId"); if (mexid == null) @@ -1365,9 +1375,7 @@ + " on CKEY " + ckey); } - MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(_bpelProcess._engine, mexdao); - - inputMsgMatch(mroute.getGroupId(), mroute.getIndex(), mex); + inputMsgMatch(mroute.getGroupId(), mroute.getIndex(), mexdao); execute(); } else { __log.debug("MatcherEvent handling: nothing to do, no matching message in DB"); Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?view=diff&rev=550631&r1=550630&r2=550631 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java (original) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java Mon Jun 25 15:46:18 2007 @@ -19,6 +19,7 @@ package org.apache.ode.bpel.engine; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.Set; @@ -33,6 +34,7 @@ import org.apache.ode.bpel.dao.MessageExchangeDAO; import org.apache.ode.bpel.iapi.BpelEngineException; import org.apache.ode.bpel.iapi.EndpointReference; +import org.apache.ode.bpel.iapi.InvocationStyle; import org.apache.ode.bpel.iapi.Message; import org.apache.ode.bpel.iapi.MessageExchange; import org.apache.ode.utils.msg.MessageBundle; @@ -41,7 +43,7 @@ /** * Base implementation of the {@link MessageExchange} interface. This interfaces is exposed to the Integration Layer (IL) * to allow it to implement incoming (via {@link MyRoleMessageExchangeImpl}) and outgoing (via {@link PartnerRoleMessageExchangeImpl}) - * communications. + * communications. * * It should be noted that this class and its derived classes are in NO WAY THREADSAFE. It is imperative that the integration layer * not attempt to use {@link MessageExchange} objects from multiple threads. @@ -95,6 +97,11 @@ BpelEngineImpl _engine; boolean _associated; + + InvocationStyle _istyle; + + /** The point at which this message-exchange will time out. */ + Date _timeout; enum Change { EPR, @@ -123,11 +130,11 @@ _mexId = mexId; } - public MessageExchangeImpl(BpelEngineImpl engine, MessageExchangeDAO dao) { - load(dao); - } void load(MessageExchangeDAO dao) { + if (dao.getMessageExchangeId().equals(_mexId)) + throw new IllegalArgumentException("MessageExchangeId mismatch!"); + if (_pattern == null) _pattern = MessageExchangePattern.valueOf(dao.getPattern()); if (_opname == null) @@ -144,11 +151,13 @@ _status = Status.valueOf(dao.getStatus()); if (_callee == null) _callee = dao.getCallee(); - + if (_istyle == null) + _istyle = InvocationStyle.valueOf(dao.getInvocationStyle()); } public void save(MessageExchangeDAO dao) { dao.setStatus(_status.toString()); + dao.setInvocationStyle(_istyle.toString()); dao.setFault(_fault); dao.setFaultExplanation(_explanation); //todo: set failureType @@ -170,6 +179,10 @@ } } + + public InvocationStyle getInvocationStyle() { + return _istyle; + } public boolean isSafe() { Object val = getOperation().getExtensionAttribute(SAFE_ATTRIBUTE); @@ -209,10 +222,6 @@ public String getFaultExplanation() { return _explanation; - } - - public MessageExchangePattern getPattern() { - return _pattern; } public Status getStatus() { Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java?view=diff&rev=550631&r1=550630&r2=550631 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java (original) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java Mon Jun 25 15:46:18 2007 @@ -45,13 +45,14 @@ PartnerRoleMessageExchangeImpl( BpelEngineImpl engine, + String mexId, PortType portType, Operation operation, boolean inMem, EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) { - super(engine); + super(engine, mexId); _myRoleEPR = myRoleEPR; _channel = channel; _inMem = inMem; Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java?view=diff&rev=550631&r1=550630&r2=550631 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java (original) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java Mon Jun 25 15:46:18 2007 @@ -1,5 +1,15 @@ package org.apache.ode.bpel.engine; +import javax.wsdl.Operation; +import javax.wsdl.PortType; + +import org.apache.ode.bpel.iapi.EndpointReference; +import org.apache.ode.bpel.iapi.PartnerRoleChannel; + public class ReliablePartnerRoleMessageExchangeImpl extends PartnerRoleMessageExchangeImpl { + + ReliablePartnerRoleMessageExchangeImpl(); + // TODO Auto-generated constructor stub + } }