activemq-issues mailing list archives

From "Todd Baert (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (ARTEMIS-1549) AMQP messages aren't redistributed across cluster bridge, NPE in ServerSessionImpl.send()
Date Sat, 09 Dec 2017 14:57:00 GMT

    [ https://issues.apache.org/jira/browse/ARTEMIS-1549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16284761#comment-16284761 ]

Todd Baert edited comment on ARTEMIS-1549 at 12/9/17 2:56 PM:
--------------------------------------------------------------

It does seem to be the same... Is the underlying problem that the address is null when the
AMQP message is wrapped in a CORE message to traverse the bridge?

Anyway I'll close this as a dupe.


was (Author: toddbaert):
It does seem to be the same... Is the underlying problem that the address is null when the
AMQP message is wrapped in a CORE message to traverse the bridge?

> AMQP messages aren't redistributed across cluster bridge, NPE in ServerSessionImpl.send()
> -----------------------------------------------------------------------------------------
>
>                 Key: ARTEMIS-1549
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-1549
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>          Components: AMQP, Broker
>    Affects Versions: 2.2.0, 2.3.0, 2.4.0
>         Environment: AMQP .NET Lite client, .NET Core runtime 2.0, connecting to brokers on local machine (Fedora 26)
>            Reporter: Todd Baert
>
> I have set up a cluster of 2 brokers, using a simple static cluster configuration (see below). Sending a CORE message to broker1 and consuming that message from broker2 works as expected. Attempting the same over AMQP (using the AMQP .NET Lite client) results in an NPE in broker2:
> {code:none}
> 08:44:28,061 ERROR [org.apache.activemq.artemis.core.server] AMQ224016: Caught exception: java.lang.NullPointerException
> 	at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:613) [artemis-amqp-protocol-2.3.0.jar:]
> 	at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:64) [artemis-amqp-protocol-2.3.0.jar:]
> 	at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1348) [artemis-server-2.3.0.jar:2.3.0]
> 	at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1309) [artemis-server-2.3.0.jar:2.3.0]
> 	at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1302) [artemis-server-2.3.0.jar:2.3.0]
> 	at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onSessionSend(ServerSessionPacketHandler.java:690) [artemis-server-2.3.0.jar:2.3.0]
> 	at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onMessagePacket(ServerSessionPacketHandler.java:290) [artemis-server-2.3.0.jar:2.3.0]
> 	at org.apache.activemq.artemis.utils.actors.Actor.doTask(Actor.java:33) [artemis-commons-2.3.0.jar:2.3.0]
> 	at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.3.0.jar:2.3.0]
> 	at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) [artemis-commons-2.3.0.jar:2.3.0]
> 	at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) [artemis-commons-2.3.0.jar:2.3.0]
> 	at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.3.0.jar:2.3.0]
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [rt.jar:1.8.0_151]
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [rt.jar:1.8.0_151]
> 	at java.lang.Thread.run(Thread.java:748) [rt.jar:1.8.0_151]
> {code}
> To be clear, the NPE occurs on the broker to which the receiver is attached, every time a message is sent by the producer (which is attached to the other broker).
> Attempting to send/receive the AMQP messages on the same cluster member works as expected.
> Here is some client code that demonstrates the issue:
> {code:java}
> using System;
> using System.Collections.Generic;
> using System.Threading;
> using System.Transactions;
> using Amqp;
> using Amqp.Framing;
> using Amqp.Sasl;
> using Amqp.Types;
> namespace Test
> {
>     class Program
>     {
>         static void Main(string[] args)
>         {
>             string url1 = "amqp://localhost:5672";
>             string url2 = "amqp://localhost:5673";
>             String ADDRESS = "orders";
>             Connection connection1 = new Connection(new Address(url1));
>             Session session1 = new Session(connection1);
>             ReceiverLink receiver = new ReceiverLink(session1, "sub1", CreateSharedDurableSubscriberSource(ADDRESS), null);
>             Connection connection2 = new Connection(new Address(url2));
>             Session session2 = new Session(connection2);
>             SenderLink sender = new SenderLink(session2, "sender", ADDRESS);
>             receiver.Start(300, (r,  m) => {
>                 r.Accept(m);
>                 Console.WriteLine("Got message: " + m.Body);
>             });
>         
>             Message outMessage = new Message("order placed at " + DateTime.Now.ToString());
>             outMessage.Header = new Header();
>             outMessage.Header.Durable = true;                    
>             sender.Send(outMessage);
>             Thread.CurrentThread.Join();
>         }
>         private static Source CreateSharedDurableSubscriberSource(String address)
>         {
>             Source source = new Source();
>             source.Address = address;
>             source.ExpiryPolicy = new Symbol("never");
>             
>             source.Durable = 2;
>             source.Capabilities = new Symbol[]{"topic", "shared", "global"};
>             source.DistributionMode = new Symbol("copy");
>             return source;
>         }
>     }
> }
> {code}
> Here is the cluster config from broker1; broker2 is configured accordingly:
> {code:xml}
>       <acceptors>
>          <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
>          <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor>
>       </acceptors>
>       <connectors>
>         <connector name="broker1-connector">tcp://localhost:61616</connector>
>         <connector name="broker2-connector">tcp://localhost:61617</connector>
>       </connectors>
>       <cluster-user>todd</cluster-user>
>       <cluster-password>password</cluster-password>      
>       <cluster-connections>
>         <cluster-connection name="preprod">
>           <connector-ref>broker1-connector</connector-ref>
>           <retry-interval>500</retry-interval>
>           <use-duplicate-detection>true</use-duplicate-detection>
>           <message-load-balancing>ON_DEMAND</message-load-balancing>
>           <max-hops>1</max-hops>
>           <static-connectors allow-direct-connections-only="true">
>               <connector-ref>broker2-connector</connector-ref>
>           </static-connectors>
>         </cluster-connection>
>       </cluster-connections>
> {code}
> message-load-balancing has no effect on this issue; redistribution-delay is set to 0 for the address in question.
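> For reference, a redistribution-delay of 0 is normally configured through an address-setting. The fragment below is only a sketch of what that could look like: the match value "orders" is assumed from the client code above, and the actual address settings in the reporter's broker.xml are not included in this report.
> {code:xml}
>       <address-settings>
>          <!-- assumed match: the "orders" address used by the client code -->
>          <address-setting match="orders">
>             <!-- 0 = redistribute immediately once the local queue has no consumers -->
>             <redistribution-delay>0</redistribution-delay>
>          </address-setting>
>       </address-settings>
> {code}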



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
