camel-issues mailing list archives

From Zdeněk Obst (JIRA) <j...@apache.org>
Subject [jira] [Updated] (CAMEL-9606) SJMS Consumer-Producer in transaction
Date Wed, 23 Mar 2016 10:48:25 GMT

     [ https://issues.apache.org/jira/browse/CAMEL-9606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zdeněk Obst updated CAMEL-9606:
-------------------------------
    Description: 
I'm not 100% sure this is a bug, but it feels that way from a conversation I had on the
mailing list.

I'm trying to ensure transactional processing between an SJMS consumer and producer (e.g. by
using the same JMS session).

In other words, this simple case:
1. prepare a large number of JMS messages in the broker (e.g. ActiveMQ with 1000 messages)
2. use a Camel route from the input queue to the output queue with transacted=true
3. start the context (which starts consuming messages) and kill the Java process at any time

When I kill the process, I would expect the sum of messages in the input and output queues to
be 1000, which would show that the transaction works. What actually happens is that I always
end up with 1001 or more messages. Maybe it is a misconfiguration of the routes or a
misunderstanding of how SJMS works.
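
One possible explanation for the extra message can be sketched with a toy model. The sketch below is an assumption, not Camel or JMS code: it uses plain in-memory queues and supposes that the producer and the consumer commit in two independent transactions, producer first. The class and method names (TwoSessionCrash, totalAfterKill) are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model only: in-memory queues, no JMS and no Camel code involved.
class TwoSessionCrash {

    /**
     * Moves messages from "in" to "out" where the send and the receive are
     * committed by two independent transactions (an assumption), and the
     * process is killed after the given number of commits.
     * Returns in.size() + out.size() as seen after the kill.
     */
    static int totalAfterKill(int messages, int commitsBeforeKill) {
        Deque<Integer> in = new ArrayDeque<>();
        Deque<Integer> out = new ArrayDeque<>();
        for (int i = 0; i < messages; i++) {
            in.add(i);
        }
        int commits = 0;
        while (!in.isEmpty()) {
            if (commits == commitsBeforeKill) {
                break; // killed before touching the next message
            }
            Integer msg = in.peek(); // received, but the receive is not committed yet
            out.add(msg);            // producer transaction commits
            commits++;
            if (commits == commitsBeforeKill) {
                break; // killed between the two commits: msg is in both queues
            }
            in.poll();               // consumer transaction commits
            commits++;
        }
        return in.size() + out.size();
    }

    public static void main(String[] args) {
        System.out.println(totalAfterKill(1000, 1000)); // killed right after a consumer commit
        System.out.println(totalAfterKill(1000, 1001)); // killed between producer and consumer commit
    }
}
```

In this model the total only exceeds the initial count when the kill lands between the two commits, which would match the observation only if the two endpoints really do commit separately.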

I feel this is critical because JMS is generally used because of its transactional capabilities.

Here is the sample code I used for reproduction (using ActiveMQ):
{code:java}
import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.sjms.SjmsComponent;
import org.apache.camel.component.sjms.jms.ConnectionFactoryResource;
import org.apache.camel.impl.DefaultCamelContext;

public class SjmsTransaction {

    public static void main(String[] args) throws Exception {
        RouteBuilder rb = new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                onException(Exception.class)
                        .process(systemOut("Exception!!"));

                from("sjms:queue:test-in?transacted=true&consumerCount=5")
                        .process(systemOut("Processing"))
                        .to("sjms:queue:test-out?transacted=true")
                        .process(systemOut("Processed"));
            }
        };

        CamelContext context = new DefaultCamelContext();
        addJmsComponent(context);
        context.addRoutes(rb);

        System.out.println("=====> Starting context");
        context.start();
        // The context now runs and consumes messages. When I kill the application
        // by force at any time, I expect this to hold:
        //   <#messagesInInputAtBeginning> == <#messagesInInputNow> + <#messagesInOutputNow>
        // What actually happens is that the left side is always smaller
        // (e.g. I submitted 1000 messages; out has 500, in has 501).
    }

    private static void addJmsComponent(CamelContext context) {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        ConnectionFactoryResource connResource = new ConnectionFactoryResource(5, factory);
        SjmsComponent comp = new SjmsComponent();
        comp.setConnectionResource(connResource);
        context.addComponent("sjms", comp);
    }

    private static Processor systemOut(final String message) {
        return new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                System.out.println(exchange.getExchangeId() + ": " + message);
            }
        };
    }
}
{code}

Note that I tried various combinations of acknowledgeMode and the InOnly/InOut exchange
pattern, but without luck.
I'm not very familiar with the Camel source code, but I found that the JMS session is held
within the exchange. So perhaps when a producer that is configured as transacted finds an
existing JMS session in the exchange, it could participate in that session? Or maybe there
are other hooks (like Synchronization objects) in some registry that take care of this?
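
The idea of the producer joining the consumer's session can be illustrated with a small model. This is a sketch under stated assumptions, not SJMS behaviour: it mimics a single transacted session in which receives and sends are staged and only take effect together on commit, and in which uncommitted work is rolled back when the client dies. In plain JMS this corresponds to creating the consumer and the producer from the same transacted Session and calling commit() once. The class and method names (SharedSessionModel, totalAfterKill) are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model only: one unit of work that covers both the receive and the send.
class SharedSessionModel {
    final Deque<String> in = new ArrayDeque<>();
    final Deque<String> out = new ArrayDeque<>();
    private final List<String> pendingReceives = new ArrayList<>();
    private final List<String> pendingSends = new ArrayList<>();

    String receive() {
        String msg = in.poll();
        if (msg != null) {
            pendingReceives.add(msg); // not final until commit
        }
        return msg;
    }

    void send(String msg) {
        pendingSends.add(msg); // staged, invisible in "out" until commit
    }

    void commit() {
        out.addAll(pendingSends);
        pendingSends.clear();
        pendingReceives.clear();
    }

    // Models the broker discarding the uncommitted work of a dead client.
    void rollback() {
        for (int i = pendingReceives.size() - 1; i >= 0; i--) {
            in.addFirst(pendingReceives.get(i));
        }
        pendingReceives.clear();
        pendingSends.clear();
    }

    // Kills the "process" after the given number of steps (receive, send, commit).
    static int totalAfterKill(int messages, int stepsBeforeKill) {
        SharedSessionModel s = new SharedSessionModel();
        for (int i = 0; i < messages; i++) {
            s.in.add("msg-" + i);
        }
        int steps = 0;
        while (steps < stepsBeforeKill && !s.in.isEmpty()) {
            String msg = s.receive();
            if (++steps == stepsBeforeKill) break;
            s.send(msg);
            if (++steps == stepsBeforeKill) break;
            s.commit();
            ++steps;
        }
        s.rollback();
        return s.in.size() + s.out.size();
    }

    public static void main(String[] args) {
        for (int k = 0; k <= 6; k++) {
            System.out.println("kill after " + k + " steps: total = " + totalAfterKill(2, k));
        }
    }
}
```

With a single commit covering both operations, the model keeps the total constant wherever the kill lands, which is the behaviour expected from the route above.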

Here is the link to the previous mailing list conversation: http://camel.465427.n5.nabble.com/SJMS-transaction-td5777522.html


> SJMS Consumer-Producer in transaction
> -------------------------------------
>
>                 Key: CAMEL-9606
>                 URL: https://issues.apache.org/jira/browse/CAMEL-9606
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-sjms
>    Affects Versions: 2.15.4, 2.16.2
>            Reporter: Zdeněk Obst
>            Priority: Critical
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
