activemq-dev mailing list archives

From "Arthur Naseef (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AMQ-3166) client calls to createProducer() and send() successful even though BrokerFilter methods throw exceptions
Date Mon, 03 Mar 2014 04:57:22 GMT

    [ https://issues.apache.org/jira/browse/AMQ-3166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13917743#comment-13917743 ]

Arthur Naseef commented on AMQ-3166:
------------------------------------

I tried forcing a rollback on the transaction. That does cause the commit to fail, but the
resulting exception seems misleading to me, since it states that the transaction has not been started:

{noformat}
javax.jms.JMSException: Transaction 'TX:ID:Arthur-Naseefs-MacBook-Pro.local-58321-1393821595757-5:1:1' has not been started.
{noformat}

Here's my patch:
{noformat}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/mai
index 65d044b..fc4a674 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -306,6 +306,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
                 responseRequired = false;
             }
 
+            // AMQ-3166: remember and propagate transaction failures.
+            serviceTransactionExceptions(command, e);
+
             if (responseRequired) {
                 response = new ExceptionResponse(e);
             } else {
@@ -1608,4 +1611,44 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
     public WireFormatInfo getRemoteWireFormatInfo() {
         return wireFormatInfo;
     }
+
+    protected void  serviceTransactionExceptions(Command command, Throwable thrown) {
+        if ( command instanceof Message ) {
+            Message msg = (Message) command;
+
+            if ( msg.isInTransaction() ) {
+                LOG.debug("marking transaction {} failed on message {} due to {}", msg.getTransactionId(),
+                          msg.getMessageId(), thrown.getMessage());
+                this.markTransactionFailure(msg.getConnection().getConnectionInfo().getConnectionId(),
+                                            msg.getTransactionId(), thrown);
+            }
+        } else if ( command instanceof MessageAck ) {
+            MessageAck ack = (MessageAck) command;
+
+            if ( ack.isInTransaction() ) {
+                LOG.debug("marking transaction {} failed on ack for messages {}..{} due to {}", ack.getTransactionId(),
+                          ack.getFirstMessageId(), ack.getLastMessageId(), thrown.getMessage());
+                this.markTransactionFailure(ack.getConsumerId().getParentId().getParentId(), ack.getTransactionId(),
+                                            thrown);
+            }
+        }
+
+        // TBD: any other commands?  MessagePull doesn't contain transaction info, and I suspect that it doesn't need
+        // it since it must be synchronous.
+    }
+
+    protected void  markTransactionFailure(ConnectionId connId, TransactionId tId, Throwable thrown) {
+        TransactionInfo rollback = new TransactionInfo();
+        rollback.setConnectionId(connId);
+        rollback.setTransactionId(tId);
+        rollback.setType(TransactionInfo.ROLLBACK);
+
+        try {
+            // Reuse the existing rollback logic.
+            this.processRollbackTransaction(rollback);
+        } catch ( Throwable err ) {
+            LOG.warn("failed to force rollback on transaction {} after initial failure {}", tId, thrown.getMessage(),
+                     err);
+        }
+    }
 }
{noformat}

I'm looking at adding a "failed" status to TransactionState - that appears to be a feasible
approach, and seems more correct as well.
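
A minimal sketch of what such a "failed" status could look like, under stated assumptions: every name here (`FailedTransactionSketch`, `TxState`, `markFailed`, `recordFailure`) is hypothetical and none of it is the actual `TransactionState` API. The point is only that remembering the first failure lets the commit fail with a message naming the real cause instead of "has not been started".

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class FailedTransactionSketch {

    /** Hypothetical per-transaction record; NOT ActiveMQ's TransactionState. */
    public static final class TxState {
        private Throwable failure; // stays null while the transaction is healthy

        public synchronized void markFailed(Throwable cause) {
            if (failure == null) {
                failure = cause; // remember only the first failure
            }
        }

        public synchronized Throwable getFailure() {
            return failure;
        }
    }

    private final Map<String, TxState> transactions = new ConcurrentHashMap<>();

    public void begin(String txId) {
        transactions.put(txId, new TxState());
    }

    /** Would be called at the point where the patch calls markTransactionFailure(...). */
    public void recordFailure(String txId, Throwable cause) {
        TxState state = transactions.get(txId);
        if (state != null) {
            state.markFailed(cause);
        }
    }

    /** Commit fails with the recorded cause instead of a "not started" error. */
    public void commit(String txId) {
        TxState state = transactions.remove(txId);
        if (state == null) {
            throw new IllegalStateException("Transaction '" + txId + "' has not been started.");
        }
        Throwable failure = state.getFailure();
        if (failure != null) {
            throw new IllegalStateException("Transaction '" + txId
                    + "' cannot be committed; an earlier operation failed: "
                    + failure.getMessage(), failure);
        }
    }

    public static void main(String[] args) {
        FailedTransactionSketch broker = new FailedTransactionSketch();
        broker.begin("TX:1");
        broker.recordFailure("TX:1", new SecurityException("not authorized (illustrative message)"));
        try {
            broker.commit("TX:1");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Unlike the forced rollback in the patch, the transaction record survives until commit time, so the commit path can still report why it is failing.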

> client calls to createProducer() and send() successful even though BrokerFilter methods throw exceptions
> --------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3166
>                 URL: https://issues.apache.org/jira/browse/AMQ-3166
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, JMS client
>    Affects Versions: 5.4.2, 5.5.0
>            Reporter: Arthur Naseef
>            Assignee: Arthur Naseef
>         Attachments: AMQ3166Test.java, AMQ3166Test.java, FailedTransactionTracking.java, FailedTransactionTrackingPlugin.java
>
>
> Client calls to createProducer() always return without an error even though a BrokerFilter's
> addProducer() method throws an exception on the request. In contrast, createConsumer() throws
> an exception, as expected, when BrokerFilter's addConsumer() throws an exception.
> Clients using transacted sessions always return successfully from send() when a BrokerFilter's
> send() method throws an exception.
> Below is a broker configuration file using <authorizationPlugin> to illustrate
> the problem.
> To reproduce the problem with this configuration, a test client only needs to connect
> with user = "user" and password = "password", and then attempt to produce messages with a
> transacted session to any queue other than ABC (e.g. DEF).
> Tracing the cause of the issue has led to finding that the client code for creating
> a producer uses an Async send for the producer information.  The analogous code for consumers
> uses a Sync send.
> I will work on a patch.  It would be very helpful to have feedback on the operation of
> the bus and the best way to resolve this problem.  Based on my research, it seems that createProducer()
> should be using a Sync send in place of the Async one.  Not yet sure about send().  Another
> possibility is to move the security operations to earlier in the internal broker flow.
> === SAMPLE BROKER XML ===
> <beans
>   xmlns="http://www.springframework.org/schema/beans"
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>   xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>   http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
>     <broker xmlns="http://activemq.apache.org/schema/core"
>             brokerName="localhost"
>             dataDirectory="${activemq.base}/data"
>             destroyApplicationContextOnStop="true" >
>         <persistenceAdapter>
>             <kahaDB directory="${activemq.base}/data/kahadb"/>
>         </persistenceAdapter>
>         
>         <plugins>
>           <simpleAuthenticationPlugin anonymousAccessAllowed="true">
>               <users>
>                   <authenticationUser username="user" password="password"
>                       groups="users"/>
>               </users>
>           </simpleAuthenticationPlugin>
>           <authorizationPlugin>
>               <map>
>                   <authorizationMap>
>                     <authorizationEntries>
>                       <authorizationEntry queue="ABC" read="users" write="users" admin="users" />
>                       <authorizationEntry topic="ActiveMQ.Advisory.>" read="users" write="users" admin="users" />
>                     </authorizationEntries>
>                   </authorizationMap>
>               </map>
>           </authorizationPlugin>
>         </plugins>
>         <transportConnectors>
>             <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
>         </transportConnectors>
>     </broker>
> </beans>
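
The Async versus Sync distinction in the quoted description can be illustrated with a small self-contained simulation. This is only a sketch under stated assumptions: it does not use the ActiveMQ client, the executor stands in for the transport, and `brokerAddProducer`, `createProducerAsync`, and `createProducerSync` are hypothetical names. The authorization rule mirrors the sample configuration, where only queue ABC is writable.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SyncVsAsyncSendSketch {

    /** Hypothetical broker-side check: only queue ABC may be written to. */
    static void brokerAddProducer(String queue) {
        if (!"ABC".equals(queue)) {
            throw new SecurityException("not authorized to write to queue " + queue + " (illustrative message)");
        }
    }

    /** Fire-and-forget: the request is handed off and its outcome is never inspected. */
    public static boolean createProducerAsync(ExecutorService transport, String queue) {
        transport.submit(() -> brokerAddProducer(queue));
        return true; // the caller always sees success, even if the broker rejected the request
    }

    /** Request/response: the caller waits for the reply and rethrows the broker's exception. */
    public static boolean createProducerSync(ExecutorService transport, String queue) {
        Future<?> reply = transport.submit(() -> brokerAddProducer(queue));
        try {
            reply.get();
            return true;
        } catch (ExecutionException e) {
            throw new SecurityException(e.getCause().getMessage(), e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for broker reply", e);
        }
    }

    public static void main(String[] args) {
        ExecutorService transport = Executors.newSingleThreadExecutor();
        try {
            System.out.println("async, queue DEF: " + createProducerAsync(transport, "DEF"));
            try {
                createProducerSync(transport, "DEF");
                System.out.println("sync, queue DEF: created");
            } catch (SecurityException e) {
                System.out.println("sync, queue DEF rejected: " + e.getMessage());
            }
        } finally {
            transport.shutdown();
        }
    }
}
```

In this simulation the async call reports success for queue DEF while the sync call surfaces the rejection, which is the asymmetry the description reports between createProducer() and createConsumer().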



--
This message was sent by Atlassian JIRA
(v6.2#6252)
