Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 64734200BEA for ; Tue, 13 Dec 2016 01:19:45 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 5B14D160B2A; Tue, 13 Dec 2016 00:19:45 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7AE14160B22 for ; Tue, 13 Dec 2016 01:19:44 +0100 (CET) Received: (qmail 12128 invoked by uid 500); 13 Dec 2016 00:19:43 -0000 Mailing-List: contact dev-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list dev@activemq.apache.org Received: (qmail 12113 invoked by uid 99); 13 Dec 2016 00:19:43 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Dec 2016 00:19:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2E8EAE09B3; Tue, 13 Dec 2016 00:19:43 +0000 (UTC) From: gaohoward To: dev@activemq.apache.org Reply-To: dev@activemq.apache.org References: In-Reply-To: Subject: [GitHub] activemq-artemis pull request #907: ARTEMIS-883 Fix OpenWire ProducerFlowCon... 
Content-Type: text/plain Message-Id: <20161213001943.2E8EAE09B3@git1-us-west.apache.org> Date: Tue, 13 Dec 2016 00:19:43 +0000 (UTC) archived-at: Tue, 13 Dec 2016 00:19:45 -0000 Github user gaohoward commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/907#discussion_r92072316 --- Diff: artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java --- @@ -302,108 +301,91 @@ public void send(final ProducerInfo producerInfo, originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString()); } - Runnable runnable; - - if (sendProducerAck) { - runnable = new Runnable() { - @Override - public void run() { - try { - ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize()); - connection.dispatchSync(ack); - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); - connection.sendException(e); - } - - } - }; - } else { - final Connection transportConnection = connection.getTransportConnection(); - - if (transportConnection == null) { - // I don't think this could happen, but just in case, avoiding races - runnable = null; - } else { - runnable = new Runnable() { - @Override - public void run() { - transportConnection.setAutoRead(true); - } - }; - } - } - - internalSend(actualDestinations, originalCoreMsg, runnable); - } + boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired(); - private void internalSend(ActiveMQDestination[] actualDestinations, - ServerMessage originalCoreMsg, - final Runnable onComplete) throws Exception { + final AtomicInteger count = new AtomicInteger(actualDestinations.length); - Runnable runToUse; + final Exception[] anyException = new Exception[] {null}; - if (actualDestinations.length <= 1 || onComplete == null) { - // if onComplete is null, this will be null ;) 
- runToUse = onComplete; - } else { - final AtomicInteger count = new AtomicInteger(actualDestinations.length); - runToUse = new Runnable() { - @Override - public void run() { - if (count.decrementAndGet() == 0) { - onComplete.run(); - } - } - }; + if (shouldBlockProducer) { + connection.getContext().setDontSendReponse(true); } - SimpleString[] addresses = new SimpleString[actualDestinations.length]; - PagingStore[] pagingStores = new PagingStore[actualDestinations.length]; - - // We fillup addresses, pagingStores and we will throw failure if that's the case for (int i = 0; i < actualDestinations.length; i++) { ActiveMQDestination dest = actualDestinations[i]; - addresses[i] = new SimpleString(dest.getPhysicalName()); - pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]); - if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) { - throw new ResourceAllocationException("Queue is full"); - } - } - - for (int i = 0; i < actualDestinations.length; i++) { + SimpleString address = new SimpleString(dest.getPhysicalName()); + PagingStore store = server.getPagingManager().getPageStore(address); ServerMessage coreMsg = originalCoreMsg.copy(); + coreMsg.setAddress(address); - coreMsg.setAddress(addresses[i]); - - PagingStore store = pagingStores[i]; - - if (store.isFull()) { - connection.getTransportConnection().setAutoRead(false); - } + if (shouldBlockProducer) { - if (actualDestinations[i].isQueue()) { - checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary()); - } + if (!store.checkMemory(() -> { + try { + RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary()); - if (actualDestinations[i].isQueue()) { - coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType()); + if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) { + throw new 
InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest); + } + } catch (Exception e) { + if (anyException[0] == null) { + anyException[0] = e; + } + } + if (count.decrementAndGet() == 0) { + if (anyException[0] != null) { + ActiveMQServerLogger.LOGGER.warn(anyException[0].getMessage(), anyException[0]); + connection.sendException(anyException[0]); + } else { + if (sendProducerAck) { + try { + ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize()); + connection.dispatchAsync(ack); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + connection.sendException(e); + } + } else { + try { + //maybe use this: connection.getContext().setDontSendReponse(false); + Response response = new Response(); + response.setCorrelationId(messageSend.getCommandId()); + connection.dispatchAsync(response); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + connection.sendException(e); + } + } + } + } + })) { + this.connection.getContext().setDontSendReponse(false); + throw new ResourceAllocationException("Queue is full " + address); + } } else { - coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType()); - } - RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary()); + //non-persistent messages goes here, by default we block on the calling thread + //which means the whole connection is blocked when address is full. 
+ final CountDownLatch latch = new CountDownLatch(1); + if (!store.checkMemory(() -> { + latch.countDown(); + })) { + throw new ResourceAllocationException("Queue is full " + address); + } - if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) { - throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]); - } + latch.await(); --- End diff -- Another issue with setAutoRead() that I forgot to mention is that the client producer seems to have some TCP buffer when auto read is false. So the producer always seems to be able to write some more bytes into the TCP layer without knowing that they are actually staying in the client-side buffer. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---