activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Jason Shepherd (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (AMQ-1126) The Resource Adapter ignores the JMSXGroupID when dispatching to MDBs
Date Wed, 27 Mar 2013 06:41:18 GMT

    [ https://issues.apache.org/jira/browse/AMQ-1126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13614947#comment-13614947
] 

Jason Shepherd edited comment on AMQ-1126 at 3/27/13 6:41 AM:
--------------------------------------------------------------

We found that message sequencing is not working correctly when using the above approach. Therefore
we suggest that there is a bug still in the Resource Adapter.

We tested using:
JBoss EAP 6.0.1
ActiveMQ RAR 5.8.0

I have attached a test case (amq_reproducer.zip) which demonstrates the issue in that environment.
Here is how it works:


    amq_reproducer.zip:
        test_amq.jar
        test_amq_web.war
    activemq-rar-5.8.0.rar 1
    Changes required in standalone-full.xml for registering activemq resource adapter

{code:xml}

 <subsystem xmlns="urn:jboss:domain:resource-adapters:1.0">
            <resource-adapters>
                <resource-adapter>
                    <archive>
                        activemq-rar-5.8.0.rar
                    </archive>
                    <transaction-support>XATransaction</transaction-support>
                    <config-property name="UseInboundSession">
                        false
                    </config-property>
                    <config-property name="Password">
                        defaultPassword
                    </config-property>
                    <config-property name="UserName">
                        defaultUser
                    </config-property>
                    <config-property name="ServerUrl">
                        vm://localhost
                    </config-property>
                    <connection-definitions>
                        <connection-definition class-name="org.apache.activemq.ra.ActiveMQManagedConnectionFactory"

jndi-name="java:jboss/ConnectionFactory" enabled="true" pool-name="ConnectionFactory">
                            <xa-pool>
                                <min-pool-size>1</min-pool-size>
                                <max-pool-size>20</max-pool-size>
                            </xa-pool>
                        </connection-definition>
                    </connection-definitions>
                    <admin-objects>
                        <admin-object class-name="org.apache.activemq.command.ActiveMQQueue"

jndi-name="java:jboss/queue/MyActiveMQQueue" use-java-context="true" pool-name="MyActiveMQQueue">
                            <config-property name="PhysicalName">
                                QueuePhysicalName
                            </config-property>
                        </admin-object>
                    </admin-objects>
                </resource-adapter>
            </resource-adapters>
        </subsystem>
{code}
test_amq.jar
This jar contains two group of MDBs. Three MDBs (AMQTestMDB*) are consumer for AMQ queue QueuePhysicalName
(defined in standalone-ha.xml). The other three MDBs (HornetQTestMDB*) are consumer for HornetQ
(default implementation) queue queue/HornetQMDBQueue (defined in war's hornetq-jms.xml).

The onMessage method call following simple logic
{code:borderStyle=solid}

public void handleMessage(String id, TextMessage tm) {
        try {
            String logMsg = id + " Testing MDB : " + tm.getText()
                    + ", " + this.toString();
            System.out.println(logMsg);
            if (tm.getText().startsWith("Hello")) {
                // simulate heavy processing of first message
                long sec = 5;
                System.out.println(logMsg + " - Heavy processing for " + sec
                        + " seconds. " + tm.getText() + ", " + this.toString());
                Random r = new Random();
                for (long stop = System.nanoTime()
                        + TimeUnit.SECONDS.toNanos(5); stop > System.nanoTime();) {
                    r.nextDouble();
                }
                System.out.println(logMsg + " Done!");
            }

        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
{code}
test_amq_web.war
This war contains clients for the test queues. /test tests AMQ. /test2 tests HornetQ. Following
is the logic used:
{code:borderStyle=solid}
    for (int i = 0; i < 5; i++) {
        msg.setText("Hello " + i + "[" + groupName + "]");
        msg2.setText("World " + i + "[" + groupName + "]");

        if (useGroup) {
            msg.setStringProperty("JMSXGroupID", groupName);
            msg2.setStringProperty("JMSXGroupID", groupName);
        }

        qsender.send(msg);
        qsender.send(msg2);
    }
{code}
Each servlet creates 5 threads. Each thread creates two text messages and sends it out 5 times.

Executing test cases
- ActiveMQ test: http://localhost:8080/test_amq_web/test
- ActiveMQ test without JMSXGroupID: http://localhost:8080/test_amq_web/test?group=false
- HornetQ test: http://localhost:8080/test_amq_web/test2
- HornetQ test without JMSXGroupID: http://localhost:8080/test_amq_web/test2?group=false

Results:

When invoking the test_amq_web/test we can see that messages using the same message group
are not being processed sequentially. In the logs we get messages in this order:
{code}
15:00:47,031 INFO  [stdout] (default-threads - 7) Consume-AMQ3 Testing MDB : Hello 0[Group-1],
com.redhat.gss.test.HandleMessage@53e59730
15:00:47,032 INFO  [stdout] (default-threads - 7) Consume-AMQ3 Testing MDB : Hello 0[Group-1],
com.redhat.gss.test.HandleMessage@53e59730 - Heavy processing for 5 seconds. Hello 0[Group-1],
com.redhat.gss.test.HandleMessage@53e59730
...
15:00:47,066 INFO  [stdout] (default-threads - 12) Consume-AMQ3 Testing MDB : World 0[Group-1],
com.redhat.gss.test.HandleMessage@5418f143
...
15:00:52,035 INFO  [stdout] (default-threads - 7) Consume-AMQ3 Testing MDB : Hello 0[Group-1],
com.redhat.gss.test.HandleMessage@53e59730 Done!
{code}
However when invoking test_amq_web/test2 (which uses HornetQ) we can see that the message
are being processed sequentially:
{code}
15:08:35,740 INFO  [stdout] (Thread-1 (HornetQ-client-global-threads-1277578712)) Consume-HQ1
Testing MDB : Hello 0[Group-0], com.redhat.gss.test.HandleMessage@67092e88
15:08:35,744 INFO  [stdout] (Thread-1 (HornetQ-client-global-threads-1277578712)) Consume-HQ1
Testing MDB : Hello 0[Group-0], com.redhat.gss.test.HandleMessage@67092e88 - Heavy processing
for 5 seconds. Hello 0[Group-0], com.redhat.gss.test.HandleMessage@67092e88
...
15:08:40,784 INFO  [stdout] (Thread-0 (HornetQ-client-global-threads-1277578712)) Consume-HQ3
Testing MDB : Hello 0[Group-3], com.redhat.gss.test.HandleMessage@bce1791 Done!
15:08:40,788 INFO  [stdout] (Thread-0 (HornetQ-client-global-threads-1277578712)) Consume-HQ3
Testing MDB : World 0[Group-3], com.redhat.gss.test.HandleMessage@5e8ace0e
{code}
                
      was (Author: jshepher):
    We found that message sequencing is not working correctly when using the above approach.
Therefore we suggest that there is a bug still in the Resource Adapter.

We tested using:
JBoss EAP 6.0.1
ActiveMQ RAR 5.8.0

I have attached a test case (amq_reproducer.zip) which demonstrates the issue in that environment.
Here is how it works:


    amq_reproducer.zip:
        test_amq.jar
        test_amq_web.war
    activemq-rar-5.8.0.rar 1
    Changes required in standalone-full.xml for registering activemq resource adapter

{code:xml}

 <subsystem xmlns="urn:jboss:domain:resource-adapters:1.0">
            <resource-adapters>
                <resource-adapter>
                    <archive>
                        activemq-rar-5.8.0.rar
                    </archive>
                    <transaction-support>XATransaction</transaction-support>
                    <config-property name="UseInboundSession">
                        false
                    </config-property>
                    <config-property name="Password">
                        defaultPassword
                    </config-property>
                    <config-property name="UserName">
                        defaultUser
                    </config-property>
                    <config-property name="ServerUrl">
                        vm://localhost
                    </config-property>
                    <connection-definitions>
                        <connection-definition class-name="org.apache.activemq.ra.ActiveMQManagedConnectionFactory"

jndi-name="java:jboss/ConnectionFactory" enabled="true" pool-name="ConnectionFactory">
                            <xa-pool>
                                <min-pool-size>1</min-pool-size>
                                <max-pool-size>20</max-pool-size>
                            </xa-pool>
                        </connection-definition>
                    </connection-definitions>
                    <admin-objects>
                        <admin-object class-name="org.apache.activemq.command.ActiveMQQueue"

jndi-name="java:jboss/queue/MyActiveMQQueue" use-java-context="true" pool-name="MyActiveMQQueue">
                            <config-property name="PhysicalName">
                                QueuePhysicalName
                            </config-property>
                        </admin-object>
                    </admin-objects>
                </resource-adapter>
            </resource-adapters>
        </subsystem>
{code}
test_amq.jar
This jar contains two group of MDBs. Three MDBs (AMQTestMDB*) are consumer for AMQ queue QueuePhysicalName
(defined in standalone-ha.xml). The other three MDBs (HornetQTestMDB*) are consumer for HornetQ
(default implementation) queue queue/HornetQMDBQueue (defined in war's hornetq-jms.xml).

The onMessage method call following simple logic

public void handleMessage(String id, TextMessage tm) {
        try {
            String logMsg = id + " Testing MDB : " + tm.getText()
                    + ", " + this.toString();
            System.out.println(logMsg);
            if (tm.getText().startsWith("Hello")) {
                // simulate heavy processing of first message
                long sec = 5;
                System.out.println(logMsg + " - Heavy processing for " + sec
                        + " seconds. " + tm.getText() + ", " + this.toString());
                Random r = new Random();
                for (long stop = System.nanoTime()
                        + TimeUnit.SECONDS.toNanos(5); stop > System.nanoTime();) {
                    r.nextDouble();
                }
                System.out.println(logMsg + " Done!");
            }

        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

test_amq_web.war
This war contains clients for the test queues. /test tests AMQ. /test2 tests HornetQ. Following
is the logic used:

    for (int i = 0; i < 5; i++) {
        msg.setText("Hello " + i + "[" + groupName + "]");
        msg2.setText("World " + i + "[" + groupName + "]");

        if (useGroup) {
            msg.setStringProperty("JMSXGroupID", groupName);
            msg2.setStringProperty("JMSXGroupID", groupName);
        }

        qsender.send(msg);
        qsender.send(msg2);
    }

Each servlet creates 5 threads. Each thread creates two text messages and sends it out 5 times.

Executing test cases
- ActiveMQ test: http://localhost:8080/test_amq_web/test
- ActiveMQ test without JMSXGroupID: http://localhost:8080/test_amq_web/test?group=false
- HornetQ test: http://localhost:8080/test_amq_web/test2
- HornetQ test without JMSXGroupID: http://localhost:8080/test_amq_web/test2?group=false

Results:

When invoking the test_amq_web/test we can see that messages using the same message group
are not being processed sequentially. In the logs we get messages in this order:

15:00:47,031 INFO  [stdout] (default-threads - 7) Consume-AMQ3 Testing MDB : Hello 0[Group-1],
com.redhat.gss.test.HandleMessage@53e59730
15:00:47,032 INFO  [stdout] (default-threads - 7) Consume-AMQ3 Testing MDB : Hello 0[Group-1],
com.redhat.gss.test.HandleMessage@53e59730 - Heavy processing for 5 seconds. Hello 0[Group-1],
com.redhat.gss.test.HandleMessage@53e59730
...
15:00:47,066 INFO  [stdout] (default-threads - 12) Consume-AMQ3 Testing MDB : World 0[Group-1],
com.redhat.gss.test.HandleMessage@5418f143
...
15:00:52,035 INFO  [stdout] (default-threads - 7) Consume-AMQ3 Testing MDB : Hello 0[Group-1],
com.redhat.gss.test.HandleMessage@53e59730 Done!

However when invoking test_amq_web/test2 (which uses HornetQ) we can see that the message
are being processed sequentially:

15:08:35,740 INFO  [stdout] (Thread-1 (HornetQ-client-global-threads-1277578712)) Consume-HQ1
Testing MDB : Hello 0[Group-0], com.redhat.gss.test.HandleMessage@67092e88
15:08:35,744 INFO  [stdout] (Thread-1 (HornetQ-client-global-threads-1277578712)) Consume-HQ1
Testing MDB : Hello 0[Group-0], com.redhat.gss.test.HandleMessage@67092e88 - Heavy processing
for 5 seconds. Hello 0[Group-0], com.redhat.gss.test.HandleMessage@67092e88
...
15:08:40,784 INFO  [stdout] (Thread-0 (HornetQ-client-global-threads-1277578712)) Consume-HQ3
Testing MDB : Hello 0[Group-3], com.redhat.gss.test.HandleMessage@bce1791 Done!
15:08:40,788 INFO  [stdout] (Thread-0 (HornetQ-client-global-threads-1277578712)) Consume-HQ3
Testing MDB : World 0[Group-3], com.redhat.gss.test.HandleMessage@5e8ace0e
                  
> The Resource Adapter ignores the JMSXGroupID when dispatching to MDBs
> ---------------------------------------------------------------------
>
>                 Key: AMQ-1126
>                 URL: https://issues.apache.org/jira/browse/AMQ-1126
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: JCA Container
>    Affects Versions: 4.0.1
>         Environment: Java 1.4.2_08
> JBoss 4.0.4
> ActiveMQ 4.0.1
>            Reporter: John Robinson
>             Fix For: NEEDS_REVIEWED
>
>         Attachments: amq_reproducer.zip, msg-group-test.zip
>
>
> Integrate AMQ into JBoss using the data source, and resource adapter.  Create an outbound
queue and an MDB with a pool size of 100.  Dispatch several messages to the outbound queue,
setting the JMSXGroupID property on the message to be the same value each time.  In the MDB's
onMessage method print out the MDBs toString (don't override toString) and you should see
something that looks like:
> OutQueueProcessorBean@19a7266
> Observe two things:
> a) Many messages are processed in parallel
> b) Many different values will occur after the @ in the above message, denoting that more
than on MDB instance is being handed messages.
> The correct behavior would be to dispatch messages with the same group id to the same
MDB instance in sequence.  This would allow messages from different groups to be processed
in parallel, but messages in any one group would be processed serially, in the order in which
they were placed into the queue.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Mime
View raw message