activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuconic <...@git.apache.org>
Subject [GitHub] activemq-artemis pull request #907: ARTEMIS-883 Fix OpenWire ProducerFlowCon...
Date Tue, 13 Dec 2016 00:36:49 GMT
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/907#discussion_r92074326
  
    --- Diff: artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
---
    @@ -302,108 +301,91 @@ public void send(final ProducerInfo producerInfo,
              originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(),
messageSend.getMessageId().toString());
           }
     
    -      Runnable runnable;
    -
    -      if (sendProducerAck) {
    -         runnable = new Runnable() {
    -            @Override
    -            public void run() {
    -               try {
    -                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    -                  connection.dispatchSync(ack);
    -               } catch (Exception e) {
    -                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    -                  connection.sendException(e);
    -               }
    -
    -            }
    -         };
    -      } else {
    -         final Connection transportConnection = connection.getTransportConnection();
    -
    -         if (transportConnection == null) {
    -            // I don't think this could happen, but just in case, avoiding races
    -            runnable = null;
    -         } else {
    -            runnable = new Runnable() {
    -               @Override
    -               public void run() {
    -                  transportConnection.setAutoRead(true);
    -               }
    -            };
    -         }
    -      }
    -
    -      internalSend(actualDestinations, originalCoreMsg, runnable);
    -   }
    +      boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
     
    -   private void internalSend(ActiveMQDestination[] actualDestinations,
    -                             ServerMessage originalCoreMsg,
    -                             final Runnable onComplete) throws Exception {
    +      final AtomicInteger count = new AtomicInteger(actualDestinations.length);
     
    -      Runnable runToUse;
    +      final Exception[] anyException = new Exception[] {null};
     
    -      if (actualDestinations.length <= 1 || onComplete == null) {
    -         // if onComplete is null, this will be null ;)
    -         runToUse = onComplete;
    -      } else {
    -         final AtomicInteger count = new AtomicInteger(actualDestinations.length);
    -         runToUse = new Runnable() {
    -            @Override
    -            public void run() {
    -               if (count.decrementAndGet() == 0) {
    -                  onComplete.run();
    -               }
    -            }
    -         };
    +      if (shouldBlockProducer) {
    +         connection.getContext().setDontSendReponse(true);
           }
     
    -      SimpleString[] addresses = new SimpleString[actualDestinations.length];
    -      PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
    -
    -      // We fillup addresses, pagingStores and we will throw failure if that's the case
           for (int i = 0; i < actualDestinations.length; i++) {
              ActiveMQDestination dest = actualDestinations[i];
    -         addresses[i] = new SimpleString(dest.getPhysicalName());
    -         pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
    -         if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL
&& pagingStores[i].isFull()) {
    -            throw new ResourceAllocationException("Queue is full");
    -         }
    -      }
    -
    -      for (int i = 0; i < actualDestinations.length; i++) {
    +         SimpleString address = new SimpleString(dest.getPhysicalName());
    +         PagingStore store = server.getPagingManager().getPageStore(address);
     
              ServerMessage coreMsg = originalCoreMsg.copy();
    +         coreMsg.setAddress(address);
     
    -         coreMsg.setAddress(addresses[i]);
    -
    -         PagingStore store = pagingStores[i];
    -
    -         if (store.isFull()) {
    -            connection.getTransportConnection().setAutoRead(false);
    -         }
    +         if (shouldBlockProducer) {
     
    -         if (actualDestinations[i].isQueue()) {
    -            checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()),
actualDestinations[i].isTemporary());
    -         }
    +            if (!store.checkMemory(() -> {
    +               try {
    +                  RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
     
    -         if (actualDestinations[i].isQueue()) {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE,
RoutingType.ANYCAST.getType());
    +                  if (result == RoutingStatus.NO_BINDINGS && dest.isQueue())
{
    +                     throw new InvalidDestinationException("Cannot publish to a non-existent
Destination: " + dest);
    +                  }
    +               } catch (Exception e) {
    +                  if (anyException[0] == null) {
    +                     anyException[0] = e;
    +                  }
    +               }
    +               if (count.decrementAndGet() == 0) {
    +                  if (anyException[0] != null) {
    +                     ActiveMQServerLogger.LOGGER.warn(anyException[0].getMessage(), anyException[0]);
    +                     connection.sendException(anyException[0]);
    +                  } else {
    +                     if (sendProducerAck) {
    +                        try {
    +                           ProducerAck ack = new ProducerAck(producerInfo.getProducerId(),
messageSend.getSize());
    +                           connection.dispatchAsync(ack);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     } else {
    +                        try {
    +                           //maybe use this: connection.getContext().setDontSendReponse(false);
    +                           Response response = new Response();
    +                           response.setCorrelationId(messageSend.getCommandId());
    +                           connection.dispatchAsync(response);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     }
    +                  }
    +               }
    +            })) {
    +               this.connection.getContext().setDontSendReponse(false);
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
              } else {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE,
RoutingType.MULTICAST.getType());
    -         }
    -         RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
    +            //non-persistent messages goes here, by default we block on the calling thread
    +            //which means the whole connection is blocked when address is full.
    +            final CountDownLatch latch = new CountDownLatch(1);
    +            if (!store.checkMemory(() -> {
    +               latch.countDown();
    +            })) {
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
     
    -         if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue())
{
    -            throw new InvalidDestinationException("Cannot publish to a non-existent Destination:
" + actualDestinations[i]);
    -         }
    +            latch.await();
    --- End diff --
    
    I strongly disagree.. we are not supposed to block...
    
    
    when you block a Thread, you are also blocking all the sessions on the connection.. so
just use setAutoRead(false) instead of blocking a thread.
    
    We have a neat non-blocking architecture in place. We have to avoid these kind of things.
Especially that the producer might be blocked for a long time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message