camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d1x <zdenek.o...@gmail.com>
Subject SJMS transaction
Date Wed, 10 Feb 2016 10:18:15 GMT
Hello,

is there a way how to ensure transactions between consumer-producer of two
JMS queues using SJMS component? 

What I'm testing is this simple case:
1. prepare higher amount of JMS messages in broker (e.g. 1000)
2. have Camel route from input queue to output queue
3. start context (starts consuming messages) and in any time kill java
process

What I would expect is that sum of messages in input queue and output queue
will be 1000. But what happens is that the sum is higher (something between
1000 and 1005 as I use 5 parallelConsumers). It is because when I kill
process in the time when JMS message was already submitted to JMS output
queue - it will remain there even if the transaction (acknowledge) of
message from input queue did not succeed (because of kill). 

I tried hard to find out how to configure Apache Camel properly. From what I
understood from documentation is that I cannot use Camel Transaction
Processor. But still there are options like acknowledgementMode (but their
options seem not documented at all, not even in source code).

So is there a way how to configure SJMS component to behave fully
transactionally between consumer-producer? Very likely they will use the
same JMS session or use other way of synchronization?

Here is the simple code I use for testing

public class SjmsTransaction {

    public static void main(String[] args) throws Exception {
        RouteBuilder rb = new RouteBuilder() {
            @Override
            public void configure() throws Exception {
               
from("sjms:queue:test-in?transacted=true&acknowledgementMode=DUPS_OK_ACKNOWLEDGE&consumerCount=5")
//                        .setExchangePattern(ExchangePattern.InOut)
                        .process(new Processor() {
                            @Override
                            public void process(Exchange exchange) throws
Exception {
                                System.out.println("Processed message " +
exchange.getExchangeId());
                            }
                        })
                        .to("sjms:queue:test-out?transacted=true");
            }
        };

        CamelContext context = new DefaultCamelContext();
        addJmsComponent(context);
        context.addRoutes(rb);

        System.out.println("=====> Starting context");
        context.start();
        // Now the context will run and consume messages, when I stop
application by force in any time
        // I expect this to be true: <#submittedMessages> ==
<#messagesInInput> + <#messagesInOutput>
        // What happens is that there more messages that was not
acknowledged (from input) but are already in output
    }

    private static void addJmsComponent(CamelContext context) {
        ConnectionFactory factory = new
ActiveMQConnectionFactory("tcp://localhost:61616");
        ConnectionFactoryResource connResource = new
ConnectionFactoryResource(5, factory);
        SjmsComponent comp = new SjmsComponent();
        comp.setConnectionResource(connResource);
        context.addComponent("sjms", comp);
    }
}

Thank you very much for any answer



--
View this message in context: http://camel.465427.n5.nabble.com/SJMS-transaction-tp5777522.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Mime
View raw message