ode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r.@apache.org
Subject svn commit: r789611 - in /ode/trunk: bpel-api/src/main/java/org/apache/ode/bpel/iapi/ bpel-store/src/main/java/org/apache/ode/store/ engine/src/main/java/org/apache/ode/bpel/engine/ engine/src/main/java/org/apache/ode/bpel/engine/cron/ scheduler-simple...
Date Tue, 30 Jun 2009 08:28:31 GMT
Author: rr
Date: Tue Jun 30 08:28:30 2009
New Revision: 789611

URL: http://svn.apache.org/viewvc?rev=789611&view=rev
Log:
ODE-626: Addressed suggestions #1, #3, #4 from Alexis

Removed:
    ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java
Modified:
    ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
    ode/trunk/bpel-store/src/main/java/org/apache/ode/store/ProcessConfImpl.java
    ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
    ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
    ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java
    ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java
    ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java
    ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODEWSProcess.java
    ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java
    ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java
    ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java
    ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
    ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java
    ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java
    ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java

Modified: ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java (original)
+++ ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java Tue Jun 30 08:28:30 2009
@@ -97,30 +97,7 @@
         INVOKE_CHECK
     }
     
-    public interface JobDetails {
-        public Long getInstanceId();
-        public void setInstanceId(Long iid);
-        public String getMexId();
-        public void setMexId(String mexId);
-        public QName getProcessId();
-        public void setProcessId(QName processId);
-        public JobType getType();
-        public void setType(JobType type);
-        public String getChannel();
-        public void setChannel(String channel);
-        public String getCorrelatorId();
-        public void setCorrelatorId(String correlatorId);
-        public CorrelationKey getCorrelationKey();
-        public void setCorrelationKey(CorrelationKey correlationKey);
-        public Integer getRetryCount();
-        public void setRetryCount(Integer retryCount);
-        public Boolean getInMem();
-        public void setInMem(Boolean inMem);
-        public Map<String, Object> getDetailsExt();
-        public void setDetailsExt(Map<String, Object> detailsExt);
-    }
-
-    public static class JobDetailsImpl implements Scheduler.JobDetails {
+    public static class JobDetails {
         public Long instanceId;
         public String mexId;
         public String processId;
@@ -195,7 +172,7 @@
         
         @Override
         public String toString() {
-            return "JobDetailsImpl("
+            return "JobDetails("
             + " instanceId: " + instanceId
             + " mexId: " + mexId
             + " processId: " + processId

Modified: ode/trunk/bpel-store/src/main/java/org/apache/ode/store/ProcessConfImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-store/src/main/java/org/apache/ode/store/ProcessConfImpl.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/bpel-store/src/main/java/org/apache/ode/store/ProcessConfImpl.java (original)
+++ ode/trunk/bpel-store/src/main/java/org/apache/ode/store/ProcessConfImpl.java Tue Jun 30 08:28:30 2009
@@ -50,7 +50,6 @@
 import org.apache.ode.bpel.iapi.EndpointReferenceContext;
 import org.apache.ode.bpel.iapi.EndpointReference;
 import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
-import org.apache.ode.bpel.iapi.Scheduler.JobDetailsImpl;
 import org.apache.ode.bpel.rapi.ProcessModel;
 import org.apache.ode.store.DeploymentUnitDir.CBPInfo;
 import org.apache.ode.utils.CronExpression;
@@ -468,7 +467,7 @@
                     cleanupInfo.setFilters(aCleanup.getFilterList());
                     ProcessCleanupConfImpl.processACleanup(cleanupInfo.getCategories(), aCleanup.getCategoryList());
                     
-                    JobDetails runnableDetails = new JobDetailsImpl();
+                    JobDetails runnableDetails = new JobDetails();
                     runnableDetails.getDetailsExt().put("cleanupInfo", cleanupInfo);
                     runnableDetails.getDetailsExt().put("pid", _pid);
                     runnableDetails.getDetailsExt().put("transactionSize", 10);

Modified: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Tue Jun 30 08:28:30 2009
@@ -581,22 +581,22 @@
     }
 
     public void registerTimer(String timerChannelId, Date timeToFire) {
-        WorkEvent we = new WorkEvent();
-        we.setInstanceId(_dao.getInstanceId());
-        we.setProcessId(_bpelProcess.getPID());
-        we.setChannel(timerChannelId);
-        we.setType(Scheduler.JobType.TIMER);
-        _bpelProcess.scheduleWorkEvent(we, timeToFire);
+        JobDetails j = new JobDetails();
+        j.setInstanceId(_dao.getInstanceId());
+        j.setProcessId(_bpelProcess.getPID());
+        j.setChannel(timerChannelId);
+        j.setType(Scheduler.JobType.TIMER);
+        _bpelProcess.scheduleJob(j, timeToFire);
     }
 
     private void scheduleCorrelatorMatcher(String correlatorId, CorrelationKey key) {
-        WorkEvent we = new WorkEvent();
-        we.setInstanceId(_dao.getInstanceId());
-        we.setProcessId(_bpelProcess.getPID());
-        we.setType(Scheduler.JobType.MATCHER);
-        we.setCorrelatorId(correlatorId);
-        we.setCorrelationKey(key);
-        _bpelProcess.scheduleWorkEvent(we, null);
+        JobDetails j = new JobDetails();
+        j.setInstanceId(_dao.getInstanceId());
+        j.setProcessId(_bpelProcess.getPID());
+        j.setType(Scheduler.JobType.MATCHER);
+        j.setCorrelatorId(correlatorId);
+        j.setCorrelationKey(key);
+        _bpelProcess.scheduleJob(j, null);
     }
 
     public String invoke(String requestId, PartnerLink partnerLink, Operation operation, Element outgoingMessage)
@@ -774,12 +774,12 @@
                     __log.debug("MaxTime exceeded for instance # " + _iid);
 
                 try {
-                    WorkEvent we = new WorkEvent();
-                    we.setInstanceId(_iid);
-                    we.setRetryCount(_retryCount);
-                    we.setProcessId(_bpelProcess.getPID());
-                    we.setType(Scheduler.JobType.RESUME);
-                    _contexts.scheduler.schedulePersistedJob(we.getDetails(), new Date());
+                    JobDetails j = new JobDetails();
+                    j.setInstanceId(_iid);
+                    j.setRetryCount(_retryCount);
+                    j.setProcessId(_bpelProcess.getPID());
+                    j.setType(Scheduler.JobType.RESUME);
+                    _contexts.scheduler.schedulePersistedJob(j, new Date());
                 } catch (ContextException e) {
                     __log.error("Failed to schedule resume task.", e);
                     throw new BpelEngineException(e);

Modified: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Tue Jun 30 08:28:30 2009
@@ -46,6 +46,7 @@
 import org.apache.ode.bpel.evt.BpelEvent;
 import org.apache.ode.bpel.extension.ExtensionBundleRuntime;
 import org.apache.ode.bpel.iapi.*;
+import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
 import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
 import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
 import org.apache.ode.bpel.iapi.Scheduler.MapSerializableRunnable;
@@ -522,8 +523,8 @@
     public void onScheduledJob(final JobInfo jobInfo) throws JobProcessorException {
         _mngmtLock.readLock().lock();
         try {
-            final WorkEvent we = new WorkEvent(jobInfo.jobDetail);
-            ODEProcess process = _registeredProcesses.get(we.getProcessId());
+            final JobDetails j = jobInfo.jobDetail;
+            ODEProcess process = _registeredProcesses.get(j.getProcessId());
             if (process == null) {
                 // If the process is not active, it means that we should not be
                 // doing any work on its behalf, therefore we will reschedule the
@@ -532,8 +533,8 @@
                     public Void call() throws Exception {
                         _contexts.scheduler.jobCompleted(jobInfo.jobName);
                         Date future = new Date(System.currentTimeMillis() + (60 * 1000));
-                        __log.debug(__msgs.msgReschedulingJobForInactiveProcess(we.getProcessId(), jobInfo.jobName, future));
-                        _contexts.scheduler.schedulePersistedJob(we.getDetails(), future);            
+                        __log.debug(__msgs.msgReschedulingJobForInactiveProcess(j.getProcessId(), jobInfo.jobName, future));
+                        _contexts.scheduler.schedulePersistedJob(j, future);            
                         return null;
                     }
                     
@@ -541,12 +542,12 @@
                 return;
             }
             
-            if (we.getType().equals(Scheduler.JobType.INVOKE_CHECK)) {
-                if (__log.isDebugEnabled()) __log.debug("handleWorkEvent: InvokeCheck event for mexid " + we.getMexId());
+            if (j.getType().equals(Scheduler.JobType.INVOKE_CHECK)) {
+                if (__log.isDebugEnabled()) __log.debug("handleWorkEvent: InvokeCheck event for mexid " + j.getMexId());
 
-                PartnerRoleMessageExchange mex = (PartnerRoleMessageExchange) getMessageExchange(we.getMexId());
+                PartnerRoleMessageExchange mex = (PartnerRoleMessageExchange) getMessageExchange(j.getMexId());
                 if (mex.getStatus() == MessageExchange.Status.ASYNC || mex.getStatus() == MessageExchange.Status.ACK) {
-                    String msg = "No response received for invoke (mexId=" + we.getMexId() + "), forcing it into a failed state.";
+                    String msg = "No response received for invoke (mexId=" + j.getMexId() + "), forcing it into a failed state.";
                     if (__log.isDebugEnabled()) __log.debug(msg);
                     mex.replyWithFailure(MessageExchange.FailureType.COMMUNICATION_ERROR, msg, null);
                 }

Modified: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java
URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java (original)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java Tue Jun 30 08:28:30 2009
@@ -172,11 +172,11 @@
                         __log.debug("step(" + iid + ") adding step indicator to table.");
                         _step.add(iid);
 
-                        WorkEvent we = new WorkEvent();
-                        we.setInstanceId(iid);
-                        we.setProcessId(_process.getPID());
-                        we.setType(Scheduler.JobType.RESUME);
-                        _process._contexts.scheduler.schedulePersistedJob(we.getDetails(), null);
+                        JobDetails j = new JobDetails();
+                        j.setInstanceId(iid);
+                        j.setProcessId(_process.getPID());
+                        j.setType(Scheduler.JobType.RESUME);
+                        _process._contexts.scheduler.schedulePersistedJob(j, null);
 
                         return true;
                     }
@@ -296,11 +296,11 @@
                         _process.saveEvent(evt, instance);
                         onEvent(evt);
 
-                        WorkEvent we = new WorkEvent();
-                        we.setType(JobType.RESUME);
-                        we.setProcessId(_process.getPID());
-                        we.setInstanceId(iid);
-                        _process._contexts.scheduler.schedulePersistedJob(we.getDetails(), null);
+                        JobDetails j = new JobDetails();
+                        j.setType(JobType.RESUME);
+                        j.setProcessId(_process.getPID());
+                        j.setInstanceId(iid);
+                        _process._contexts.scheduler.schedulePersistedJob(j, null);
 
                         return true;
                     }

Modified: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java
URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java (original)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java Tue Jun 30 08:28:30 2009
@@ -42,6 +42,7 @@
 import org.apache.ode.bpel.engine.extvar.ExternalVariableManager;
 import org.apache.ode.bpel.evt.ProcessInstanceEvent;
 import org.apache.ode.bpel.iapi.*;
+import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
 import org.apache.ode.bpel.intercept.FailMessageExchangeException;
 import org.apache.ode.bpel.intercept.FaultMessageExchangeException;
 import org.apache.ode.bpel.intercept.InterceptorInvoker;
@@ -350,15 +351,15 @@
 
         markused();
 
-        final WorkEvent we = new WorkEvent(jobInfo.jobDetail);
+        final JobDetails j = jobInfo.jobDetail;
         if (__log.isDebugEnabled()) {
             __log.debug(ObjectPrinter.stringifyMethodEnter("handleWorkEvent", new Object[] { "jobInfo", jobInfo }));
         }
 
-        enqueueInstanceTransaction(we.getInstanceId(), new Runnable() {
+        enqueueInstanceTransaction(j.getInstanceId(), new Runnable() {
             public void run() {
                 _contexts.scheduler.jobCompleted(jobInfo.jobName);
-                execInstanceEvent(we);
+                execInstanceEvent(j);
             }
         });
 
@@ -401,41 +402,41 @@
         return state;
     }
 
-    private void execInstanceEvent(WorkEvent we) {
-        BpelInstanceWorker worker = _instanceWorkerCache.get(we.getInstanceId());
+    private void execInstanceEvent(JobDetails j) {
+        BpelInstanceWorker worker = _instanceWorkerCache.get(j.getInstanceId());
         assert worker.isWorkerThread();
 
-        ProcessInstanceDAO instanceDAO = getProcessDAO().getInstance(we.getInstanceId());
-        MessageExchangeDAO mexDao = we.getMexId() == null ? null : loadMexDao(we.getMexId());
+        ProcessInstanceDAO instanceDAO = getProcessDAO().getInstance(j.getInstanceId());
+        MessageExchangeDAO mexDao = j.getMexId() == null ? null : loadMexDao(j.getMexId());
 
         if (instanceDAO == null) {
             if (__log.isDebugEnabled()) {
-                __log.debug("handleWorkEvent: no ProcessInstance found with iid " + we.getInstanceId() + "; ignoring.");
+                __log.debug("handleWorkEvent: no ProcessInstance found with iid " + j.getInstanceId() + "; ignoring.");
             }
             return;
         }
 
         if (__log.isDebugEnabled()) {
-            __log.debug("handleWorkEvent: " + we.getType() + " event for process instance " + we.getInstanceId());
+            __log.debug("handleWorkEvent: " + j.getType() + " event for process instance " + j.getInstanceId());
         }
 
-        switch (we.getType()) {
+        switch (j.getType()) {
         case MYROLE_INVOKE:
             executeContinueInstanceMyRoleRequestReceived(mexDao);
             if(__log.isDebugEnabled()) __log.debug("handleWorkEvent: releasing myrole mex dao: " + mexDao);
             mexDao.release(true);
             break;
         case TIMER:
-            executeContinueInstanceTimerReceived(instanceDAO, we.getChannel());
+            executeContinueInstanceTimerReceived(instanceDAO, j.getChannel());
             break;
         case RESUME:
-            executeContinueInstanceResume(instanceDAO, we.getRetryCount());
+            executeContinueInstanceResume(instanceDAO, j.getRetryCount());
             break;
         case PARTNER_RESPONSE:
             executeContinueInstancePartnerRoleResponseReceived(mexDao);
             break;
         case MATCHER:
-            executeContinueInstanceMatcherEvent(instanceDAO, we.getCorrelatorId(), we.getCorrelationKey());
+            executeContinueInstanceMatcherEvent(instanceDAO, j.getCorrelatorId(), j.getCorrelationKey());
             break;
         }
     }
@@ -761,11 +762,11 @@
         }
     }
 
-    public String scheduleWorkEvent(WorkEvent we, Date timeToFire) {
+    public String scheduleJob(JobDetails jd, Date timeToFire) {
         // if (isInMemory())
         // throw new InvalidProcessException("In-mem process execution resulted in event scheduling.");
 
-        return _contexts.scheduler.schedulePersistedJob(we.getDetails(), timeToFire);
+        return _contexts.scheduler.schedulePersistedJob(jd, timeToFire);
     }
 
     protected OdeRuntime buildRuntime(int modelVersion) {

Modified: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java
URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java (original)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java Tue Jun 30 08:28:30 2009
@@ -1,6 +1,7 @@
 package org.apache.ode.bpel.engine;
 
 import org.apache.ode.bpel.iapi.*;
+import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
 import org.apache.ode.bpel.rapi.ResourceModel;
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.dao.ProcessInstanceDAO;
@@ -135,14 +136,14 @@
                         }
                     });
                 } else /* non-transacted style */ {
-                    WorkEvent we = new WorkEvent();
-                    we.setType(Scheduler.JobType.MYROLE_INVOKE);
-                    we.setInstanceId(mexdao.getInstance().getInstanceId());
-                    we.setMexId(mexdao.getMessageExchangeId());
+                    JobDetails j = new JobDetails();
+                    j.setType(Scheduler.JobType.MYROLE_INVOKE);
+                    j.setInstanceId(mexdao.getInstance().getInstanceId());
+                    j.setMexId(mexdao.getMessageExchangeId());
                     // Could be different to this pid when routing to an older version
-                    we.setProcessId(mexdao.getInstance().getProcess().getProcessId());
+                    j.setProcessId(mexdao.getInstance().getProcess().getProcessId());
 
-                    scheduleWorkEvent(we, null);
+                    scheduleJob(j, null);
                 }
                 // Cleaning up
                 _contexts.dao.getConnection().deleteResourceRoute(urlMeth[0], urlMeth[1]);

Modified: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODEWSProcess.java
URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODEWSProcess.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODEWSProcess.java (original)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODEWSProcess.java Tue Jun 30 08:28:30 2009
@@ -5,6 +5,7 @@
 import org.apache.ode.bpel.rapi.ConstantsModel;
 import org.apache.ode.bpel.rapi.InvalidProcessException;
 import org.apache.ode.bpel.iapi.*;
+import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
 import org.apache.ode.bpel.iapi.Scheduler.JobType;
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.dao.MessageDAO;
@@ -408,14 +409,14 @@
                 } else if (istyle == InvocationStyle.P2P_TRANSACTED) /* transact p2p invoke in the same thread */ {
                     executeContinueInstanceMyRoleRequestReceived(mexdao);
                 } else /* non-transacted style */{
-                    WorkEvent we = new WorkEvent();
-                    we.setType(JobType.MYROLE_INVOKE);
-                    we.setInstanceId(mexdao.getInstance().getInstanceId());
-                    we.setMexId(mexdao.getMessageExchangeId());
+                    JobDetails j = new JobDetails();
+                    j.setType(JobType.MYROLE_INVOKE);
+                    j.setInstanceId(mexdao.getInstance().getInstanceId());
+                    j.setMexId(mexdao.getMessageExchangeId());
                     // Could be different to this pid when routing to an older version
-                    we.setProcessId(mexdao.getInstance().getProcess().getProcessId());
+                    j.setProcessId(mexdao.getInstance().getProcess().getProcessId());
 
-                    scheduleWorkEvent(we, null);
+                    scheduleJob(j, null);
                 }
             } else if (cstatus == MyRoleMessageExchange.CorrelationStatus.QUEUED) {
                 ; // do nothing
@@ -671,15 +672,15 @@
                 org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern.REQUEST_RESPONSE;
         if (!isInMemory() && isTwoWay) {
             if (__log.isDebugEnabled()) __log.debug("Creating invocation check event for mexid " + mex.getMessageExchangeId());
-            WorkEvent event = new WorkEvent();
-            event.setMexId(mex.getMessageExchangeId());
-            event.setProcessId(getPID());
-            event.setType(JobType.INVOKE_CHECK);
+            JobDetails job = new JobDetails();
+            job.setMexId(mex.getMessageExchangeId());
+            job.setProcessId(getPID());
+            job.setType(JobType.INVOKE_CHECK);
             // use a greater timeout to make sure the check job does not get executed while the service invocation is still waiting for a response
             PartnerLinkModel model = _processModel.getPartnerLink(mex.getPartnerLinkModelId());
             long timeout = (long) (getTimeout(model)*1.5);
             Date future = new Date(System.currentTimeMillis() + timeout);
-            String jobId = scheduleWorkEvent(event, future);
+            String jobId = scheduleJob(job, future);
             mex.setProperty("invokeCheckJobId", jobId);
         }
     }

Modified: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java (original)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java Tue Jun 30 08:28:30 2009
@@ -7,6 +7,7 @@
 import org.apache.ode.bpel.iapi.EndpointReference;
 import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.PartnerRoleChannel;
+import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
 import org.apache.ode.bpel.iapi.Scheduler.JobType;
 import org.apache.ode.bpel.rapi.PartnerLinkModel;
 
@@ -48,9 +49,9 @@
         assert !_process.isInMemory() : "resumeInstance() for reliable in-mem processes makes no sense.";
 
         MessageExchangeDAO mexdao = getDAO();
-        final WorkEvent we = generatePartnerResponseWorkEvent(mexdao);
+        final JobDetails j = generatePartnerResponseJob(mexdao);
         save(mexdao);
-        _contexts.scheduler.schedulePersistedJob(we.getDetails(), null);
+        _contexts.scheduler.schedulePersistedJob(j, null);
     }
 
 
@@ -59,14 +60,14 @@
         return InvocationStyle.RELIABLE;
     }
 
-    private WorkEvent generatePartnerResponseWorkEvent(MessageExchangeDAO mexdao) {
-        WorkEvent we = new WorkEvent();
-        we.setProcessId(_process.getPID());
-        we.setChannel(mexdao.getChannel());
-        we.setInstanceId(_iid);
-        we.setMexId(_mexId);
-        we.setType(JobType.PARTNER_RESPONSE);
-        return we;
+    private JobDetails generatePartnerResponseJob(MessageExchangeDAO mexdao) {
+        JobDetails j = new JobDetails();
+        j.setProcessId(_process.getPID());
+        j.setChannel(mexdao.getChannel());
+        j.setInstanceId(_iid);
+        j.setMexId(_mexId);
+        j.setType(JobType.PARTNER_RESPONSE);
+        return j;
     }
 
 }

Modified: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java
URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java (original)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java Tue Jun 30 08:28:30 2009
@@ -16,7 +16,6 @@
 import org.apache.ode.bpel.iapi.ProcessConf.CleanupInfo;
 import org.apache.ode.bpel.iapi.ProcessConf.CronJob;
 import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
-import org.apache.ode.bpel.iapi.Scheduler.JobDetailsImpl;
 import org.apache.ode.store.ProcessCleanupConfImpl;
 import org.apache.ode.utils.CronExpression;
 import org.apache.xmlbeans.XmlOptions;
@@ -66,7 +65,7 @@
                         cleanupInfo.setFilters(aCleanup.getFilterList());
                         ProcessCleanupConfImpl.processACleanup(cleanupInfo.getCategories(), aCleanup.getCategoryList());
                         
-                        JobDetails runnableDetails = new JobDetailsImpl();
+                        JobDetails runnableDetails = new JobDetails();
                         
                         runnableDetails.getDetailsExt().put("cleanupInfo", cleanupInfo);
                         runnableDetails.getDetailsExt().put("transactionSize", 10);

Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java Tue Jun 30 08:28:30 2009
@@ -34,9 +34,10 @@
 import java.util.Map;
 
 import javax.sql.DataSource;
+import javax.xml.namespace.QName;
 
 import org.apache.ode.bpel.iapi.Scheduler;
-import org.apache.ode.bpel.iapi.Scheduler.JobDetailsImpl;
+import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
 import org.apache.ode.utils.DbIsolation;                                                
                                                                                
 
 import org.apache.commons.logging.Log;
@@ -188,7 +189,7 @@
             ps.setInt(i++, asInteger(loaded));
             ps.setInt(i++, asInteger(job.transacted));
             
-            JobDetailsImpl details = (JobDetailsImpl) job.detail;
+            JobDetails details = job.detail;
             ps.setObject(i++, details.instanceId, Types.BIGINT);
             ps.setObject(i++, details.mexId, Types.VARCHAR);
             ps.setObject(i++, details.processId, Types.VARCHAR);
@@ -235,7 +236,7 @@
             
             ResultSet rs = ps.executeQuery();
             while (rs.next()) {
-                Scheduler.JobDetailsImpl details = new Scheduler.JobDetailsImpl();
+                Scheduler.JobDetails details = new Scheduler.JobDetails();
                 details.instanceId = (Long) rs.getObject("instanceId");
                 details.mexId = (String) rs.getObject("mexId");
                 details.processId = (String) rs.getObject("processId");
@@ -255,6 +256,41 @@
                     }
                 }
                 
+                {
+                    //For compatibility reasons, we check whether there are entries inside
+                    //jobDetailsExt blob, which correspond to extracted entries. If so, we
+                    //use them.
+
+                    Map<String, Object> detailsExt = details.getDetailsExt();
+                    if (detailsExt.get("type") != null) {
+                        details.type = (String) detailsExt.get("type");
+                    }
+                    if (detailsExt.get("iid") != null) {
+                        details.instanceId = (Long) detailsExt.get("iid");
+                    }
+                    if (detailsExt.get("pid") != null) {
+                        details.processId = (String) detailsExt.get("pid");
+                    }
+                    if (detailsExt.get("inmem") != null) {
+                        details.inMem = (Boolean) detailsExt.get("inmem");
+                    }
+                    if (detailsExt.get("ckey") != null) {
+                        details.correlationKey = (String) detailsExt.get("ckey");
+                    }
+                    if (detailsExt.get("channel") != null) {
+                        details.channel = (String) detailsExt.get("channel");
+                    }
+                    if (detailsExt.get("mexid") != null) {
+                        details.mexId = (String) detailsExt.get("mexid");
+                    }
+                    if (detailsExt.get("correlatorId") != null) {
+                        details.correlatorId = (String) detailsExt.get("correlatorId");
+                    }
+                    if (detailsExt.get("retryCount") != null) {
+                        details.retryCount = Integer.parseInt((String) detailsExt.get("retryCount"));
+                    }
+                }
+                
                 Job job = new Job(rs.getLong(2), rs.getString(1), asBoolean(rs.getInt(3)), details);
                 ret.add(job);
             }

Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java Tue Jun 30 08:28:30 2009
@@ -192,7 +192,7 @@
         if (when == null)
             when = new Date(ctime);
 
-        JobDetails jobDetails = new JobDetailsImpl();
+        JobDetails jobDetails = new JobDetails();
         jobDetails.getDetailsExt().put("runnable", runnable);
         runnable.storeToDetails(jobDetails);
         

Modified: ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java (original)
+++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java Tue Jun 30 08:28:30 2009
@@ -22,7 +22,10 @@
 import java.util.HashMap;
 import java.util.List;
 
+import javax.xml.namespace.QName;
+
 import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.Scheduler.JobType;
 import org.apache.ode.scheduler.simple.DatabaseDelegate;
 import org.apache.ode.scheduler.simple.Job;
 
@@ -53,19 +56,19 @@
         assertEquals(0, nids.size());
         
         // try for one nodeid
-        _del.insertJob(new Job(0L,true,new Scheduler.JobDetailsImpl()), "abc", true);
+        _del.insertJob(new Job(0L,true,new Scheduler.JobDetails()), "abc", true);
         nids = _del.getNodeIds();
         assertEquals(1, nids.size());
         assertTrue(nids.contains("abc"));
         
         // check that dups are ignored. 
-        _del.insertJob(new Job(0L,true,new Scheduler.JobDetailsImpl()), "abc", true);   
+        _del.insertJob(new Job(0L,true,new Scheduler.JobDetails()), "abc", true);    
         nids = _del.getNodeIds();
         assertEquals(1, nids.size());
         assertTrue(nids.contains("abc"));
         
         // add another nodeid, 
-        _del.insertJob(new Job(0L,true,new Scheduler.JobDetailsImpl()), "123", true);   
+        _del.insertJob(new Job(0L,true,new Scheduler.JobDetails()), "123", true);    
         nids = _del.getNodeIds();
         assertEquals(2, nids.size());
         assertTrue(nids.contains("abc"));        
@@ -73,8 +76,8 @@
     }
 
     public void testReassign() throws Exception {
-        _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetailsImpl()), "n1", false);
-        _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetailsImpl()), "n2", false);
+        _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetails()), "n1", false);
+        _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetails()), "n2", false);
         
         assertEquals(1,_del.updateReassign("n1","n2"));
         List<Job> jobs = _del.dequeueImmediate("n2", 400L, 1000);
@@ -82,8 +85,8 @@
     }
 
     public void testScheduleImmediateTimeFilter() throws Exception {
-        _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetailsImpl()), "n1", false);
-        _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetailsImpl()), "n1", false);
+        _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetails()), "n1", false);
+        _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetails()), "n1", false);
 
 
         List<Job> jobs = _del.dequeueImmediate("n1", 150L, 1000);
@@ -97,8 +100,8 @@
     }
     
     public void testScheduleImmediateMaxRows() throws Exception {
-        _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetailsImpl()), "n1", false);
-        _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetailsImpl()), "n1", false);
+        _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetails()), "n1", false);
+        _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetails()), "n1", false);
 
         List<Job> jobs = _del.dequeueImmediate("n1", 201L, 1);
         assertNotNull(jobs);
@@ -111,8 +114,8 @@
     }
 
     public void testScheduleImmediateNodeFilter() throws Exception {
-        _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetailsImpl()), "n1", false);
-        _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetailsImpl()), "n2", false);
+        _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetails()), "n1", false);
+        _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetails()), "n2", false);
 
         List<Job> jobs = _del.dequeueImmediate("n2", 300L, 1000);
         assertNotNull(jobs);
@@ -121,8 +124,8 @@
     }
 
     public void testDeleteJob() throws Exception {
-        _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetailsImpl()), "n1", false);
-        _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetailsImpl()), "n2", false);
+        _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetails()), "n1", false);
+        _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetails()), "n2", false);
         
         // try deleting, wrong jobid -- del should fail
         assertFalse(_del.deleteJob("j1x", "n1"));
@@ -139,7 +142,7 @@
     
     public void testUpgrade() throws Exception {
         for (int i = 0; i < 200; ++i)
-            _del.insertJob(new Job(i ,"j" +i,true,new Scheduler.JobDetailsImpl()), null, false);
+            _del.insertJob(new Job(i ,"j" +i,true,new Scheduler.JobDetails()), null, false);
         
         int n1 = _del.updateAssignToNode("n1", 0, 3, 100);
         int n2 = _del.updateAssignToNode("n2", 1, 3, 100);
@@ -153,4 +156,30 @@
         assertEquals(n3,_del.dequeueImmediate("n3", 10000L, 1000).size());
     }
     
+    public void testMigration() throws Exception {
+        Scheduler.JobDetails j1 = new Scheduler.JobDetails();
+        j1.getDetailsExt().put("type", "MATCHER");
+        j1.getDetailsExt().put("iid", 1234L);
+        j1.getDetailsExt().put("pid", new QName("http://test1", "test2").toString());
+        j1.getDetailsExt().put("inmem", true);
+        j1.getDetailsExt().put("ckey", "123~abcd");
+        j1.getDetailsExt().put("channel", "123");
+        j1.getDetailsExt().put("mexid", "mexid123");
+        j1.getDetailsExt().put("correlatorId", "cid123");
+        j1.getDetailsExt().put("retryCount", "15");
+        
+        _del.insertJob(new Job(0 ,"migration",true,j1), null, false);
+        _del.updateAssignToNode("m", 0, 3, 100);
+        Scheduler.JobDetails j2 = _del.dequeueImmediate("m", 10000L, 1000).get(0).detail;
+        
+        assertEquals(j2.getType(), JobType.MATCHER);
+        assertEquals(j2.getInstanceId(), (Object) 1234L);
+        assertEquals(j2.getProcessId(), new QName("http://test1", "test2"));
+        assertEquals(j2.getInMem(), (Object) true);
+        assertEquals(j2.getCorrelationKey().toCanonicalString(), (Object) "123~abcd");
+        assertEquals(j2.getChannel(), (Object) "123");
+        assertEquals(j2.getMexId(), (Object) "mexid123");
+        assertEquals(j2.getCorrelatorId(), (Object) "cid123");
+        assertEquals(j2.getRetryCount(), (Object) 15);
+    }
 }

Modified: ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java (original)
+++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java Tue Jun 30 08:28:30 2009
@@ -65,7 +65,7 @@
     }
 
     Scheduler.JobDetails newDetail(String x) {
-        Scheduler.JobDetails jd = new Scheduler.JobDetailsImpl();
+        Scheduler.JobDetails jd = new Scheduler.JobDetails();
         jd.getDetailsExt().put("foo", x);
         return jd;
     }

Modified: ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java (original)
+++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java Tue Jun 30 08:28:30 2009
@@ -207,7 +207,7 @@
     }
 
     Scheduler.JobDetails newDetail(String x) {
-        Scheduler.JobDetails jd = new Scheduler.JobDetailsImpl();
+        Scheduler.JobDetails jd = new Scheduler.JobDetails();
         jd.getDetailsExt().put("foo", x);
         return jd;
     }



Mime
View raw message