camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ningji...@apache.org
Subject svn commit: r1036343 - /camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
Date Thu, 18 Nov 2010 08:30:42 GMT
Author: ningjiang
Date: Thu Nov 18 08:30:41 2010
New Revision: 1036343

URL: http://svn.apache.org/viewvc?rev=1036343&view=rev
Log:
CAMEL-3342, CAMEL-3343 Clean up camel-cxfrs code

Modified:
    camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java

Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java?rev=1036343&r1=1036342&r2=1036343&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
(original)
+++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
Thu Nov 18 08:30:41 2010
@@ -19,6 +19,7 @@ package org.apache.camel.component.cxf.j
 import java.lang.reflect.Method;
 
 import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.ExchangePattern;
@@ -28,11 +29,21 @@ import org.apache.commons.logging.LogFac
 import org.apache.cxf.continuations.Continuation;
 import org.apache.cxf.continuations.ContinuationProvider;
 import org.apache.cxf.jaxrs.JAXRSInvoker;
+import org.apache.cxf.jaxrs.lifecycle.ResourceProvider;
+import org.apache.cxf.jaxrs.model.ClassResourceInfo;
 import org.apache.cxf.jaxrs.model.OperationResourceInfo;
+import org.apache.cxf.jaxrs.provider.ProviderFactory;
+import org.apache.cxf.jaxrs.utils.JAXRSUtils;
 import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageContentsList;
+import org.apache.cxf.version.Version;
 
 public class CxfRsInvoker extends JAXRSInvoker {
     private static final Log LOG = LogFactory.getLog(CxfRsInvoker.class);
+    private static final String SUSPENED = "org.apache.camel.component.cxf.jaxrs.suspend";
+    private static final String SERVICE_OBJECT_SCOPE = "org.apache.cxf.service.scope";
+    private static final String REQUEST_SCOPE = "request";
     private CxfRsConsumer cxfRsConsumer;
     private CxfRsEndpoint endpoint;
     
@@ -41,6 +52,61 @@ public class CxfRsInvoker extends JAXRSI
         this.cxfRsConsumer = consumer;
     }
     
+    // To work around the invoke reenter issue
+    public Object invoke(Exchange exchange, Object request) {
+        // These code 
+        Response response = exchange.get(Response.class);
+        if (response != null) {
+            return new MessageContentsList(response);
+        }
+        ResourceProvider provider = getResourceProvider(exchange);
+        Object serviceObject = getServiceObject(exchange);
+        try {
+            return invoke(exchange, request, serviceObject);
+        } finally {
+            if (Boolean.TRUE.equals(exchange.get(SUSPENED))) {
+                // should check the 
+                exchange.put(JAXRSUtils.ROOT_INSTANCE, serviceObject);
+                exchange.put(JAXRSUtils.ROOT_PROVIDER, provider);
+            } else {
+                // clean up the resource
+                if (exchange.isOneWay()) {
+                    ProviderFactory.getInstance(exchange.getInMessage()).clearThreadLocalProxies();
+                }
+                if (!isServiceObjectRequestScope(exchange.getInMessage())) {
+                    provider.releaseInstance(exchange.getInMessage(), serviceObject);
+                }
+                exchange.remove(JAXRSUtils.ROOT_INSTANCE);
+                exchange.remove(JAXRSUtils.ROOT_PROVIDER);
+            }
+        }
+    }
+    
+    public Object getServiceObject(Exchange exchange) {
+        if (exchange.get(JAXRSUtils.ROOT_INSTANCE) != null) {
+            return exchange.get(JAXRSUtils.ROOT_INSTANCE);
+        } else {
+            OperationResourceInfo ori = exchange.get(OperationResourceInfo.class);
+            ClassResourceInfo cri = ori.getClassResourceInfo();
+            return cri.getResourceProvider().getInstance(exchange.getInMessage());
+        }
+    }
+        
+    private ResourceProvider getResourceProvider(Exchange exchange) {
+        if (exchange.get(JAXRSUtils.ROOT_PROVIDER) != null) {
+            return (ResourceProvider)exchange.get(JAXRSUtils.ROOT_PROVIDER);
+        } else {
+            OperationResourceInfo ori = exchange.get(OperationResourceInfo.class);
+            ClassResourceInfo cri = ori.getClassResourceInfo();
+            return cri.getResourceProvider();
+        }
+    }
+    
+    private boolean isServiceObjectRequestScope(Message inMessage) {
+        Object scope = inMessage.getContextualProperty(SERVICE_OBJECT_SCOPE);
+        return REQUEST_SCOPE.equals(scope);
+    }
+    
     protected Object performInvocation(Exchange cxfExchange, final Object serviceObject,
Method method,
                                        Object[] paramArray) throws Exception {
         paramArray = insertExchange(method, paramArray, cxfExchange);
@@ -50,7 +116,8 @@ public class CxfRsInvoker extends JAXRSI
             return method.invoke(serviceObject, paramArray);
         }
         Continuation continuation = getContinuation(cxfExchange);
-        if (continuation != null && !endpoint.isSynchronous()) {
+        // Only calling the continuation API for CXF 2.3.x 
+        if (continuation != null && !endpoint.isSynchronous() && Version.getCurrentVersion().startsWith("2.3"))
{
             return asyncInvoke(cxfExchange, serviceObject, method, paramArray, continuation);
         } else {
             return syncInvoke(cxfExchange, serviceObject, method, paramArray);
@@ -60,57 +127,50 @@ public class CxfRsInvoker extends JAXRSI
     private Continuation getContinuation(Exchange cxfExchange) {
         ContinuationProvider provider = 
             (ContinuationProvider)cxfExchange.getInMessage().get(ContinuationProvider.class.getName());
-        return provider.getContinuation();
+        return provider == null ? null : provider.getContinuation();
     }
     
     private Object asyncInvoke(Exchange cxfExchange, final Object serviceObject, Method method,
                               Object[] paramArray, final Continuation continuation) throws
Exception {
-        if (continuation.isNew()) {
-            ExchangePattern ep = ExchangePattern.InOut;
-            if (method.getReturnType() == Void.class) {
-                ep = ExchangePattern.InOnly;
-            } 
-            final org.apache.camel.Exchange camelExchange = endpoint.createExchange(ep);
-            CxfRsBinding binding = endpoint.getBinding();
-            binding.populateExchangeFromCxfRsRequest(cxfExchange, camelExchange, method,
paramArray);
-            boolean sync = cxfRsConsumer.getAsyncProcessor().process(camelExchange, new AsyncCallback()
{
-                public void done(boolean doneSync) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("Resuming continuation of exchangeId: " + camelExchange.getExchangeId());
-                    }
-                    // resume processing after both, sync and async callbacks
-                    continuation.setObject(camelExchange);
-                    continuation.resume();
-                }
-            });
-            // just need to avoid the continuation.resume is called
-            // before the continuation.suspend is called
-            if (continuation.getObject() != camelExchange && !sync) {
+        synchronized (continuation) {
+            if (continuation.isNew()) {
+                ExchangePattern ep = ExchangePattern.InOut;
+                if (method.getReturnType() == Void.class) {
+                    ep = ExchangePattern.InOnly;
+                } 
+                final org.apache.camel.Exchange camelExchange = endpoint.createExchange(ep);
+                CxfRsBinding binding = endpoint.getBinding();
+                binding.populateExchangeFromCxfRsRequest(cxfExchange, camelExchange, method,
paramArray);
                 // Now we don't set up the timeout value
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("Suspending continuation of exchangeId: " + camelExchange.getExchangeId());
                 }
-                // The continuation could be called before the suspend
-                // is called
+                // TODO Support to set the timeout in case the Camel can't send the response
back on time.
+                // The continuation could be called before the suspend is called
                 continuation.suspend(0);
-                
-            } else {
-                // just set the response back, as the invoking thread is
-                // not changed
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Processed the Exchange : " + camelExchange.getExchangeId());
-                }
+                cxfExchange.put(SUSPENED, Boolean.TRUE);
+                cxfRsConsumer.getAsyncProcessor().process(camelExchange, new AsyncCallback()
{
+                    public void done(boolean doneSync) {
+                        // make sure the continuation resume will not be called before the
suspend method in other thread
+                        synchronized (continuation) {
+                            if (LOG.isTraceEnabled()) {
+                                LOG.trace("Resuming continuation of exchangeId: " + camelExchange.getExchangeId());
+                            }
+                            // resume processing after both, sync and async callbacks
+                            continuation.setObject(camelExchange);
+                            continuation.resume();
+                        }
+                    }
+                });
+                return null;
+            }
+            if (continuation.isResumed()) {
+                cxfExchange.put(SUSPENED, Boolean.FALSE);
+                org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation
+                    .getObject();
                 return returnResponse(cxfExchange, camelExchange);
             }
-
         }
-        if (continuation.isResumed()) {
-            org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation
-                .getObject();
-            return returnResponse(cxfExchange, camelExchange);
-
-        }
-        
         return null;
     }
     



Mime
View raw message