cxf-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Sergey Beryozkin (JIRA)" <j...@apache.org>
Subject [jira] Issue Comment Edited: (CXF-2589) Allow CXF Bean endpoint to work in asynchronous mode
Date Tue, 29 Dec 2009 13:00:29 GMT

    [ https://issues.apache.org/jira/browse/CXF-2589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12795045#action_12795045
] 

Sergey Beryozkin edited comment on CXF-2589 at 12/29/09 12:59 PM:
------------------------------------------------------------------

Hi Charles,

please see 

http://svn.apache.org/repos/asf/cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java

I'm wondering, can you try to use Continuations explicitly given that getIncidentsAsync()
is quite explicit already in the way it uses Futures/etc ?

So you might have a code like this :

private ConcurrentHashMap<Continuation, List<Incident>> incidents = ...

@GET
@Path("/incidents2/")
public Incidents getIncidentsAsync() {

 Continuation c = getContinuation(); 
 synchronized (c) {
    if (c.isNew()) {

        // Call the distant service to get result (ASYNCHR WAY )

        Future future = producer.asyncRequestBody("jms:queue:inout", "GET");

        // this will throw a RuntimeException (SuspendedInvocationException)

        extractFutureBody(c, future);

    }  else if (incidents.containsKey(c))

        // here we also have a choice if suspending the invocation again if some other related
information is not ready yet

       // or just do return c.getUserObject if no map were used

        return incidents.remove(c);

    } else {

         // here we also have a choice if suspending the invocation again if some other related
information is not ready yet, for example, extractFutureBody suspends the invocation for 10001
so in theory the invocation might've come back just a fraction of millisec before the producer
has finished extracting the body, so here we might give it another try and suspend it again
for say another few secs

         return null;
    }
    
 }
 
}

private void doAsyncCall(final Continuation c, final Future f) {

// you might want to have a private executor instead

new Thread(new Runnable() {
 public void run() {
  List<ReportIncident> incidentsList = null;
  try { 
     incidentsList = producer.extractFutureBody(future, 10000,TimeUnit.MILLISECONDS, List.class);

  } catch (TimeoutException e) { 
     // null
  }

  // alternatively, instead of keeping an incidents map, just save the result on the continuation
itself and then resume (c.setUserObject(incidentsList ))

  incidents.put(c, incidentsList);

  // this one will ensure an original suspended invocation will return into getIncidents2()

  c.resume();
 }
}).start();

// this one will throw  a SuspendedInvocationException and ensure it will get back in 10 secs
+ 1 milli sec
c.suspend(10001);

}



If no cxfcamel (which intercepts a getIncidendtsAsync) were involved then this code would
most likely work really well. But cxfcamel might need to be updated a bit to deal with SuspendedInvocationExceptions,
probably very similarly to the way ServiceMix CXF BC component has been updated (would be
a copy and paste probably), and it would need to be done even when we introduce later on @SuspendedInvocation
later on...

Give it a try please and let me know how it goes on - I'm quite sure you will end up opening
a Camel CXF JIRA :-) but I'll be excited if it just will work as is in Camel too :-)

cheers, Sergey

      was (Author: sergey_beryozkin):
    Hi Charles,

please see 

http://svn.apache.org/repos/asf/cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java

I'm wondering, can you try to use Continuations explicitly given that getIncidentsAsync()
is quite explicit already in the way it uses Futures/etc ?

So you might have a code like this :

private ConcurrentHashMap<Continuation, List<Incident>> incidents = ...

@GET
@Path("/incidents2/")
public Incidents getIncidentsAsync() {

 Continuation c = getContinuation(); 
 synchronized (c) {
    if (c.isNew()) {
        // Call the distant service to get result (ASYNCHR WAY )
        Future future = producer.asyncRequestBody("jms:queue:inout", "GET");
        // this will throw a RuntimeException (SuspendedInvocationException)
        extractFutureBody(c, future);
    }  else if (incidents.containsKey(c))
        // here we also have a choice if suspending the invocation again if some other related
information is not ready yet
        return incidents.remove(c);
    } else {
         // here we also have a choice if suspending the invocation again if some other related
information is not ready yet, for example, extractFutureBody suspends the invocation for 10001
so in theory the invocation might've come back just a fraction of millisec before the producer
has finisehd extracting the body, so here we might give it another try and suspend it again
for say another few secs
         return null;
    }
    
 }
 
}

private void doAsyncCall(final Continuation c, final Future f) {

// you might want to have a private executor instead

new Thread(new Runnable() {
 public void run() {
  List<ReportIncident> incidentsList = null;
  try { 
     incidentsList = producer.extractFutureBody(future, 10000,TimeUnit.MILLISECONDS, List.class);

  } catch (TimeoutException e) { 
     // null
  }
  incidents.put(c, incidentsList);
  // this one will ensure an original suspended invocation will return into getIncidents2()
  c.resume();
 }
}).start();

// this one will throw  a SuspendedInvocationException and ensure it will get back in 10 secs
+ 1 milli sec
c.suspend(10001);

}

If no cxfcamel (which intercepts a getIncidendtsAsync) were involved then this code would
most likely work really well. But cxfcamel might need to be updated a bit to deal with SuspendedInvocationExceptions,
probably very similarly to the way ServiceMix CXF BC component has been updated (would be
a copy and paste probably), and it would need to be done even when we introduce later on @SuspendedInvocation
later on...

Give it a try please and let me know how it goes on - I'm quite sure you will end up opening
a Camel CXF JIRA :-) but I'll be excited if it just will work as is in Camel too :-)

cheers, Sergey
  
> Allow CXF Bean endpoint to work in asynchronous mode
> ----------------------------------------------------
>
>                 Key: CXF-2589
>                 URL: https://issues.apache.org/jira/browse/CXF-2589
>             Project: CXF
>          Issue Type: New Feature
>    Affects Versions: 2.2.5
>            Reporter: Charles Moulliard
>
> The following code does not work :
> 	@GET
> 	@Path("/incidents2/")
> 	public Incidents getIncidentsAsync() {
> 		Incidents l = new Incidents();
> 		// Call the distant service to get result (ASYNCHR WAY )
> 		Future future = producer.asyncRequestBody("jms:queue:inout", "GET");
> 		// Check if we receive a feedback within 10 seconds
> 		List<ReportIncident> i;
> 		try {
> 			i = producer.extractFutureBody(future, 10000,TimeUnit.MILLISECONDS, List.class);
> 		} catch (TimeoutException e) {
> 			return null;
> 		}
> 		l.setIncidents(i);
> 		return l;
> 	}
> because CXF bean endpoint does not allow to work with async process

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


Mime
View raw message