camel-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Claus Ibsen (Commented) (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (CAMEL-4925) ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.
Date Fri, 20 Jan 2012 12:26:39 GMT

    [ https://issues.apache.org/jira/browse/CAMEL-4925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13189758#comment-13189758
] 

Claus Ibsen commented on CAMEL-4925:
------------------------------------

Yeah we would need to check in ThreadsDefinition if you have configured Discard and DiscardOldest.
And then wrap those policies with a custom policy, so we get the callback from the JDK when
the task is rejected.

The discard, would possible not be needed, as I would assume the thread pool will reject it
asap, when you try to submit it. But the discard oldest, is an existing task from the task
queue, so that is a different story.

Then we need to provide this as a callback to the ThreadsProcessor, so it can do custom logic
when the RejectedExecutionHandler#rejectedExecution is invoked. We can then from the Runnable
parameter cast that to ProcessCall, and then get access to the Exchange. Then we can set on
the Exchange a RejectedExecutionException as exception, and invoke its callback. Then Camel
will take it from there to remove the exchange from inflight registry and whatnot.

Something along the lines of that. Its a shame the API of the ExecutorService do not have
a runnable for rejected execution. Then it would have been easier.
                
> ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy
leaves inflight exchanges for discarded tasks unprocessed.
> -----------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-4925
>                 URL: https://issues.apache.org/jira/browse/CAMEL-4925
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.8.0
>            Reporter: Sergey Zhemzhitsky
>            Priority: Minor
>             Fix For: 2.10.0
>
>         Attachments: CamelRoutingTest.java
>
>
> ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy
leaves inflight exchanges for discarded tasks unprocessed.
> Here is the code from ThreadsProcessor. In case of DiscardPolicy or DiscardOldestPolicy
executorService will no throw RejectedExecutionException, so exchange remains unprocessed
and count of inflight exchanges will not be decremented for such discarded exchanges.
> {code:java|title=ThreadsProcessor#process(Exchange, AsyncCallback)}
> public boolean process(Exchange exchange, AsyncCallback callback) {
>     if (shutdown.get()) {
>         throw new IllegalStateException("ThreadsProcessor is not running.");
>     }
>     ProcessCall call = new ProcessCall(exchange, callback);
>     try {
>         executorService.submit(call);
>         // tell Camel routing engine we continue routing asynchronous
>         return false;
>     } catch (RejectedExecutionException e) {
>         if (isCallerRunsWhenRejected()) {
>             if (shutdown.get()) {
>                 exchange.setException(new RejectedExecutionException());
>             } else {
>                 callback.done(true);
>             }
>         } else {
>             exchange.setException(e);
>         }
>         return true;
>     }
> }
> {code}
> Unit test is attached.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Mime
View raw message