camel-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Dmytro Puzhay (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (CAMEL-6784) PollingConsumer.receiveNoWait() never returns null consuming from AMQPComponent endpoint in browsing mode
Date Wed, 09 Oct 2013 11:18:42 GMT

    [ https://issues.apache.org/jira/browse/CAMEL-6784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13790256#comment-13790256
] 

Dmytro Puzhay commented on CAMEL-6784:
--------------------------------------

Hi Christian,

thanks for answer, please see below how my code looks like.

In my RouteBuilder constructor:

this.timer = new TimerComponent();
getContext().addComponent("timer", this.timer);

this.amqpComponent = (AMQPComponent) AMQPComponent.amqpComponent("amqp://admin:admin@localhost/test?brokerlist='tcp://localhost:10315'");
getContext().addComponent("amqp", this.amqpComponent);

this.directComponent = new DirectComponent();
getContext().addComponent("direct", this.directComponent);

In my RouteBuilder configure:

Endpoint timerEndpoint = this.timer.createEndpoint("timer:MyTimer?delay=0&repeatCount=1");
Endpoint amqpEndpoint = this.amqpComponent.createEndpoint("amqp:queue:broadcast.MyTopic.MyQueue");
Endpoint directEndpoint = this.directComponent.createEndpoint("direct:MyDirect");

MyPollingConsumer myConsumer = new MyPollingConsumer(amqpEndpoint, directEndpoint);

from(timerEndpoint).bean(myConsumer);
from(directEndpoint) ...

MyPollingConsumer:

public class LoaderPollingConsumer
{
    private PollingConsumer pollingConsumer = null;
    private ProducerTemplate producerTemplate = null;

    public LoaderPollingConsumer(Endpoint amqpEndpoint, Endpoint directEndpoint) throws Exception
    {
        this.pollingConsumer = amqpEndpoint.createPollingConsumer();

        ProducerTemplate directEndpointProducerTemplate = directEndpoint.getCamelContext().createProducerTemplate();
        directEndpointProducerTemplate.setDefaultEndpoint(directEndpoint);
        this.producerTemplate = directEndpointProducerTemplate;
    }

    @Handler
    public void onEvent() throws Exception
    {
        this.producerTemplate.start();

        while (true)
        {
            Exchange exchange = this.pollingConsumer.receiveNoWait();

           if (exchange != null)
           {
                this.producerTemplate.send(exchange);
            }
           else
           {
                this.producerTemplate.sendBody(null);
                break;
            }
        }

        this.producerTemplate.stop();
    }
}

"AMQP component uses the JMS component under the covers (which uses Spring DMLC)... I don't
think you can force the DMLC to do a browse as it's event driven and a browse is a static
snapshot"

I thinks one can read in browsing mode with event-driven route, but can't with polling consumer,
because polling consumer technically makes new connection with every poll, which simply forces
amqp to start from first message.

"Or try using the BrowsableEndpoint like this"

I will try (probably this week) and write back about results.

> PollingConsumer.receiveNoWait() never returns null consuming from AMQPComponent endpoint
in browsing mode
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-6784
>                 URL: https://issues.apache.org/jira/browse/CAMEL-6784
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-amqp
>    Affects Versions: 2.10.2
>            Reporter: Dmytro Puzhay
>
> I create PollingConsumer for my AMQP queue and consume with receiveNoWait() until null
is received. It worked nicely in default mode, but in browsing mode (queue name suffixed with
"{mode: browse}", see https://qpid.apache.org/books/0.12/Programming-In-Apache-Qpid/html/ch02s04.html
"2.4.3.3. browse" for details) my PollingConsumer keeps running infinitely.



--
This message was sent by Atlassian JIRA
(v6.1#6144)

Mime
View raw message