ode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mszef...@apache.org
Subject svn commit: r549534 - in /incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi: OdeConsumer.java OdeConsumerAsync.java OdeConsumerSync.java OdeContext.java OdeLifeCycle.java
Date Thu, 21 Jun 2007 16:33:51 GMT
Author: mszefler
Date: Thu Jun 21 09:33:49 2007
New Revision: 549534

URL: http://svn.apache.org/viewvc?view=rev&rev=549534
Log:
Added configurable JBI send/sendSync using system property org.apache.ode.jbi.sendSynch


Added:
    incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerAsync.java   (with
props)
    incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerSync.java   (with
props)
Modified:
    incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java
    incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeContext.java
    incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java

Modified: incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java?view=diff&rev=549534&r1=549533&r2=549534
==============================================================================
--- incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java (original)
+++ incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java Thu Jun 21 09:33:49
2007
@@ -39,18 +39,18 @@
 
 /**
  * Bridge between ODE (consumers) and JBI (providers). An single object of this type handles
all communications initiated by ODE
- * that is destined for other JBI providers.
+ * that is destined for other JBI providers. 
  */
-class OdeConsumer extends ServiceBridge implements JbiMessageExchangeProcessor {
+abstract class OdeConsumer extends ServiceBridge implements JbiMessageExchangeProcessor {
     private static final Log __log = LogFactory.getLog(OdeConsumer.class);
+    private static final long DEFAULT_RESPONSE_TIMEOUT = 2 * 60 * 1000L;
 
-    private static final long DEFAULT_SENDSYNC_TIMEOUT = 2 * 60 * 1000L;
+    protected OdeContext _ode;
 
-    private OdeContext _ode;
+    protected long _responseTimeout = DEFAULT_RESPONSE_TIMEOUT;
 
-    private long _sendSyncTimeout = DEFAULT_SENDSYNC_TIMEOUT;
 
-    private Map<String, PartnerRoleMessageExchange> _outstandingExchanges = new ConcurrentHashMap<String,
PartnerRoleMessageExchange>();
+    protected Map<String, PartnerRoleMessageExchange> _outstandingExchanges = new ConcurrentHashMap<String,
PartnerRoleMessageExchange>();
 
     OdeConsumer(OdeContext ode) {
         _ode = ode;
@@ -103,23 +103,9 @@
                 _ode._scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
                     public void afterCompletion(boolean success) {
                         if (success) {
-                            _ode._executorService.submit(new Runnable() {
-                                public void run() {
-                                    try {
-                                        boolean sendOk = _ode.getChannel().sendSync(inonly,
_sendSyncTimeout);
-                                        if (!sendOk) {
-                                            __log.warn("Timeout while sending message for
JBI message exchange: " + jbiMex.getExchangeId());
-                                        }
-                                        onJbiMessageExchange(inonly);
-                                    } catch (MessagingException e) {
-                                        String errmsg = "Error sending request-only message
to JBI for ODE mex " + odeMex;
-                                        __log.error(errmsg, e);
-                                    }
-                                }
-                            });
+                            doSendOneWay(odeMex, inonly);
                         }
                     }
-
                     public void beforeCompletion() {
                     }
 
@@ -133,21 +119,7 @@
                 _ode._scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
                     public void afterCompletion(boolean success) {
                         if (success) {
-                            _ode._executorService.submit(new Runnable() {
-                                public void run() {
-                                    try {
-                                        _outstandingExchanges.put(inout.getExchangeId(),
odeMex);
-                                        boolean sendOk = _ode.getChannel().sendSync(inout,
_sendSyncTimeout);
-                                        if (!sendOk) {
-                                            __log.warn("Timeout while sending message for
JBI message exchange: " + jbiMex.getExchangeId());
-                                        }
-                                        onJbiMessageExchange(inout);
-                                    } catch (MessagingException e) {
-                                        String errmsg = "Error sending request-only message
to JBI for ODE mex " + odeMex;
-                                        __log.error(errmsg, e);
-                                    }
-                                }
-                            });
+                            doSendTwoWay(odeMex, inout);
                         }
                     }
 
@@ -170,6 +142,11 @@
 
     }
 
+    protected abstract void doSendOneWay(PartnerRoleMessageExchange odeMex, InOnly inonly);
+
+    protected abstract void doSendTwoWay(PartnerRoleMessageExchange odeMex, InOut inout);
+
+
     public void onJbiMessageExchange(MessageExchange jbiMex) throws MessagingException {
         if (!jbiMex.getPattern().equals(MessageExchangePattern.IN_ONLY) &&
             !jbiMex.getPattern().equals(MessageExchangePattern.IN_OUT)) {
@@ -273,11 +250,11 @@
         }
     }
 
-    public void setSendSyncTimeout(long timeout) {
-    	_sendSyncTimeout = timeout;
+    public void setResponseTimeout(long timeout) {
+    	_responseTimeout = timeout;
     }
 
-    public long getSendSyncTimeout() {
-    	return _sendSyncTimeout;
+    public long getResponseTimeout() {
+    	return _responseTimeout;
     }
 }

Added: incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerAsync.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerAsync.java?view=auto&rev=549534
==============================================================================
--- incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerAsync.java (added)
+++ incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerAsync.java Thu Jun
21 09:33:49 2007
@@ -0,0 +1,100 @@
+package org.apache.ode.jbi;
+
+
+import java.util.TimerTask;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.MessagingException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
+import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
+
+/**
+ * 
+ * @author mszefler
+ *
+ */
+class OdeConsumerAsync extends OdeConsumer {
+    private static final Log __log = LogFactory.getLog(OdeConsumerAsync.class);
+
+    /** 
+     * We create an executor to handle all the asynchronous invocations/timeouts. Note, we
don't need a lot of threads
+     * here, the operations are all async, using single-thread executor avoids any possible
problems in concurrent
+     * use of delivery channel.
+     */
+    private ScheduledExecutorService _executor;
+
+    OdeConsumerAsync(OdeContext ode) {
+        super(ode);
+       _executor = Executors.newSingleThreadScheduledExecutor();
+
+    }
+
+    @Override
+    protected void doSendOneWay(final PartnerRoleMessageExchange odeMex, final InOnly inonly)
{
+        _executor.submit(new Runnable() {
+            public void run() {
+                try {
+                    _outstandingExchanges.put(inonly.getExchangeId(), odeMex);
+                    _ode.getChannel().send(inonly);
+                } catch (MessagingException e) {
+                    String errmsg = "Error sending request-only message to JBI for ODE mex
" + odeMex;
+                    __log.error(errmsg, e);
+                }
+            }
+        });
+
+    }
+
+    @Override
+    protected void doSendTwoWay(final PartnerRoleMessageExchange odeMex, final InOut inout)
{
+        _executor.submit(new Runnable() {
+            public void run() {
+                try {
+                    _outstandingExchanges.put(inout.getExchangeId(), odeMex);
+                    _executor.schedule(new TimerTask() {
+
+                        @Override
+                        public void run() {
+                            doTimeoutCheck(inout);
+                        }
+
+                    }, _responseTimeout, TimeUnit.MILLISECONDS);
+                    _ode.getChannel().send(inout);
+                } catch (MessagingException e) {
+                    String errmsg = "Error sending request-only message to JBI for ODE mex
" + odeMex;
+                    __log.error(errmsg, e);
+                }
+            }
+        });
+
+    }
+
+    private void doTimeoutCheck(InOut inout) {
+        final PartnerRoleMessageExchange pmex = _outstandingExchanges.remove(inout.getExchangeId());
+
+        if (pmex == null) /* no worries, got a response. */
+            return;
+
+        __log.warn("Timeout on JBI message exchange " + inout.getExchangeId());
+
+        try {
+            _ode._scheduler.execIsolatedTransaction(new Callable<Void>() {
+                public Void call() throws Exception {
+                    pmex.replyWithFailure(FailureType.NO_RESPONSE, "Response not received
after " + _responseTimeout + "ms.", null);
+                    return null;
+                }
+
+            });
+        } catch (Exception ex) {
+            __log.error("Error executing transaction:  ", ex);
+        }
+    }
+}

Propchange: incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerAsync.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerSync.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerSync.java?view=auto&rev=549534
==============================================================================
--- incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerSync.java (added)
+++ incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerSync.java Thu Jun
21 09:33:49 2007
@@ -0,0 +1,61 @@
+package org.apache.ode.jbi;
+
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.MessagingException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
+
+/**
+ * Completes {@link OdeConsumer} implementation using the synchronous invocation style (sendSync).
+ * 
+ * @author Maciej Szefler
+ *
+ */
+class OdeConsumerSync extends OdeConsumer {
+    private static final Log __log = LogFactory.getLog(OdeConsumerSync.class);
+    
+
+    OdeConsumerSync(OdeContext ode) {
+        super(ode);
+    }
+
+    
+    protected void doSendTwoWay(final PartnerRoleMessageExchange odeMex, final InOut inout)
{
+        _ode._executorService.submit(new Runnable() {
+            public void run() {
+                try {
+                    _outstandingExchanges.put(inout.getExchangeId(), odeMex);
+                    boolean sendOk = _ode.getChannel().sendSync(inout, _responseTimeout);
+                    if (!sendOk) {
+                        __log.warn("Timeout while sending message for JBI message exchange:
" + inout.getExchangeId());
+                    }
+                    onJbiMessageExchange(inout);
+                } catch (MessagingException e) {
+                    String errmsg = "Error sending request-response message to JBI for ODE
mex " + odeMex;
+                    __log.error(errmsg, e);
+                }
+            }
+        });
+    }
+
+    protected void doSendOneWay(final PartnerRoleMessageExchange odeMex, final InOnly inonly)
{
+        _ode._executorService.submit(new Runnable() {
+            public void run() {
+                try {
+                    boolean sendOk = _ode.getChannel().sendSync(inonly, _responseTimeout);
+                    if (!sendOk) {
+                        __log.warn("Timeout while sending message for JBI message exchange:
" + inonly.getExchangeId());
+                    }
+                    onJbiMessageExchange(inonly);
+                } catch (MessagingException e) {
+                    String errmsg = "Error sending request-only message to JBI for ODE mex
" + odeMex;
+                    __log.error(errmsg, e);
+                }
+            }
+        });
+    }
+
+}

Propchange: incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerSync.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeContext.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeContext.java?view=diff&rev=549534&r1=549533&r2=549534
==============================================================================
--- incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeContext.java (original)
+++ incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeContext.java Thu Jun 21 09:33:49
2007
@@ -70,7 +70,7 @@
     /** Mapper by class name. */
     private Map<String, Mapper> _mappersByClassName = new HashMap<String, Mapper>();
 
-    OdeConsumer _consumer = new OdeConsumer(this);
+    OdeConsumer _consumer;
 
     JbiMessageExchangeProcessor _jbiMessageExchangeProcessor = new JbiMessageExchangeEventRouter(this);
 

Modified: incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java?view=diff&rev=549534&r1=549533&r2=549534
==============================================================================
--- incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java (original)
+++ incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java Thu Jun 21
09:33:49 2007
@@ -83,7 +83,12 @@
         try {
             _ode = OdeContext.getInstance();
             _ode.setContext(context);
-            _ode._consumer = new OdeConsumer(_ode);
+            
+            // Use system property to determine if DeliveryChannel.sendSync or DeliveryChannel.send
is used.
+            if (Boolean.getBoolean("org.apache.ode.jbi.sendSynch"))
+                _ode._consumer = new OdeConsumerSync(_ode);
+            else 
+                _ode._consumer = new OdeConsumerAsync(_ode);
 
             if (_ode.getContext().getWorkspaceRoot() != null)
                 TempFileManager.setWorkingDirectory(new File(_ode.getContext().getWorkspaceRoot()));



Mime
View raw message