activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Fugitt, Jesse" <jfug...@informatica.com>
Subject problem when MessageStore updateMessage function throws an exception
Date Thu, 04 Sep 2014 20:26:56 GMT
When using the new option persisteJMSRedelivered (to ensure the redelivered flag is set correctly
on potentially duplicate messages that are re-dispatched by the broker even after a restart):
<policyEntry queue=">" persistJMSRedelivered="true"></policyEntry>

there is still a case where a message can be re-sent and will not be marked as redelivered.
 I can open a JIRA and probably create a unit test but it is pretty clear from the pasted
code below where the exception is getting swallowed.  Would the preferred fix be to update
the broker interface and make preProcessDispatch throw an IOException or would it be better
to add a new field to the MessageDispatch class to indicate an exception occurred and leave
the interface alone?

The specific case when this can happen is when a MessageStore returns an exception during
the updateMessage call, which then gets swallowed (and an ERROR logged) and still allows the
message to be dispatched to the consumer.  The exception seems like it should actually propagate
out of the preProcessDispatch function in RegionBroker as shown below, but this would require
changing the Broker interface and making the void preProcessDispatch function throw an IOException.

//RegionBroker.java
    @Override
    public void preProcessDispatch(MessageDispatch messageDispatch) {
        Message message = messageDispatch.getMessage();
        if (message != null) {
            long endTime = System.currentTimeMillis();
            message.setBrokerOutTime(endTime);
            if (getBrokerService().isEnableStatistics()) {
                long totalTime = endTime - message.getBrokerInTime();
                ((Destination) message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime);
            }
            if (((BaseDestination) message.getRegionDestination()).isPersistJMSRedelivered()
&& !message.isRedelivered() && message.isPersistent()) {
                final int originalValue = message.getRedeliveryCounter();
                message.incrementRedeliveryCounter();
                try {
                    ((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message);
                } catch (IOException error) {
                    LOG.error("Failed to persist JMSRedeliveryFlag on {} in {}", message.getMessageId(),
message.getDestination(), error);
                } finally {
                    message.setRedeliveryCounter(originalValue);
                }
            }
        }
    }

//TransportConnection.java
    protected void processDispatch(Command command) throws IOException {
        MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ?
command : null);
        try {
            if (!stopping.get()) {
                if (messageDispatch != null) {
                    broker.preProcessDispatch(messageDispatch);
                }
                dispatch(command);  //This code will dispatch the message whether or not the
updateMessage function actually worked
            }
        ...



I wanted to get input on this issue before proceeding further with it.

Thanks,
Jesse

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message