activemq-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (ARTEMIS-883) Fix OpenWire ProducerFlowControlTest Regression
Date Thu, 15 Dec 2016 00:57:58 GMT

    [ https://issues.apache.org/jira/browse/ARTEMIS-883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15749978#comment-15749978
] 

ASF GitHub Bot commented on ARTEMIS-883:
----------------------------------------

Github user gaohoward commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/907#discussion_r92524664
  
    --- Diff: artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
---
    @@ -302,108 +301,91 @@ public void send(final ProducerInfo producerInfo,
              originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(),
messageSend.getMessageId().toString());
           }
     
    -      Runnable runnable;
    -
    -      if (sendProducerAck) {
    -         runnable = new Runnable() {
    -            @Override
    -            public void run() {
    -               try {
    -                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    -                  connection.dispatchSync(ack);
    -               } catch (Exception e) {
    -                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    -                  connection.sendException(e);
    -               }
    -
    -            }
    -         };
    -      } else {
    -         final Connection transportConnection = connection.getTransportConnection();
    -
    -         if (transportConnection == null) {
    -            // I don't think this could happen, but just in case, avoiding races
    -            runnable = null;
    -         } else {
    -            runnable = new Runnable() {
    -               @Override
    -               public void run() {
    -                  transportConnection.setAutoRead(true);
    -               }
    -            };
    -         }
    -      }
    -
    -      internalSend(actualDestinations, originalCoreMsg, runnable);
    -   }
    +      boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
     
    -   private void internalSend(ActiveMQDestination[] actualDestinations,
    -                             ServerMessage originalCoreMsg,
    -                             final Runnable onComplete) throws Exception {
    +      final AtomicInteger count = new AtomicInteger(actualDestinations.length);
     
    -      Runnable runToUse;
    +      final Exception[] anyException = new Exception[] {null};
     
    -      if (actualDestinations.length <= 1 || onComplete == null) {
    -         // if onComplete is null, this will be null ;)
    -         runToUse = onComplete;
    -      } else {
    -         final AtomicInteger count = new AtomicInteger(actualDestinations.length);
    -         runToUse = new Runnable() {
    -            @Override
    -            public void run() {
    -               if (count.decrementAndGet() == 0) {
    -                  onComplete.run();
    -               }
    -            }
    -         };
    +      if (shouldBlockProducer) {
    +         connection.getContext().setDontSendReponse(true);
           }
     
    -      SimpleString[] addresses = new SimpleString[actualDestinations.length];
    -      PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
    -
    -      // We fillup addresses, pagingStores and we will throw failure if that's the case
           for (int i = 0; i < actualDestinations.length; i++) {
              ActiveMQDestination dest = actualDestinations[i];
    -         addresses[i] = new SimpleString(dest.getPhysicalName());
    -         pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
    -         if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL
&& pagingStores[i].isFull()) {
    -            throw new ResourceAllocationException("Queue is full");
    -         }
    -      }
    -
    -      for (int i = 0; i < actualDestinations.length; i++) {
    +         SimpleString address = new SimpleString(dest.getPhysicalName());
    +         PagingStore store = server.getPagingManager().getPageStore(address);
     
              ServerMessage coreMsg = originalCoreMsg.copy();
    +         coreMsg.setAddress(address);
     
    -         coreMsg.setAddress(addresses[i]);
    -
    -         PagingStore store = pagingStores[i];
    -
    -         if (store.isFull()) {
    -            connection.getTransportConnection().setAutoRead(false);
    -         }
    +         if (shouldBlockProducer) {
     
    -         if (actualDestinations[i].isQueue()) {
    -            checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()),
actualDestinations[i].isTemporary());
    -         }
    +            if (!store.checkMemory(() -> {
    +               try {
    +                  RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
     
    -         if (actualDestinations[i].isQueue()) {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE,
RoutingType.ANYCAST.getType());
    +                  if (result == RoutingStatus.NO_BINDINGS && dest.isQueue())
{
    +                     throw new InvalidDestinationException("Cannot publish to a non-existent
Destination: " + dest);
    +                  }
    +               } catch (Exception e) {
    +                  if (anyException[0] == null) {
    +                     anyException[0] = e;
    +                  }
    +               }
    +               if (count.decrementAndGet() == 0) {
    +                  if (anyException[0] != null) {
    +                     ActiveMQServerLogger.LOGGER.warn(anyException[0].getMessage(), anyException[0]);
    +                     connection.sendException(anyException[0]);
    +                  } else {
    +                     if (sendProducerAck) {
    +                        try {
    +                           ProducerAck ack = new ProducerAck(producerInfo.getProducerId(),
messageSend.getSize());
    +                           connection.dispatchAsync(ack);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     } else {
    +                        try {
    +                           //maybe use this: connection.getContext().setDontSendReponse(false);
    +                           Response response = new Response();
    +                           response.setCorrelationId(messageSend.getCommandId());
    +                           connection.dispatchAsync(response);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     }
    +                  }
    +               }
    +            })) {
    +               this.connection.getContext().setDontSendReponse(false);
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
              } else {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE,
RoutingType.MULTICAST.getType());
    -         }
    -         RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
    +            //non-persistent messages goes here, by default we block on the calling thread
    +            //which means the whole connection is blocked when address is full.
    +            final CountDownLatch latch = new CountDownLatch(1);
    +            if (!store.checkMemory(() -> {
    +               latch.countDown();
    +            })) {
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
     
    -         if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue())
{
    -            throw new InvalidDestinationException("Cannot publish to a non-existent Destination:
" + actualDestinations[i]);
    -         }
    +            latch.await();
    --- End diff --
    
    Thanks guys I'll come back to it later today. 



> Fix OpenWire ProducerFlowControlTest Regression
> -----------------------------------------------
>
>                 Key: ARTEMIS-883
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-883
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>          Components: OpenWire
>    Affects Versions: 1.5.1
>            Reporter: Howard Gao
>            Assignee: Howard Gao
>             Fix For: 1.5.next
>
>
> ProducerFlowControlTest fails due to regression



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message