camel-dev mailing list archives

From "Hiram Chirino" <hi...@hiramchirino.com>
Subject Re: no releaseConnection in http-component?
Date Fri, 16 May 2008 20:59:03 GMT
OK, I did some deeper analysis of why those
RejectedExecutionExceptions are popping up.  By default the thread(1)
processor creates a thread pool with 1 thread and a task queue of
1000.  Obviously your request generator can create requests much
faster than your HTTP server can handle them, so the task queue
starts filling up.  Once it's full, the thread pool starts rejecting
requests.

That's why you see the error messages.  I think the reason the final
asserts match up correctly is that Camel is retrying those failed
requests automatically.
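
The rejection mechanism can be reproduced outside Camel. The following is a minimal sketch using only java.util.concurrent; it does not use Camel's ThreadProcessor, and the class name RejectionDemo, the method countRejections and the latch that stands in for a slow HTTP server are all assumptions made for illustration. It uses a lambda, so it needs Java 8 or later.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {

    // Submits `tasks` slow tasks to a one-thread pool with a bounded queue
    // and returns how many submissions were rejected.
    static int countRejections(int queueCapacity, int tasks) {
        // No handler is passed, so the JDK default (AbortPolicy) applies:
        // execute() throws RejectedExecutionException once the queue is full.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        Runnable slowTask = () -> {
            try {
                release.await(); // hypothetical stand-in for a slow HTTP call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(slowTask);
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        release.countDown(); // let the single worker drain the queue
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected = " + countRejections(1000, 1010));
    }
}
```

With a queue of 1000 and 1010 submissions, one task occupies the worker thread, 1000 wait in the queue, and the remaining 9 are rejected.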

But this got me thinking: when the task queue fills up, it might be
a better policy to execute the request directly (synchronously)
instead of rejecting it outright via the RejectedExecutionException.

I've got a patch that implements this.  It avoids the
RejectedExecutionException by default, but you can optionally
enable the RejectedExecutionException behaviour.
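
The patch itself is not shown in this message. As a rough illustration of the "execute directly when the queue is full" policy, the sketch below uses the JDK's built-in ThreadPoolExecutor.CallerRunsPolicy, which may or may not be how the patch does it. The class name CallerRunsDemo, the run method and the latch that simulates a slow server are assumptions made for illustration (Java 8 or later).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CallerRunsDemo {

    // Returns { tasks completed, tasks executed synchronously on the submitting thread }.
    static int[] run(int queueCapacity, int tasks) {
        // CallerRunsPolicy: when the queue is full, execute() runs the task
        // on the calling thread instead of throwing RejectedExecutionException.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Thread submitter = Thread.currentThread();
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger completed = new AtomicInteger();
        AtomicInteger ranOnCaller = new AtomicInteger();

        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                if (Thread.currentThread() == submitter) {
                    ranOnCaller.incrementAndGet(); // queue was full: ran synchronously
                } else {
                    awaitQuietly(release);         // pool thread: simulate a slow server
                }
                completed.incrementAndGet();
            });
        }

        release.countDown(); // let the worker drain the queue
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { completed.get(), ranOnCaller.get() };
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] result = run(5, 20);
        System.out.println("completed=" + result[0] + " ranOnCaller=" + result[1]);
    }
}
```

Nothing is rejected here, so every submitted task completes. The trade-off is that the producer blocks while it runs the overflow task itself, which throttles the request generator down to the speed of the consumer.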

What do you folks think?  Should I commit it?

On Fri, May 16, 2008 at 2:24 PM, Hiram Chirino <hiram@hiramchirino.com> wrote:
> It might be a problem with the thread processor.. going to look into
> it and get back to ya.
>
> On Wed, Apr 2, 2008 at 7:43 PM, Kevin Brown <Kevin.Brown@air2web.com> wrote:
>> Dev,
>>
>> I'm answering my own question since I found the answer in a camel test case:
>>
>> Q: I expected to see a method.releaseConnection called in the http-component. Why none?
>>
>> A: (1) close explicitly or (2) use conversion to close stream automatically
>>
>> see test case for docs:
>>
>> http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-http/src/test/java/org/apache/camel/component/http/MultiThreadedHttpGetTest.java?view=markup
>>
>>
>> However, I am still seeing many java.util.concurrent.RejectedExecutionExceptions on a simple test with just one thread.  Any ideas before I hunt through org.apache.camel.component.seda.SedaConsumer? YET, MY ASSERTIONS (delivery counts) STILL PASS! odd.
>>
>>
>> grep -B2 -A10 rejected ./rejected.log
>>
>> 2008-04-02 18:02:50,033 [seda:start?size=20000 thread:1] ERROR org.apache.camel.processor.DeadLetterChannel - Failed delivery for exchangeId: ID-flick/51575-1207177191233/0-39963. On delivery attempt: 0 caught: java.util.concurrent.RejectedExecutionException
>> java.util.concurrent.RejectedExecutionException
>>        at org.apache.camel.processor.ThreadProcessor$1.rejectedExecution(ThreadProcessor.java:91)
>>        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:384)
>>        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:867)
>>        at org.apache.camel.processor.ThreadProcessor.process(ThreadProcessor.java:82)
>>        at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
>>        at org.apache.camel.processor.Pipeline.process(Pipeline.java:85)
>>        at org.apache.camel.processor.DeadLetterChannel.process(DeadLetterChannel.java:143)
>>        at org.apache.camel.processor.DeadLetterChannel.process(DeadLetterChannel.java:87)
>>        at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:40)
>>        at org.apache.camel.component.seda.SedaConsumer.run(SedaConsumer.java:64)
>>        at java.lang.Thread.run(Thread.java:595)
>>
>>
>>
>>
>>
>> kbrown@flick ~/svn-local/submgr/trunk $ grep rejected ./rejected.log | wc -l
>> 122
>>
>>
>>
>>
>>
>> import static org.junit.Assert.*;
>>
>> import java.io.IOException;
>> import java.util.concurrent.atomic.AtomicInteger;
>>
>> import javax.servlet.ServletException;
>> import javax.servlet.http.HttpServletRequest;
>> import javax.servlet.http.HttpServletResponse;
>>
>> import org.apache.camel.CamelContext;
>> import org.apache.camel.CamelTemplate;
>> import org.apache.camel.builder.RouteBuilder;
>> import org.apache.camel.component.mock.MockEndpoint;
>> import org.apache.camel.impl.DefaultCamelContext;
>> import org.apache.log4j.Logger;
>> import org.junit.After;
>> import org.junit.Before;
>> import org.junit.Test;
>> import org.mortbay.jetty.Connector;
>> import org.mortbay.jetty.HttpConnection;
>> import org.mortbay.jetty.Request;
>> import org.mortbay.jetty.Server;
>> import org.mortbay.jetty.bio.SocketConnector;
>>
>>
>>
>> public class CamelHttpTest {
>>
>>
>>        final static Logger logger = Logger.getLogger(CamelHttpTest.class);
>>        private CamelContext context;
>>        private CamelTemplate template;
>>        private Server jetty;
>>        final int PORT = 12188;
>>    private HandlerStub handler;
>>
>>        @Before
>>        public void setUpCamel() throws Exception {
>>
>>                context = new DefaultCamelContext();
>>                context.addRoutes(new RouteBuilder() {
>>                        public void configure() {
>>                                from("seda:start?size=20000").thread(1).
>>                                setHeader("Content-Type", constant("text/xml")).
>>                                to("http://localhost:"+PORT+"/").
>>                                convertBodyTo(String.class).
>>                                to("mock:results");
>>                        }
>>                });
>>
>>                template = new CamelTemplate(context);
>>                context.start();
>>        }
>>
>>
>>        @After
>>        public void tearDownCamel() throws Exception {
>>                context.stop();
>>        }
>>
>>
>>    class HandlerStub extends org.mortbay.jetty.handler.AbstractHandler {
>>
>>        AtomicInteger count = new AtomicInteger(0);
>>
>>                public void handle(String target, HttpServletRequest request, HttpServletResponse response, int dispatch) throws IOException, ServletException {
>>                        logger.debug("target="+target + " request=" +request + " response="+response + " dispatch="+dispatch);
>>                        int n = count.incrementAndGet();
>>                        logger.info("count="+n);
>>
>>                        //set handled (weird jetty handler thing, but otherwise we get a 404 HTTP Error)
>>                        Request base_request = (request instanceof Request) ? (Request)request:HttpConnection.getCurrentConnection().getRequest();
>>                        base_request.setHandled(true);
>>
>>                        response.setContentType("text/html");
>>                        response.setStatus(HttpServletResponse.SC_OK);
>>                        //response.getWriter().println("<h1>Hello OneHandler</h1>");
>>                }
>>    }
>>
>>
>>        @Before
>>        public void setUpJetty() throws Exception {
>>
>>                logger.debug("begin setUpJetty()");
>>
>>
>>                handler = new HandlerStub();
>>
>>        jetty = new Server();
>>        Connector connector=new SocketConnector();
>>        connector.setPort(PORT);
>>        jetty.setConnectors(new Connector[]{connector});
>>        jetty.setHandler(handler);
>>        jetty.start();
>>
>>        logger.debug("exit setUpJetty()");
>>        }
>>
>>        @After
>>        public void tearDownJetty() throws Exception {
>>                jetty.stop();
>>        Thread.sleep(500); //wait for server to shut down
>>        }
>>
>>
>>        @Test
>>        public void testOne() throws InterruptedException {
>>
>>                template.sendBody("seda:start?size=20000", "0");
>>                MockEndpoint resultEndpoint = context.getEndpoint("mock:results", MockEndpoint.class);
>>                resultEndpoint.expectedMessageCount(1);
>>                Thread.sleep(1000);
>>                resultEndpoint.assertIsSatisfied(); //OK
>>                logger.info("handler.count.get()="+handler.count.get());
>>                assertEquals(1,handler.count.get());
>>        }
>>
>>
>>        @Test
>>        public void test20k() throws InterruptedException {
>>
>>                int REQUESTS=20000;
>>
>>                for (int i=0;i<REQUESTS;i++) {
>>
>>                        String s= String.valueOf(i);
>>                        logger.info("sending body i="+i);
>>                        template.sendBody("seda:start?size=20000", s);
>>
>>                }
>>
>>                int i=0;
>>                while (handler.count.get() != REQUESTS && i < REQUESTS) {
>>                        i++;
>>                        Thread.sleep(500);
>>                        if (i % 10 == 0) {
>>                                logger.info("sleeping... i="+i+" handler.count.get()="+handler.count.get());
>>                        }
>>                }
>>                logger.info("handler.count.get()="+handler.count.get());
>>
>>                assertEquals(REQUESTS,handler.count.get());
>>                MockEndpoint resultEndpoint = context.getEndpoint("mock:results", MockEndpoint.class);
>>                resultEndpoint.expectedMessageCount(REQUESTS);
>>                resultEndpoint.assertIsSatisfied(); //OK
>>
>>        }
>>
>> }
>>
>>
>> // - kevin
>>
>> -----Original Message-----
>> From: Kevin Brown
>> Sent: Fri 3/28/2008 7:10 PM
>> To: Kevin Brown; camel-dev@activemq.apache.org
>> Subject: no releaseConnection  in http-component?
>>
>> Camel-dev,
>>
>> I expected to see a method.releaseConnection called in the http-component.
>>
>> kbrown@flick ~/projects/camel/components/camel-http/src/main/java/org/apache/camel/component/http $ ls
>> CamelServlet.java  HttpClientConfigurer.java  HttpConsumer.java  HttpExchange.java  HttpPollingConsumer.java  package.html
>> HttpBinding.java   HttpComponent.java         HttpEndpoint.java  HttpMessage.java   HttpProducer.java
>> kbrown@flick ~/projects/camel/components/camel-http/src/main/java/org/apache/camel/component/http $ grep releaseConnection *.java
>>
>> I wouldn't have gone looking for this, except that I'm running into this error when trying to do any non-trivial http traffic:
>> MultiThreadedHttpConnectionManager - Unable to get a connection, waiting...
>>
>> Anyone else having trouble with http-component? Suggestions?  The output might have to be buffered if releaseConnection were called and the outputStream were passed to the exchange, I suppose.
>>
>> - Kevin
>>
>>
>>
>>
>>
>
>
>
> --
> Regards,
> Hiram
>
> Blog: http://hiramchirino.com
>
> Open Source SOA
> http://open.iona.com
>



-- 
Regards,
Hiram

Blog: http://hiramchirino.com

Open Source SOA
http://open.iona.com
