Return-Path: Delivered-To: apmail-incubator-ode-commits-archive@locus.apache.org Received: (qmail 33069 invoked from network); 20 Jun 2007 17:34:59 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 20 Jun 2007 17:34:59 -0000 Received: (qmail 34568 invoked by uid 500); 20 Jun 2007 17:35:02 -0000 Delivered-To: apmail-incubator-ode-commits-archive@incubator.apache.org Received: (qmail 34542 invoked by uid 500); 20 Jun 2007 17:35:02 -0000 Mailing-List: contact ode-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ode-dev@incubator.apache.org Delivered-To: mailing list ode-commits@incubator.apache.org Received: (qmail 34533 invoked by uid 99); 20 Jun 2007 17:35:02 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 20 Jun 2007 10:35:02 -0700 X-ASF-Spam-Status: No, hits=-99.5 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 20 Jun 2007 10:34:57 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 883551A981A; Wed, 20 Jun 2007 10:34:37 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r549162 - /incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Date: Wed, 20 Jun 2007 17:34:37 -0000 To: ode-commits@incubator.apache.org From: mszefler@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070620173437.883551A981A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: mszefler Date: Wed Jun 20 10:34:36 2007 New Revision: 549162 URL: http://svn.apache.org/viewvc?view=rev&rev=549162 Log: Hydration latch. Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=549162&r1=549161&r2=549162 ============================================================================== --- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original) +++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Wed Jun 20 10:34:36 2007 @@ -18,9 +18,21 @@ */ package org.apache.ode.bpel.engine; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + +import javax.xml.namespace.QName; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.ode.bpel.common.FaultException; +import org.apache.ode.bpel.dao.BpelDAOConnection; import org.apache.ode.bpel.dao.ProcessDAO; import org.apache.ode.bpel.dao.ProcessInstanceDAO; import org.apache.ode.bpel.evt.ProcessInstanceEvent; @@ -54,15 +66,6 @@ import org.w3c.dom.NodeList; import org.w3c.dom.Text; -import javax.xml.namespace.QName; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - /** * Entry point into the runtime of a BPEL process. * @@ -75,47 +78,56 @@ private static final Messages __msgs = MessageBundle.getMessages(Messages.class); private volatile Map _partnerRoles; + private volatile Map _myRoles; + /** Mapping from {"Service Name" (QNAME) / port} to a myrole. */ private volatile Map _endpointToMyRoleMap; // Backup hashmaps to keep initial endpoints handy after dehydration - private Map _myEprs = - new HashMap(); - private Map _partnerEprs = - new HashMap(); - private Map _partnerChannels = - new HashMap(); + private Map _myEprs = new HashMap(); + + private Map _partnerEprs = new HashMap(); + + private Map _partnerChannels = new HashMap(); final QName _pid; + private volatile OProcess _oprocess; + // Has the process already been hydrated before? private boolean _hydratedOnce = false; + /** Last time the process was used. */ private volatile long _lastUsed; BpelEngineImpl _engine; + DebuggerSupport _debugger; + ExpressionLanguageRuntimeRegistry _expLangRuntimeRegistry; + private ReplacementMap _replacementMap; + final ProcessConf _pconf; - // Notifying the server when a process hydrates - private ProcessLifecycleCallback _lifeCallback; + /** {@link MessageExchangeInterceptor}s registered for this process. */ private final List _mexInterceptors = new ArrayList(); - public BpelProcess(ProcessConf conf, BpelEventListener debugger, ProcessLifecycleCallback lifeCallback) { + /** Latch-like thing to control hydration/dehydration. */ + private HydrationLatch _hydrationLatch; + + public BpelProcess(ProcessConf conf, BpelEventListener debugger) { _pid = conf.getProcessId(); _pconf = conf; - _lifeCallback = lifeCallback; + _hydrationLatch = new HydrationLatch(); } public String toString() { return "BpelProcess[" + _pid + "]"; } - public void recoverActivity(ProcessInstanceDAO instanceDAO, String channel, long activityId, - String action, FaultData fault) { + public void recoverActivity(ProcessInstanceDAO instanceDAO, String channel, long activityId, String action, FaultData fault) { if (__log.isDebugEnabled()) __log.debug("Recovering activity in process " + instanceDAO.getInstanceId() + " with action " + action); markused(); @@ -132,27 +144,33 @@ /** * Entry point for message exchanges aimed at the my role. + * * @param mex */ void invokeProcess(MyRoleMessageExchangeImpl mex) { - PartnerLinkMyRoleImpl target = getMyRoleForService(mex.getServiceName()); - if (target == null) { - String errmsg = __msgs.msgMyRoleRoutingFailure(mex.getMessageExchangeId()); - __log.error(errmsg); - mex.setFailure(MessageExchange.FailureType.UNKNOWN_ENDPOINT, errmsg, null); - return; - } + _hydrationLatch.latch(1); + try { + PartnerLinkMyRoleImpl target = getMyRoleForService(mex.getServiceName()); + if (target == null) { + String errmsg = __msgs.msgMyRoleRoutingFailure(mex.getMessageExchangeId()); + __log.error(errmsg); + mex.setFailure(MessageExchange.FailureType.UNKNOWN_ENDPOINT, errmsg, null); + return; + } - mex.getDAO().setProcess(getProcessDAO()); - - if (!processInterceptors(mex, InterceptorInvoker.__onProcessInvoked)) { - __log.debug("Aborting processing of mex " + mex + " due to interceptors."); - return; - } + mex.getDAO().setProcess(getProcessDAO()); - markused(); - target.invokeMyRole(mex); - markused(); + if (!processInterceptors(mex, InterceptorInvoker.__onProcessInvoked)) { + __log.debug("Aborting processing of mex " + mex + " due to interceptors."); + return; + } + + markused(); + target.invokeMyRole(mex); + markused(); + } finally { + _hydrationLatch.release(1); + } // For a one way, once the engine is done, the mex can be safely released. if (mex.getPattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY)) { @@ -178,16 +196,19 @@ if (target != null) { mex.setPortOp(target._plinkDef.myRolePortType, target._plinkDef.getMyRoleOperation(mex.getOperationName())); } else { - __log.warn("Couldn't find endpoint from service " + mex.getServiceName() - + " when initializing a myRole mex."); + __log.warn("Couldn't find endpoint from service " + mex.getServiceName() + " when initializing a myRole mex."); } } /** * Extract the value of a BPEL property from a BPEL messsage variable. - * @param msgData message variable data - * @param alias alias to apply - * @param target description of the data (for error logging only) + * + * @param msgData + * message variable data + * @param alias + * alias to apply + * @param target + * description of the data (for error logging only) * @return value of the property * @throws FaultException */ @@ -230,16 +251,15 @@ } /** - * Get the element name for a given WSDL part. If the part is an - * element part, the name of that element is returned. If the - * part is an XML schema typed part, then the name of the part is returned - * in the null namespace. - * @param part WSDL {@link javax.wsdl.Part} + * Get the element name for a given WSDL part. If the part is an element part, the name of that element is returned. + * If the part is an XML schema typed part, then the name of the part is returned in the null namespace. + * + * @param part + * WSDL {@link javax.wsdl.Part} * @return name of element containing said part */ static QName getElementNameForPart(OMessageVarType.Part part) { - return (part.type instanceof OElementVarType) ? ((OElementVarType) part.type).elementType - : new QName(null, part.name); + return (part.type instanceof OElementVarType) ? ((OElementVarType) part.type).elementType : new QName(null, part.name); } /** @@ -247,12 +267,10 @@ * * @param mex * message exchange - * @return true if execution should continue, - * false otherwise + * @return true if execution should continue, false otherwise */ boolean processInterceptors(MyRoleMessageExchangeImpl mex, InterceptorInvoker invoker) { - InterceptorContextImpl ictx = new InterceptorContextImpl(getEngine()._contexts.dao.getConnection(), - getProcessDAO(), _pconf); + InterceptorContextImpl ictx = new InterceptorContextImpl(_engine._contexts.dao.getConnection(), getProcessDAO(), _pconf); for (MessageExchangeInterceptor i : _mexInterceptors) if (!mex.processInterceptor(i, mex, ictx, invoker)) @@ -264,64 +282,68 @@ return true; } - /** * @see org.apache.ode.bpel.engine.BpelProcess#handleWorkEvent(java.util.Map) */ public void handleWorkEvent(Map jobData) { - markused(); - - if (__log.isDebugEnabled()) { - __log.debug(ObjectPrinter.stringifyMethodEnter("handleWorkEvent", new Object[] { "jobData", jobData })); - } - - WorkEvent we = new WorkEvent(jobData); + _hydrationLatch.latch(1); + try { + markused(); - // Process level events - if (we.getType().equals(WorkEvent.Type.INVOKE_INTERNAL)) { if (__log.isDebugEnabled()) { - __log.debug("InvokeInternal event for mexid " + we.getMexId()); - } - MyRoleMessageExchangeImpl mex = (MyRoleMessageExchangeImpl) getEngine().getMessageExchange(we.getMexId()); - if (mex == null) throw new ContextException("Unable to find MEX " + we.getMexId()); - invokeProcess(mex); - } else { - // Instance level events - ProcessInstanceDAO procInstance = getProcessDAO().getInstance(we.getIID()); - if (procInstance == null) { - if (__log.isDebugEnabled()) { - __log.debug("handleWorkEvent: no ProcessInstance found with iid " + we.getIID() + "; ignoring."); - } - return; + __log.debug(ObjectPrinter.stringifyMethodEnter("handleWorkEvent", new Object[] { "jobData", jobData })); } - BpelRuntimeContextImpl processInstance = createRuntimeContext(procInstance, null, null); - switch (we.getType()) { - case TIMER: - if (__log.isDebugEnabled()) { - __log.debug("handleWorkEvent: TimerWork event for process instance " + processInstance); - } - processInstance.timerEvent(we.getChannel()); - break; - case RESUME: + + WorkEvent we = new WorkEvent(jobData); + + // Process level events + if (we.getType().equals(WorkEvent.Type.INVOKE_INTERNAL)) { if (__log.isDebugEnabled()) { - __log.debug("handleWorkEvent: ResumeWork event for iid " + we.getIID()); + __log.debug("InvokeInternal event for mexid " + we.getMexId()); } - processInstance.execute(); - break; - case INVOKE_RESPONSE: - if (__log.isDebugEnabled()) { - __log.debug("InvokeResponse event for iid " + we.getIID()); + MyRoleMessageExchangeImpl mex = (MyRoleMessageExchangeImpl) _engine.getMessageExchange(we.getMexId()); + invokeProcess(mex); + } else { + // Instance level events + ProcessInstanceDAO procInstance = getProcessDAO().getInstance(we.getIID()); + if (procInstance == null) { + if (__log.isDebugEnabled()) { + __log.debug("handleWorkEvent: no ProcessInstance found with iid " + we.getIID() + "; ignoring."); + } + return; } - processInstance.invocationResponse(we.getMexId(), we.getChannel()); - processInstance.execute(); - break; - case MATCHER: - if (__log.isDebugEnabled()) { - __log.debug("Matcher event for iid " + we.getIID()); + + BpelRuntimeContextImpl processInstance = createRuntimeContext(procInstance, null, null); + switch (we.getType()) { + case TIMER: + if (__log.isDebugEnabled()) { + __log.debug("handleWorkEvent: TimerWork event for process instance " + processInstance); + } + processInstance.timerEvent(we.getChannel()); + break; + case RESUME: + if (__log.isDebugEnabled()) { + __log.debug("handleWorkEvent: ResumeWork event for iid " + we.getIID()); + } + processInstance.execute(); + break; + case INVOKE_RESPONSE: + if (__log.isDebugEnabled()) { + __log.debug("InvokeResponse event for iid " + we.getIID()); + } + processInstance.invocationResponse(we.getMexId(), we.getChannel()); + processInstance.execute(); + break; + case MATCHER: + if (__log.isDebugEnabled()) { + __log.debug("Matcher event for iid " + we.getIID()); + } + processInstance.matcherEvent(we.getCorrelatorId(), we.getCorrelationKey()); } - processInstance.matcherEvent(we.getCorrelatorId(), we.getCorrelationKey()); } + } finally { + _hydrationLatch.release(1); } } @@ -335,8 +357,8 @@ for (Map.Entry provide : _pconf.getProvideEndpoints().entrySet()) { OPartnerLink plink = oprocess.getPartnerLink(provide.getKey()); if (plink == null) { - String errmsg = "Error in deployment descriptor for process " + _pid - + "; reference to unknown partner link " + provide.getKey(); + String errmsg = "Error in deployment descriptor for process " + _pid + "; reference to unknown partner link " + + provide.getKey(); __log.error(errmsg); throw new BpelEngineException(errmsg); } @@ -347,13 +369,13 @@ for (Map.Entry invoke : _pconf.getInvokeEndpoints().entrySet()) { OPartnerLink plink = oprocess.getPartnerLink(invoke.getKey()); if (plink == null) { - String errmsg = "Error in deployment descriptor for process " + _pid - + "; reference to unknown partner link " + invoke.getKey(); + String errmsg = "Error in deployment descriptor for process " + _pid + "; reference to unknown partner link " + + invoke.getKey(); __log.error(errmsg); throw new BpelEngineException(errmsg); } - __log.debug("Processing element for process " + _pid + ": partnerlink " + invoke.getKey() - + " --> " + invoke.getValue()); + __log.debug("Processing element for process " + _pid + ": partnerlink " + invoke.getKey() + " --> " + + invoke.getValue()); } for (OPartnerLink pl : oprocess.getAllPartnerLinks()) { @@ -367,16 +389,16 @@ } if (pl.hasPartnerRole()) { - PartnerLinkPartnerRoleImpl partnerRole = new PartnerLinkPartnerRoleImpl( - this, pl, _pconf.getInvokeEndpoints().get(pl.getName())); + PartnerLinkPartnerRoleImpl partnerRole = new PartnerLinkPartnerRoleImpl(this, pl, _pconf.getInvokeEndpoints().get( + pl.getName())); _partnerRoles.put(pl, partnerRole); } } } ProcessDAO getProcessDAO() { - return _pconf.isTransient() ? getEngine()._contexts.inMemDao.getConnection().getProcess(_pid) - : getEngine()._contexts.dao.getConnection().getProcess(_pid); + return _pconf.isTransient() ? _engine._contexts.inMemDao.getConnection().getProcess(_pid) : getEngine()._contexts.dao + .getConnection().getProcess(_pid); } static String genCorrelatorId(OPartnerLink plink, String opName) { @@ -385,7 +407,7 @@ /** * De-serialize the compiled process representation from a stream. - * + * * @param is * input stream * @return process information from configuration database @@ -417,8 +439,7 @@ __log.debug("Activating " + _pid); // Activate all the my-role endpoints. for (Map.Entry entry : _pconf.getProvideEndpoints().entrySet()) { - EndpointReference initialEPR = getEngine()._contexts - .bindingContext.activateMyRoleEndpoint(_pid, entry.getValue()); + EndpointReference initialEPR = _engine._contexts.bindingContext.activateMyRoleEndpoint(_pid, entry.getValue()); __log.debug("Activated " + _pid + " myrole " + entry.getKey() + ": EPR is " + initialEPR); _myEprs.put(entry.getValue(), initialEPR); } @@ -430,30 +451,45 @@ void deactivate() { // Deactivate all the my-role endpoints. for (Endpoint endpoint : _myEprs.keySet()) - getEngine()._contexts.bindingContext.deactivateMyRoleEndpoint(endpoint); + _engine._contexts.bindingContext.deactivateMyRoleEndpoint(endpoint); - // TODO Deactivate all the partner-role channels + // TODO Deactivate all the partner-role channels } EndpointReference getInitialPartnerRoleEPR(OPartnerLink link) { - PartnerLinkPartnerRoleImpl prole = getPartnerRoles().get(link); - if (prole == null) - throw new IllegalStateException("Unknown partner link " + link); - return prole.getInitialEPR(); + _hydrationLatch.latch(1); + try { + PartnerLinkPartnerRoleImpl prole = _partnerRoles.get(link); + if (prole == null) + throw new IllegalStateException("Unknown partner link " + link); + return prole.getInitialEPR(); + } finally { + _hydrationLatch.release(1); + } } Endpoint getInitialPartnerRoleEndpoint(OPartnerLink link) { - PartnerLinkPartnerRoleImpl prole = getPartnerRoles().get(link); - if (prole == null) - throw new IllegalStateException("Unknown partner link " + link); - return prole._initialPartner; + _hydrationLatch.latch(1); + try { + PartnerLinkPartnerRoleImpl prole = _partnerRoles.get(link); + if (prole == null) + throw new IllegalStateException("Unknown partner link " + link); + return prole._initialPartner; + } finally { + _hydrationLatch.release(1); + } } EndpointReference getInitialMyRoleEPR(OPartnerLink link) { - PartnerLinkMyRoleImpl myRole = getMyRoles().get(link); - if (myRole == null) - throw new IllegalStateException("Unknown partner link " + link); - return myRole.getInitialEPR(); + _hydrationLatch.latch(1); + try { + PartnerLinkMyRoleImpl myRole = _myRoles.get(link); + if (myRole == null) + throw new IllegalStateException("Unknown partner link " + link); + return myRole.getInitialEPR(); + } finally { + _hydrationLatch.release(1); + } } QName getPID() { @@ -461,10 +497,15 @@ } PartnerRoleChannel getPartnerRoleChannel(OPartnerLink partnerLink) { - PartnerLinkPartnerRoleImpl prole = getPartnerRoles().get(partnerLink); - if (prole == null) - throw new IllegalStateException("Unknown partner link " + partnerLink); - return prole._channel; + _hydrationLatch.latch(1); + try { + PartnerLinkPartnerRoleImpl prole = _partnerRoles.get(partnerLink); + if (prole == null) + throw new IllegalStateException("Unknown partner link " + partnerLink); + return prole._channel; + } finally { + _hydrationLatch.release(1); + } } public void saveEvent(ProcessInstanceEvent event, ProcessInstanceDAO instanceDao) { @@ -476,8 +517,10 @@ boolean enabled = _pconf.isEventEnabled(scopeNames, event.getType()); if (enabled) { - if (instanceDao != null) saveInstanceEvent(event, instanceDao); - else __log.debug("Couldn't find instance to save event, no event generated!"); + if (instanceDao != null) + saveInstanceEvent(event, instanceDao); + else + __log.debug("Couldn't find instance to save event, no event generated!"); } } @@ -485,98 +528,46 @@ instanceDao.insertBpelEvent(event); } + /** + * Ask the process to dehydrate. + */ void dehydrate() { - _oprocess = null; - _partnerRoles = null; - _myRoles = null; - _endpointToMyRoleMap = null; - _replacementMap = null; - _expLangRuntimeRegistry = null; - } + _hydrationLatch.latch(0); - void hydrate() { - markused(); - __log.debug("Rehydrating process " + _pconf.getProcessId()); try { - _oprocess = deserializeCompiledProcess(_pconf.getCBPInputStream()); - } catch (Exception e) { - String errmsg = "Error reloading compiled process " + _pid + "; the file appears to be corrupted."; - __log.error(errmsg); - throw new BpelEngineException(errmsg, e); - } - - _replacementMap = new ReplacementMapImpl(_oprocess); - - // Create an expression language registry for this process - ExpressionLanguageRuntimeRegistry elangRegistry = new ExpressionLanguageRuntimeRegistry(); - for (OExpressionLanguage elang : _oprocess.expressionLanguages) { - try { - elangRegistry.registerRuntime(elang); - } catch (ConfigurationException e) { - String msg = __msgs.msgExpLangRegistrationError(elang.expressionLanguageUri, elang.properties); - __log.error(msg, e); - throw new BpelEngineException(msg, e); - } + // We don't actually need to do anything, the latch will run the doDehydrate method + // when necessary.. + } finally { + _hydrationLatch.release(0); } - _expLangRuntimeRegistry = elangRegistry; - - setRoles(getOProcess()); - if (!_hydratedOnce) { - for (PartnerLinkPartnerRoleImpl prole : getPartnerRoles().values()) { - PartnerRoleChannel channel = getEngine()._contexts.bindingContext.createPartnerRoleChannel(_pid, - prole._plinkDef.partnerRolePortType, prole._initialPartner); - prole._channel = channel; - _partnerChannels.put(prole._initialPartner, prole._channel); - EndpointReference epr = channel.getInitialEndpointReference(); - if (epr != null) { - prole._initialEPR = epr; - _partnerEprs.put(prole._initialPartner, epr); - } - __log.debug("Activated " + _pid + " partnerrole " + prole.getPartnerLinkName() + ": EPR is " - + prole._initialEPR); - } - _hydratedOnce = true; - } - - for (PartnerLinkMyRoleImpl myrole : getMyRoles().values()) { - myrole._initialEPR = _myEprs.get(myrole._endpoint); - } - - for (PartnerLinkPartnerRoleImpl prole : getPartnerRoles().values()) { - prole._channel = _partnerChannels.get(prole._initialPartner); - if (_partnerEprs.get(prole._initialPartner) != null) { - prole._initialEPR = _partnerEprs.get(prole._initialPartner); - } - } - - _lifeCallback.hydrated(this); } OProcess getOProcess() { - if (_oprocess == null) hydrate(); - return _oprocess; - } - - public Map getMyRoles() { - if (_myRoles == null) hydrate(); - return _myRoles; - } - - public Map getPartnerRoles() { - if (_partnerRoles == null) hydrate(); - return _partnerRoles; + _hydrationLatch.latch(1); + try { + return _oprocess; + } finally { + _hydrationLatch.release(1); + } } private Map getEndpointToMyRoleMap() { - if (_endpointToMyRoleMap == null) hydrate(); - return _endpointToMyRoleMap; + _hydrationLatch.latch(1); + try { + return _endpointToMyRoleMap; + } finally { + _hydrationLatch.release(1); + } } public ReplacementMap getReplacementMap() { - if (_replacementMap == null) hydrate(); - assert _replacementMap != null; - return _replacementMap; + _hydrationLatch.latch(1); + try { + return _replacementMap; + } finally { + _hydrationLatch.release(1); + } } BpelEngineImpl getEngine() { @@ -591,6 +582,13 @@ return _lastUsed; } + /** + * Get a hint as to whether this process is hydrated. Note this is only a hint, since things could change. + */ + public boolean hintIsHydrated() { + return _oprocess != null; + } + /** Keep track of the time the process was last used. */ private final void markused() { _lastUsed = System.currentTimeMillis(); @@ -600,5 +598,155 @@ BpelRuntimeContextImpl createRuntimeContext(ProcessInstanceDAO dao, PROCESS template, MyRoleMessageExchangeImpl instantiatingMessageExchange) { return new BpelRuntimeContextImpl(this, dao, template, instantiatingMessageExchange); + + } + + /** + * If necessary, create an object in the data store to represent the process. We'll re-use an existing object if it already + * exists and matches the GUID. + */ + private void bounceProcessDAO(BpelDAOConnection conn, final QName pid, final long version, final OProcess oprocess) { + __log.debug("Creating process DAO for " + pid + " (guid=" + oprocess.guid + ")"); + try { + boolean create = true; + ProcessDAO old = conn.getProcess(pid); + if (old != null) { + __log.debug("Found ProcessDAO for " + pid + " with GUID " + old.getGuid()); + if (oprocess.guid == null) { + // No guid, old version assume its good + create = false; + } else { + if (old.getGuid().equals(oprocess.guid)) { + // Guids match, no need to create + create = false; + } else { + // GUIDS dont match, delete and create new + String errmsg = "ProcessDAO GUID " + old.getGuid() + " does not match " + oprocess.guid + "; replacing."; + __log.debug(errmsg); + old.delete(); + } + } + } + + if (create) { + ProcessDAO newDao = conn.createProcess(pid, oprocess.getQName(), oprocess.guid, (int) version); + for (String correlator : oprocess.getCorrelators()) { + newDao.addCorrelator(correlator); + } + } + } catch (BpelEngineException ex) { + throw ex; + } catch (Exception dce) { + __log.error("DbError", dce); + throw new BpelEngineException("DbError", dce); + } + } + + private class HydrationLatch extends NStateLatch { + + HydrationLatch() { + super(new Runnable[2]); + _transitions[0] = new Runnable() { + public void run() { + doDehydrate(); + } + }; + + _transitions[1] = new Runnable() { + public void run() { + doHydrate(); + } + }; + + } + + private void doDehydrate() { + _oprocess = null; + _partnerRoles = null; + _myRoles = null; + _endpointToMyRoleMap = null; + _replacementMap = null; + _expLangRuntimeRegistry = null; + } + + private void doHydrate() { + markused(); + __log.debug("Rehydrating process " + _pconf.getProcessId()); + try { + _oprocess = deserializeCompiledProcess(_pconf.getCBPInputStream()); + } catch (Exception e) { + String errmsg = "Error reloading compiled process " + _pid + "; the file appears to be corrupted."; + __log.error(errmsg); + throw new BpelEngineException(errmsg, e); + } + + _replacementMap = new ReplacementMapImpl(_oprocess); + + // Create an expression language registry for this process + ExpressionLanguageRuntimeRegistry elangRegistry = new ExpressionLanguageRuntimeRegistry(); + for (OExpressionLanguage elang : _oprocess.expressionLanguages) { + try { + elangRegistry.registerRuntime(elang); + } catch (ConfigurationException e) { + String msg = __msgs.msgExpLangRegistrationError(elang.expressionLanguageUri, elang.properties); + __log.error(msg, e); + throw new BpelEngineException(msg, e); + } + } + _expLangRuntimeRegistry = elangRegistry; + + setRoles(_oprocess); + + if (!_hydratedOnce) { + for (PartnerLinkPartnerRoleImpl prole : _partnerRoles.values()) { + PartnerRoleChannel channel = _engine._contexts.bindingContext.createPartnerRoleChannel(_pid, + prole._plinkDef.partnerRolePortType, prole._initialPartner); + prole._channel = channel; + _partnerChannels.put(prole._initialPartner, prole._channel); + EndpointReference epr = channel.getInitialEndpointReference(); + if (epr != null) { + prole._initialEPR = epr; + _partnerEprs.put(prole._initialPartner, epr); + } + __log.debug("Activated " + _pid + " partnerrole " + prole.getPartnerLinkName() + ": EPR is " + + prole._initialEPR); + } + _hydratedOnce = true; + } + + for (PartnerLinkMyRoleImpl myrole : _myRoles.values()) { + myrole._initialEPR = _myEprs.get(myrole._endpoint); + } + + for (PartnerLinkPartnerRoleImpl prole : _partnerRoles.values()) { + prole._channel = _partnerChannels.get(prole._initialPartner); + if (_partnerEprs.get(prole._initialPartner) != null) { + prole._initialEPR = _partnerEprs.get(prole._initialPartner); + } + } + + + if (isInMemory()) { + bounceProcessDAO(_engine._contexts.inMemDao.getConnection(), _pid, _pconf.getVersion(), _oprocess); + } else if (_engine._contexts.scheduler.isTransacted()) { + // If we have a transaction, we do this in the current transaction. + bounceProcessDAO(_engine._contexts.dao.getConnection(), _pid, _pconf.getVersion(), _oprocess); + } else { + // If we do not have a transaction we need to create one. + try { + _engine._contexts.scheduler.execIsolatedTransaction(new Callable() { + public Object call() throws Exception { + bounceProcessDAO(_engine._contexts.dao.getConnection(), _pid, _pconf.getVersion(), _oprocess); + return null; + } + }); + } catch (Exception ex) { + String errmsg = "DbError"; + __log.error(errmsg, ex); + throw new BpelEngineException(errmsg, ex); + } + } + } + } }