Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 34505 invoked from network); 8 Jul 2010 14:24:58 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 8 Jul 2010 14:24:58 -0000 Received: (qmail 10860 invoked by uid 500); 8 Jul 2010 14:24:58 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 10840 invoked by uid 500); 8 Jul 2010 14:24:58 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 10833 invoked by uid 99); 8 Jul 2010 14:24:58 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Jul 2010 14:24:58 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED,T_FRT_BELOW2 X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Jul 2010 14:24:49 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id DADB723888D1; Thu, 8 Jul 2010 14:23:23 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r961783 [1/2] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/command/ main/java/org/apache/activemq/ope... 
Date: Thu, 08 Jul 2010 14:23:23 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100708142323.DADB723888D1@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gtully Date: Thu Jul 8 14:23:21 2010 New Revision: 961783 URL: http://svn.apache.org/viewvc?rev=961783&view=rev Log: resolve https://issues.apache.org/activemq/browse/AMQ-2800, https://issues.apache.org/activemq/browse/AMQ-2542, https://issues.apache.org/activemq/browse/AMQ-2803 - implement duplicate checker in transport for a failover: reconnect, uses last seqid from store. implemented for kahaDB and JDBC Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java (with props) Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/Visitor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java activemq/trunk/activemq-core/src/main/proto/journal-data.proto activemq/trunk/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreOrderTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/BitArrayBinTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java Thu Jul 8 14:23:21 2010 @@ -16,35 +16,23 @@ */ package org.apache.activemq; -import javax.jms.JMSException; -import javax.jms.Message; - -import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.command.MessageId; -import org.apache.activemq.command.ProducerId; -import org.apache.activemq.util.BitArrayBin; -import org.apache.activemq.util.IdGenerator; -import org.apache.activemq.util.LRUCache; /** * Provides basic audit functions for Messages * * @version $Revision: 1.1.1.1 $ */ -public class ActiveMQMessageAudit { +public class ActiveMQMessageAudit extends ActiveMQMessageAuditNoSync { - public static final int DEFAULT_WINDOW_SIZE = 2048; - public static final int MAXIMUM_PRODUCER_COUNT = 64; - private int auditDepth; - private int maximumNumberOfProducersToTrack; - private LRUCache map; + private static final long serialVersionUID = 1L; /** * Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack = * 64 */ public ActiveMQMessageAudit() { - this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT); + super(); } /** @@ 
-55,198 +43,41 @@ public class ActiveMQMessageAudit { * the system */ public ActiveMQMessageAudit(int auditDepth, final int maximumNumberOfProducersToTrack) { - this.auditDepth = auditDepth; - this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack; - this.map = new LRUCache(0, maximumNumberOfProducersToTrack, 0.75f, true); + super(auditDepth, maximumNumberOfProducersToTrack); } - /** - * @return the auditDepth - */ - public int getAuditDepth() { - return auditDepth; - } - - /** - * @param auditDepth the auditDepth to set - */ - public void setAuditDepth(int auditDepth) { - this.auditDepth = auditDepth; - } - - /** - * @return the maximumNumberOfProducersToTrack - */ - public int getMaximumNumberOfProducersToTrack() { - return maximumNumberOfProducersToTrack; - } - - /** - * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set - */ - public void setMaximumNumberOfProducersToTrack( - int maximumNumberOfProducersToTrack) { - this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack; - this.map.setMaxCacheSize(maximumNumberOfProducersToTrack); - } - - /** - * Checks if this message has been seen before - * - * @param message - * @return true if the message is a duplicate - * @throws JMSException - */ - public boolean isDuplicate(Message message) throws JMSException { - return isDuplicate(message.getJMSMessageID()); - } - - /** - * checks whether this messageId has been seen before and adds this - * messageId to the list - * - * @param id - * @return true if the message is a duplicate - */ - public synchronized boolean isDuplicate(String id) { - boolean answer = false; - String seed = IdGenerator.getSeedFromId(id); - if (seed != null) { - BitArrayBin bab = map.get(seed); - if (bab == null) { - bab = new BitArrayBin(auditDepth); - map.put(seed, bab); - } - long index = IdGenerator.getSequenceFromId(id); - if (index >= 0) { - answer = bab.setBit(index, true); - } + @Override + public boolean isDuplicate(String id) { 
+ synchronized (this) { + return super.isDuplicate(id); } - return answer; } - /** - * Checks if this message has been seen before - * - * @param message - * @return true if the message is a duplicate - */ - public boolean isDuplicate(final MessageReference message) { - MessageId id = message.getMessageId(); - return isDuplicate(id); - } - - /** - * Checks if this messageId has been seen before - * - * @param id - * @return true if the message is a duplicate - */ - public synchronized boolean isDuplicate(final MessageId id) { - boolean answer = false; - - if (id != null) { - ProducerId pid = id.getProducerId(); - if (pid != null) { - BitArrayBin bab = map.get(pid); - if (bab == null) { - bab = new BitArrayBin(auditDepth); - map.put(pid, bab); - } - answer = bab.setBit(id.getProducerSequenceId(), true); - } + @Override + public boolean isDuplicate(final MessageId id) { + synchronized (this) { + return super.isDuplicate(id); } - return answer; } - /** - * mark this message as being received - * - * @param message - */ - public void rollback(final MessageReference message) { - MessageId id = message.getMessageId(); - rollback(id); - } - - /** - * mark this message as being received - * - * @param id - */ - public synchronized void rollback(final MessageId id) { - if (id != null) { - ProducerId pid = id.getProducerId(); - if (pid != null) { - BitArrayBin bab = map.get(pid); - if (bab != null) { - bab.setBit(id.getProducerSequenceId(), false); - } - } + @Override + public void rollback(final MessageId id) { + synchronized (this) { + super.rollback(id); } } - /** - * Check the message is in order - * @param msg - * @return - * @throws JMSException - */ - public boolean isInOrder(Message msg) throws JMSException { - return isInOrder(msg.getJMSMessageID()); - } - - /** - * Check the message id is in order - * @param id - * @return - */ - public synchronized boolean isInOrder(final String id) { - boolean answer = true; - - if (id != null) { - String seed = 
IdGenerator.getSeedFromId(id); - if (seed != null) { - BitArrayBin bab = map.get(seed); - if (bab != null) { - long index = IdGenerator.getSequenceFromId(id); - answer = bab.isInOrder(index); - } - - } + @Override + public boolean isInOrder(final String id) { + synchronized (this) { + return super.isInOrder(id); } - return answer; } - /** - * Check the MessageId is in order - * @param message - * @return - */ - public synchronized boolean isInOrder(final MessageReference message) { - return isInOrder(message.getMessageId()); - } - - /** - * Check the MessageId is in order - * @param id - * @return - */ - public synchronized boolean isInOrder(final MessageId id) { - boolean answer = false; - - if (id != null) { - ProducerId pid = id.getProducerId(); - if (pid != null) { - BitArrayBin bab = map.get(pid); - if (bab == null) { - bab = new BitArrayBin(auditDepth); - map.put(pid, bab); - } - answer = bab.isInOrder(id.getProducerSequenceId()); - - } + @Override + public boolean isInOrder(final MessageId id) { + synchronized (this) { + return isInOrder(id); } - return answer; } } Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java?rev=961783&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java Thu Jul 8 14:23:21 2010 @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.io.Serializable; + +import javax.jms.JMSException; +import javax.jms.Message; + +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.util.BitArrayBin; +import org.apache.activemq.util.IdGenerator; +import org.apache.activemq.util.LRUCache; + +/** + * Provides basic audit functions for Messages without sync + * + * @version $Revision$ + */ +public class ActiveMQMessageAuditNoSync implements Serializable { + + private static final long serialVersionUID = 1L; + + public static final int DEFAULT_WINDOW_SIZE = 2048; + public static final int MAXIMUM_PRODUCER_COUNT = 64; + private int auditDepth; + private int maximumNumberOfProducersToTrack; + private LRUCache map; + + /** + * Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack = + * 64 + */ + public ActiveMQMessageAuditNoSync() { + this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT); + } + + /** + * Construct a MessageAudit + * + * @param auditDepth range of ids to track + * @param maximumNumberOfProducersToTrack number of producers expected in + * the system + */ + public ActiveMQMessageAuditNoSync(int auditDepth, final int maximumNumberOfProducersToTrack) { + this.auditDepth = auditDepth; + 
this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack; + this.map = new LRUCache(0, maximumNumberOfProducersToTrack, 0.75f, true); + } + + /** + * @return the auditDepth + */ + public int getAuditDepth() { + return auditDepth; + } + + /** + * @param auditDepth the auditDepth to set + */ + public void setAuditDepth(int auditDepth) { + this.auditDepth = auditDepth; + } + + /** + * @return the maximumNumberOfProducersToTrack + */ + public int getMaximumNumberOfProducersToTrack() { + return maximumNumberOfProducersToTrack; + } + + /** + * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set + */ + public void setMaximumNumberOfProducersToTrack( + int maximumNumberOfProducersToTrack) { + this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack; + this.map.setMaxCacheSize(maximumNumberOfProducersToTrack); + } + + /** + * Checks if this message has been seen before + * + * @param message + * @return true if the message is a duplicate + * @throws JMSException + */ + public boolean isDuplicate(Message message) throws JMSException { + return isDuplicate(message.getJMSMessageID()); + } + + /** + * checks whether this messageId has been seen before and adds this + * messageId to the list + * + * @param id + * @return true if the message is a duplicate + */ + public boolean isDuplicate(String id) { + boolean answer = false; + String seed = IdGenerator.getSeedFromId(id); + if (seed != null) { + BitArrayBin bab = map.get(seed); + if (bab == null) { + bab = new BitArrayBin(auditDepth); + map.put(seed, bab); + } + long index = IdGenerator.getSequenceFromId(id); + if (index >= 0) { + answer = bab.setBit(index, true); + } + } + return answer; + } + + /** + * Checks if this message has been seen before + * + * @param message + * @return true if the message is a duplicate + */ + public boolean isDuplicate(final MessageReference message) { + MessageId id = message.getMessageId(); + return isDuplicate(id); + } + + /** + * Checks 
if this messageId has been seen before + * + * @param id + * @return true if the message is a duplicate + */ + public boolean isDuplicate(final MessageId id) { + boolean answer = false; + + if (id != null) { + ProducerId pid = id.getProducerId(); + if (pid != null) { + BitArrayBin bab = map.get(pid); + if (bab == null) { + bab = new BitArrayBin(auditDepth); + map.put(pid, bab); + } + answer = bab.setBit(id.getProducerSequenceId(), true); + } + } + return answer; + } + + /** + * mark this message as being received + * + * @param message + */ + public void rollback(final MessageReference message) { + MessageId id = message.getMessageId(); + rollback(id); + } + + /** + * mark this message as being received + * + * @param id + */ + public void rollback(final MessageId id) { + if (id != null) { + ProducerId pid = id.getProducerId(); + if (pid != null) { + BitArrayBin bab = map.get(pid); + if (bab != null) { + bab.setBit(id.getProducerSequenceId(), false); + } + } + } + } + + /** + * Check the message is in order + * @param msg + * @return + * @throws JMSException + */ + public boolean isInOrder(Message msg) throws JMSException { + return isInOrder(msg.getJMSMessageID()); + } + + /** + * Check the message id is in order + * @param id + * @return + */ + public boolean isInOrder(final String id) { + boolean answer = true; + + if (id != null) { + String seed = IdGenerator.getSeedFromId(id); + if (seed != null) { + BitArrayBin bab = map.get(seed); + if (bab != null) { + long index = IdGenerator.getSequenceFromId(id); + answer = bab.isInOrder(index); + } + + } + } + return answer; + } + + /** + * Check the MessageId is in order + * @param message + * @return + */ + public boolean isInOrder(final MessageReference message) { + return isInOrder(message.getMessageId()); + } + + /** + * Check the MessageId is in order + * @param id + * @return + */ + public boolean isInOrder(final MessageId id) { + boolean answer = false; + + if (id != null) { + ProducerId pid = 
id.getProducerId(); + if (pid != null) { + BitArrayBin bab = map.get(pid); + if (bab == null) { + bab = new BitArrayBin(auditDepth); + map.put(pid, bab); + } + answer = bab.isInOrder(id.getProducerSequenceId()); + + } + } + return answer; + } + + public long getLastSeqId(ProducerId id) { + long result = -1; + BitArrayBin bab = map.get(id.toString() + ":"); + if (bab != null) { + result = bab.getLastSetIndex(); + } + return result; + } +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java ------------------------------------------------------------------------------ svn:executable = * Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java Thu Jul 8 14:23:21 2010 @@ -47,7 +47,7 @@ public class ConnectionContext { private ConnectionId connectionId; private String clientId; private String userName; - private boolean haAware; + private boolean reconnect; private WireFormatInfo wireFormatInfo; private Object longTermStoreContext; private boolean producerFlowControl = true; @@ -86,7 +86,7 @@ public class 
ConnectionContext { rc.connectionId = this.connectionId; rc.clientId = this.clientId; rc.userName = this.userName; - rc.haAware = this.haAware; + rc.reconnect = this.reconnect; rc.wireFormatInfo = this.wireFormatInfo; rc.longTermStoreContext = this.longTermStoreContext; rc.producerFlowControl = this.producerFlowControl; @@ -212,12 +212,12 @@ public class ConnectionContext { this.clientId = clientId; } - public boolean isHaAware() { - return haAware; + public boolean isReconnect() { + return reconnect; } - public void setHaAware(boolean haAware) { - this.haAware = haAware; + public void setReconnect(boolean reconnect) { + this.reconnect = reconnect; } public WireFormatInfo getWireFormatInfo() { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java Thu Jul 8 14:23:21 2010 @@ -18,7 +18,10 @@ package org.apache.activemq.broker; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Region; +import org.apache.activemq.command.Message; import org.apache.activemq.state.ProducerState; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * Holds internal state in the broker for a MessageProducer @@ -27,11 +30,13 @@ import org.apache.activemq.state.Produce */ public class ProducerBrokerExchange { + private static final Log LOG = LogFactory.getLog(ProducerBrokerExchange.class); private ConnectionContext connectionContext; private Destination regionDestination; private Region region; 
private ProducerState producerState; private boolean mutable = true; + private long lastSendSequenceNumber = -1; public ProducerBrokerExchange() { } @@ -117,4 +122,25 @@ public class ProducerBrokerExchange { this.producerState = producerState; } + /** + * Enforce duplicate suppression using info from persistence adapter + * @param messageSend + * @return false if message should be ignored as a duplicate + */ + public boolean canDispatch(Message messageSend) { + boolean canDispatch = true; + if (lastSendSequenceNumber > 0) { + if (messageSend.getMessageId().getProducerSequenceId() <= lastSendSequenceNumber) { + canDispatch = false; + LOG.debug("suppressing duplicate message send [" + messageSend.getMessageId() + "] with producerSequenceId [" + + messageSend.getMessageId().getProducerSequenceId() + "] less than last stored: " + lastSendSequenceNumber); + } + } + return canDispatch; + } + + public void setLastStoredSequenceId(long l) { + lastSendSequenceNumber = l; + LOG.debug("last stored sequence id set: " + l); + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Thu Jul 8 14:23:21 2010 @@ -453,7 +453,9 @@ public class TransportConnection impleme public Response processMessage(Message messageSend) throws Exception { ProducerId producerId = messageSend.getProducerId(); ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId); - broker.send(producerExchange, messageSend); + if (producerExchange.canDispatch(messageSend)) { + 
broker.send(producerExchange, messageSend); + } return null; } @@ -680,6 +682,7 @@ public class TransportConnection impleme context.setTransactions(new ConcurrentHashMap()); context.setUserName(info.getUserName()); context.setWireFormatInfo(wireFormatInfo); + context.setReconnect(info.isFailoverReconnect()); this.manageable = info.isManageable(); state.setContext(context); state.setConnection(this); @@ -1249,13 +1252,16 @@ public class TransportConnection impleme } } - private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) { + private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException { ProducerBrokerExchange result = producerExchanges.get(id); if (result == null) { synchronized (producerExchanges) { result = new ProducerBrokerExchange(); - TransportConnectionState state = lookupConnectionState(id); + TransportConnectionState state = lookupConnectionState(id); context = state.getContext(); + if (context.isReconnect()) { + result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id)); + } result.setConnectionContext(context); SessionState ss = state.getSessionState(id.getParentId()); if (ss != null) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Thu Jul 8 14:23:21 2010 @@ -374,6 +374,7 @@ public abstract class AbstractRegion imp LOG.warn("Ack for non existent subscription, ack:" + ack); throw new IllegalArgumentException("The subscription does not 
exist: " + ack.getConsumerId()); } else { + LOG.debug("Ack for non existent subscription in recovery, ack:" + ack); return; } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java Thu Jul 8 14:23:21 2010 @@ -37,6 +37,7 @@ public class ConnectionInfo extends Base protected boolean clientMaster = true; protected boolean faultTolerant = false; protected transient Object transportContext; + private boolean failoverReconnect; public ConnectionInfo() { } @@ -216,4 +217,15 @@ public class ConnectionInfo extends Base this.faultTolerant = faultTolerant; } + /** + * @openwire:property version=6 cache=false + * @return failoverReconnect true if this is a reconnect + */ + public boolean isFailoverReconnect() { + return this.failoverReconnect; + } + + public void setFailoverReconnect(boolean failoverReconnect) { + this.failoverReconnect = failoverReconnect; + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java Thu Jul 8 14:23:21 2010 @@ -86,6 +86,7 
@@ public class ConnectionInfoMarshaller ex info.setManageable(bs.readBoolean()); info.setClientMaster(bs.readBoolean()); info.setFaultTolerant(bs.readBoolean()); + info.setFailoverReconnect(bs.readBoolean()); } @@ -107,6 +108,7 @@ public class ConnectionInfoMarshaller ex bs.writeBoolean(info.isManageable()); bs.writeBoolean(info.isClientMaster()); bs.writeBoolean(info.isFaultTolerant()); + bs.writeBoolean(info.isFailoverReconnect()); return rc + 0; } @@ -131,6 +133,7 @@ public class ConnectionInfoMarshaller ex bs.readBoolean(); bs.readBoolean(); bs.readBoolean(); + bs.readBoolean(); } @@ -165,6 +168,7 @@ public class ConnectionInfoMarshaller ex info.setManageable(dataIn.readBoolean()); info.setClientMaster(dataIn.readBoolean()); info.setFaultTolerant(dataIn.readBoolean()); + info.setFailoverReconnect(dataIn.readBoolean()); } @@ -186,6 +190,7 @@ public class ConnectionInfoMarshaller ex dataOut.writeBoolean(info.isManageable()); dataOut.writeBoolean(info.isClientMaster()); dataOut.writeBoolean(info.isFaultTolerant()); + dataOut.writeBoolean(info.isFailoverReconnect()); } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Thu Jul 8 14:23:21 2010 @@ -140,6 +140,7 @@ public class ConnectionStateTracker exte // Restore the connections. 
for (Iterator iter = connectionStates.values().iterator(); iter.hasNext();) { ConnectionState connectionState = iter.next(); + connectionState.getInfo().setFailoverReconnect(true); if (LOG.isDebugEnabled()) { LOG.debug("conn: " + connectionState.getInfo().getConnectionId()); } @@ -156,6 +157,9 @@ public class ConnectionStateTracker exte } //now flush messages for (Message msg:messageCache.values()) { + if (LOG.isDebugEnabled()) { + LOG.debug("message: " + msg.getMessageId()); + } transport.oneway(msg); } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java Thu Jul 8 14:23:21 2010 @@ -26,6 +26,7 @@ import org.apache.activemq.broker.region import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.ProducerId; import org.apache.activemq.usage.SystemUsage; /** @@ -157,4 +158,13 @@ public interface PersistenceAdapter exte * @return disk space used in bytes of 0 if not implemented */ long size(); + + /** + * return the last stored producer sequenceId for this producer Id + * used to suppress duplicate sends on failover reconnect at the transport + * when a reconnect occurs + * @param id the producerId to find a sequenceId for + * @return the last stored sequence id or -1 if no suppression needed + */ + long getLastProducerSequenceId(ProducerId id) throws IOException; } Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Thu Jul 8 14:23:21 2010 @@ -43,6 +43,7 @@ import org.apache.activemq.command.Journ import org.apache.activemq.command.JournalTrace; import org.apache.activemq.command.JournalTransaction; import org.apache.activemq.command.Message; +import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.kaha.impl.async.AsyncDataManager; @@ -1117,4 +1118,10 @@ public class AMQPersistenceAdapter imple + ".DisableLocking", "false")); } + + + public long getLastProducerSequenceId(ProducerId id) { + // reference store send has adequate duplicate suppression + return -1; + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Thu Jul 8 14:23:21 2010 @@ -21,6 +21,7 @@ import java.sql.SQLException; import java.util.Set; import org.apache.activemq.command.ActiveMQDestination; import 
org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.SubscriptionInfo; /** @@ -60,7 +61,7 @@ public interface JDBCAdapter { SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName) throws SQLException, IOException; - long getStoreSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException; + long getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException; void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, IOException; @@ -85,4 +86,6 @@ public interface JDBCAdapter { long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException; void doMessageIdScan(TransactionContext c, int limit, JDBCMessageIdScanListener listener) throws SQLException, IOException; + + long doGetLastProducerSequenceId(TransactionContext c, ProducerId id) throws SQLException, IOException; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Thu Jul 8 14:23:21 2010 @@ -285,7 +285,7 @@ public class JDBCMessageStore extends Ab long result = -1; TransactionContext c = persistenceAdapter.getTransactionContext(); try { - result = adapter.getStoreSequenceId(c, messageId); + result = 
adapter.getStoreSequenceId(c, destination, messageId); } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ", e); throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Thu Jul 8 14:23:21 2010 @@ -37,6 +37,7 @@ import org.apache.activemq.command.Activ import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerId; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.PersistenceAdapter; @@ -93,7 +94,7 @@ public class JDBCPersistenceAdapter exte protected int maxProducersToAudit=1024; protected int maxAuditDepth=1000; - protected boolean enableAudit=true; + protected boolean enableAudit=false; protected int auditRecoveryDepth = 1024; protected ActiveMQMessageAudit audit; @@ -245,6 +246,19 @@ public class JDBCPersistenceAdapter exte c.close(); } } + + public long getLastProducerSequenceId(ProducerId id) throws IOException { + TransactionContext c = getTransactionContext(); + try { + return getAdapter().doGetLastProducerSequenceId(c, id); + } catch (SQLException e) { + JDBCPersistenceAdapter.log("JDBC Failure: ", e); + throw IOExceptionSupport.create("Failed to get last broker message id: 
" + e, e); + } finally { + c.close(); + } + } + public void start() throws Exception { getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences()); @@ -699,6 +713,5 @@ public class JDBCPersistenceAdapter exte synchronized(sequenceGenerator) { return sequenceGenerator.getNextSequenceId(); } - } - + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Thu Jul 8 14:23:21 2010 @@ -49,7 +49,7 @@ public class JDBCTopicMessageStore exten // Get a connection and insert the message into the DB. 
TransactionContext c = persistenceAdapter.getTransactionContext(context); try { - long seq = adapter.getStoreSequenceId(c, messageId); + long seq = adapter.getStoreSequenceId(c, destination, messageId); adapter.doSetLastAck(c, destination, clientId, subscriptionName, seq); } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ", e); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Thu Jul 8 14:23:21 2010 @@ -67,6 +67,7 @@ public class Statements { private String findNextMessagesStatement; private boolean useLockCreateWhereClause; private String findAllMessageIdsStatement; + private String lastProducerSequenceIdStatement; public String[] getCreateSchemaStatements() { if (createSchemaStatements == null) { @@ -128,7 +129,7 @@ public class Statements { public String getFindMessageSequenceIdStatement() { if (findMessageSequenceIdStatement == null) { findMessageSequenceIdStatement = "SELECT ID FROM " + getFullMessageTableName() - + " WHERE MSGID_PROD=? AND MSGID_SEQ=?"; + + " WHERE MSGID_PROD=? AND MSGID_SEQ=? 
AND CONTAINER=?"; } return findMessageSequenceIdStatement; } @@ -172,6 +173,15 @@ public class Statements { return findLastSequenceIdInMsgsStatement; } + public String getLastProducerSequenceIdStatement() { + if (lastProducerSequenceIdStatement == null) { + lastProducerSequenceIdStatement = "SELECT MAX(MSGID_SEQ) FROM " + getFullMessageTableName() + + " WHERE MSGID_PROD=?"; + } + return lastProducerSequenceIdStatement; + } + + public String getFindLastSequenceIdInAcksStatement() { if (findLastSequenceIdInAcksStatement == null) { findLastSequenceIdInAcksStatement = "SELECT MAX(LAST_ACKED_ID) FROM " + getFullAckTableName(); @@ -656,4 +666,9 @@ public class Statements { this.lastAckedDurableSubscriberMessageStatement = lastAckedDurableSubscriberMessageStatement; } + + public void setLastProducerSequenceIdStatement(String lastProducerSequenceIdStatement) { + this.lastProducerSequenceIdStatement = lastProducerSequenceIdStatement; + } + } \ No newline at end of file Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Thu Jul 8 14:23:21 2010 @@ -28,6 +28,7 @@ import java.util.Set; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.store.jdbc.JDBCAdapter; import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener; @@ -246,13 +247,14 @@ public class 
DefaultJDBCAdapter implemen } } - public long getStoreSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException { + public long getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; try { s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement()); s.setString(1, messageID.getProducerId().toString()); s.setLong(2, messageID.getProducerSequenceId()); + s.setString(3, destination.getQualifiedName()); rs = s.executeQuery(); if (!rs.next()) { return 0; @@ -819,4 +821,23 @@ public class DefaultJDBCAdapter implemen * try { s.close(); } catch (Throwable ignore) {} } } */ + public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id) + throws SQLException, IOException { + PreparedStatement s = null; + ResultSet rs = null; + try { + s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement()); + s.setString(1, id.toString()); + rs = s.executeQuery(); + long seq = -1; + if (rs.next()) { + seq = rs.getLong(1); + } + return seq; + } finally { + close(rs); + close(s); + } + } + } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Thu Jul 8 14:23:21 2010 @@ -50,6 +50,7 @@ import org.apache.activemq.command.Journ import org.apache.activemq.command.JournalTransaction; import 
org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.ProducerId; import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.store.MessageStore; @@ -745,4 +746,8 @@ public class JournalPersistenceAdapter i } } + public long getLastProducerSequenceId(ProducerId id) { + return -1; + } + } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Thu Jul 8 14:23:21 2010 @@ -32,6 +32,7 @@ import org.apache.activemq.command.Activ import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerId; import org.apache.activemq.kaha.CommandMarshaller; import org.apache.activemq.kaha.ContainerId; import org.apache.activemq.kaha.ListContainer; @@ -369,6 +370,11 @@ public class KahaPersistenceAdapter impl public void setBrokerService(BrokerService brokerService) { this.brokerService = brokerService; } + + public long getLastProducerSequenceId(ProducerId id) { + // reference store send has adequate duplicate suppression + return -1; + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Thu Jul 8 14:23:21 2010 @@ -26,6 +26,7 @@ import org.apache.activemq.broker.Connec import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.ProducerId; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.TopicMessageStore; @@ -124,6 +125,10 @@ public class KahaDBPersistenceAdapter im return this.letter.getLastMessageBrokerSequenceId(); } + public long getLastProducerSequenceId(ProducerId id) throws IOException { + return this.letter.getLastProducerSequenceId(id); + } + /** * @param destination * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue) @@ -209,6 +214,29 @@ public class KahaDBPersistenceAdapter im } /** + * Set the max number of producers (LRU cache) to track for duplicate sends + */ + public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) { + this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack); + } + + public int getMaxFailoverProducersToTrack() { + return this.letter.getMaxFailoverProducersToTrack(); + } + + /** + * set the audit window depth for duplicate suppression (should exceed the max transaction + * batch) + */ + public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) { + 
this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth); + } + + public int getFailoverProducersAuditDepth() { + return this.getFailoverProducersAuditDepth(); + } + + /** * Get the checkpointInterval * * @return the checkpointInterval @@ -477,4 +505,5 @@ public class KahaDBPersistenceAdapter im String path = getDirectory() != null ? getDirectory().toString() : "DIRECTORY_NOT_SET"; return "KahaDBPersistenceAdapter[" + path + "]"; } + } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Thu Jul 8 14:23:21 2010 @@ -48,6 +48,7 @@ import org.apache.activemq.command.Local import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.XATransactionId; @@ -363,6 +364,7 @@ public class KahaDBStore extends Message org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null); + } public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { @@ -901,6 +903,15 @@ public class KahaDBStore extends Message public long getLastMessageBrokerSequenceId() throws IOException { return 0; } + + public long 
getLastProducerSequenceId(ProducerId id) { + indexLock.readLock().lock(); + try { + return metadata.producerSequenceIdTracker.getLastSeqId(id); + } finally { + indexLock.readLock().unlock(); + } + } public long size() { if (!isStarted()) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Thu Jul 8 14:23:21 2010 @@ -16,11 +16,15 @@ */ package org.apache.activemq.store.kahadb; +import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.DataOutput; +import java.io.EOFException; import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.io.OutputStream; import java.util.ArrayList; import java.util.Collection; @@ -35,22 +39,25 @@ import java.util.TreeMap; import java.util.TreeSet; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.activemq.ActiveMQMessageAuditNoSync; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.LocalTransactionId; +import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.TransactionId; import 
org.apache.activemq.command.XATransactionId; +import org.apache.activemq.protobuf.Buffer; import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; import org.apache.activemq.store.kahadb.data.KahaCommitCommand; import org.apache.activemq.store.kahadb.data.KahaDestination; import org.apache.activemq.store.kahadb.data.KahaEntryType; import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId; import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; +import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand; import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; @@ -93,9 +100,9 @@ public class MessageDatabase extends Ser private static final Log LOG = LogFactory.getLog(MessageDatabase.class); private static final int DEFAULT_DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; - static final int CLOSED_STATE = 1; - static final int OPEN_STATE = 2; - static final long NOT_ACKED = -1; + static final int CLOSED_STATE = 1; + static final int OPEN_STATE = 2; + static final long NOT_ACKED = -1; protected class Metadata { @@ -104,6 +111,8 @@ public class MessageDatabase extends Ser protected BTreeIndex destinations; protected Location lastUpdate; protected Location firstInProgressTransactionLocation; + protected Location producerSequenceIdTrackerLocation = null; + protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync(); public void read(DataInput is) throws IOException { state = is.readInt(); @@ -118,6 +127,14 @@ public class MessageDatabase extends Ser } else { firstInProgressTransactionLocation = null; } + try { + if (is.readBoolean()) { + producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is); + } else { + producerSequenceIdTrackerLocation = null; + } + } catch (EOFException expectedOnUpgrade) { + } } public void 
write(DataOutput os) throws IOException { @@ -137,6 +154,13 @@ public class MessageDatabase extends Ser } else { os.writeBoolean(false); } + + if (producerSequenceIdTrackerLocation != null) { + os.writeBoolean(true); + LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os); + } else { + os.writeBoolean(false); + } } } @@ -154,7 +178,7 @@ public class MessageDatabase extends Ser protected PageFile pageFile; protected Journal journal; - protected Metadata metadata = new Metadata(); + protected Metadata metadata = new Metadata(); protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller(); @@ -171,7 +195,8 @@ public class MessageDatabase extends Ser int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; boolean enableIndexWriteAsync = false; - int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; + int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; + protected AtomicBoolean opened = new AtomicBoolean(); private LockFile lockFile; @@ -381,15 +406,15 @@ public class MessageDatabase extends Ser private Location getFirstInProgressTxLocation() { Location l = null; synchronized (inflightTransactions) { - if (!inflightTransactions.isEmpty()) { - l = inflightTransactions.values().iterator().next().get(0).getLocation(); - } - if (!preparedTransactions.isEmpty()) { - Location t = preparedTransactions.values().iterator().next().get(0).getLocation(); - if (l==null || t.compareTo(l) <= 0) { - l = t; + if (!inflightTransactions.isEmpty()) { + l = inflightTransactions.values().iterator().next().get(0).getLocation(); + } + if (!preparedTransactions.isEmpty()) { + Location t = preparedTransactions.values().iterator().next().get(0).getLocation(); + if (l==null || t.compareTo(l) <= 0) { + l = t; + } } - } } return l; } @@ -407,21 +432,25 @@ public class MessageDatabase extends Ser try { long start = System.currentTimeMillis(); - Location 
recoveryPosition = getRecoveryPosition(); - if( recoveryPosition!=null ) { - int redoCounter = 0; - LOG.info("Recoverying from the journal ..."); - while (recoveryPosition != null) { - JournalCommand message = load(recoveryPosition); - metadata.lastUpdate = recoveryPosition; - process(message, recoveryPosition); - redoCounter++; - recoveryPosition = journal.getNextLocation(recoveryPosition); - } - long end = System.currentTimeMillis(); - LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds."); + Location producerAuditPosition = recoverProducerAudit(); + Location lastIndoubtPosition = getRecoveryPosition(); + + Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition); + + if (recoveryPosition != null) { + int redoCounter = 0; + LOG.info("Recoverying from the journal ..."); + while (recoveryPosition != null) { + JournalCommand message = load(recoveryPosition); + metadata.lastUpdate = recoveryPosition; + process(message, recoveryPosition, lastIndoubtPosition); + redoCounter++; + recoveryPosition = journal.getNextLocation(recoveryPosition); + } + long end = System.currentTimeMillis(); + LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds."); } - + // We may have to undo some index updates. 
pageFile.tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws IOException { @@ -433,7 +462,39 @@ public class MessageDatabase extends Ser } } - protected void recoverIndex(Transaction tx) throws IOException { + private Location minimum(Location producerAuditPosition, + Location lastIndoubtPosition) { + Location min = null; + if (producerAuditPosition != null) { + min = producerAuditPosition; + if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) { + min = lastIndoubtPosition; + } + } else { + min = lastIndoubtPosition; + } + return min; + } + + private Location recoverProducerAudit() throws IOException { + if (metadata.producerSequenceIdTrackerLocation != null) { + KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation); + try { + ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput()); + metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject(); + } catch (ClassNotFoundException cfe) { + IOException ioe = new IOException("Failed to read producerAudit: " + cfe); + ioe.initCause(cfe); + throw ioe; + } + return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation); + } else { + // got no audit stored so got to recreate via replay from start of the journal + return journal.getNextLocation(null); + } + } + + protected void recoverIndex(Transaction tx) throws IOException { long start = System.currentTimeMillis(); // It is possible index updates got applied before the journal updates.. 
// in that case we need to removed references to messages that are not in the journal @@ -457,6 +518,7 @@ public class MessageDatabase extends Ser MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); sd.locationIndex.remove(tx, keys.location); sd.messageIdIndex.remove(tx, keys.messageId); + metadata.producerSequenceIdTracker.rollback(new MessageId(keys.messageId)); undoCounter++; // TODO: do we need to modify the ack positions for the pub sub case? } @@ -588,7 +650,7 @@ public class MessageDatabase extends Ser while (nextRecoveryPosition != null) { lastRecoveryPosition = nextRecoveryPosition; metadata.lastUpdate = lastRecoveryPosition; - JournalCommand message = load(lastRecoveryPosition); + JournalCommand message = load(lastRecoveryPosition); process(message, lastRecoveryPosition); nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); } @@ -601,8 +663,8 @@ public class MessageDatabase extends Ser return metadata.lastUpdate; } - private Location getRecoveryPosition() throws IOException { - + private Location getRecoveryPosition() throws IOException { + // If we need to recover the transactions.. if (metadata.firstInProgressTransactionLocation != null) { return metadata.firstInProgressTransactionLocation; @@ -613,7 +675,7 @@ public class MessageDatabase extends Ser // Start replay at the record after the last one recorded in the index file. return journal.getNextLocation(metadata.lastUpdate); } - + // This loads the first position. return journal.getNextLocation(null); } @@ -658,7 +720,7 @@ public class MessageDatabase extends Ser // ///////////////////////////////////////////////////////////////// // Methods call by the broker to update and query the store. 
// ///////////////////////////////////////////////////////////////// - public Location store(JournalCommand data) throws IOException { + public Location store(JournalCommand data) throws IOException { return store(data, false, null,null); } @@ -669,7 +731,7 @@ public class MessageDatabase extends Ser * during a recovery process. * @param done */ - public Location store(JournalCommand data, boolean sync, Runnable before,Runnable after) throws IOException { + public Location store(JournalCommand data, boolean sync, Runnable before,Runnable after) throws IOException { if (before != null) { before.run(); } @@ -716,7 +778,7 @@ public class MessageDatabase extends Ser * @return * @throws IOException */ - public JournalCommand load(Location location) throws IOException { + public JournalCommand load(Location location) throws IOException { ByteSequence data = journal.read(location); DataByteArrayInputStream is = new DataByteArrayInputStream(data); byte readByte = is.readByte(); @@ -724,10 +786,30 @@ public class MessageDatabase extends Ser if( type == null ) { throw new IOException("Could not load journal record. 
Invalid location: "+location); } - JournalCommand message = (JournalCommand)type.createMessage(); + JournalCommand message = (JournalCommand)type.createMessage(); message.mergeFramed(is); return message; } + + /** + * do minimal recovery till we reach the last inDoubtLocation + * @param data + * @param location + * @param inDoubtlocation + * @throws IOException + */ + void process(JournalCommand data, final Location location, final Location inDoubtlocation) throws IOException { + if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) { + process(data, location); + } else { + // just recover producer audit + data.visit(new Visitor() { + public void visit(KahaAddMessageCommand command) throws IOException { + metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); + } + }); + } + } // ///////////////////////////////////////////////////////////////// // Journaled record processing methods. Once the record is journaled, @@ -735,7 +817,7 @@ public class MessageDatabase extends Ser // from the recovery method too so they need to be idempotent // ///////////////////////////////////////////////////////////////// - void process(JournalCommand data, final Location location) throws IOException { + void process(JournalCommand data, final Location location) throws IOException { data.visit(new Visitor() { @Override public void visit(KahaAddMessageCommand command) throws IOException { @@ -911,7 +993,7 @@ public class MessageDatabase extends Ser if( previous == null ) { previous = sd.messageIdIndex.put(tx, command.getMessageId(), id); if( previous == null ) { - sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location)); + sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location)); } else { // If the message ID as indexed, then the broker asked us to store a DUP // message. Bad BOY! Don't do it, and log a warning. 
@@ -927,7 +1009,8 @@ public class MessageDatabase extends Ser // TODO: consider just rolling back the tx. sd.locationIndex.put(tx, location, previous); } - + // record this id in any event, initial send or recovery + metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); } void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException { @@ -1025,6 +1108,7 @@ public class MessageDatabase extends Ser LOG.debug("Checkpoint started."); metadata.state = OPEN_STATE; + metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit(); metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation(); tx.store(metadata.page, metadataMarshaller, true); pageFile.flush(); @@ -1111,6 +1195,15 @@ public class MessageDatabase extends Ser LOG.debug("Checkpoint done."); } + private Location checkpointProducerAudit() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oout = new ObjectOutputStream(baos); + oout.writeObject(metadata.producerSequenceIdTracker); + oout.flush(); + oout.close(); + return store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray()))); + } + public HashSet getJournalFilesBeingReplicated() { return journalFilesBeingReplicated; } @@ -1580,6 +1673,22 @@ public class MessageDatabase extends Ser return journalMaxFileLength; } + public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) { + this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack); + } + + public int getMaxFailoverProducersToTrack() { + return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack(); + } + + public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) { + this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth); + } + + public int getFailoverProducersAuditDepth() { + return 
this.metadata.producerSequenceIdTracker.getAuditDepth(); + } + public PageFile getPageFile() { if (pageFile == null) { pageFile = createPageFile(); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java Thu Jul 8 14:23:21 2010 @@ -33,6 +33,7 @@ import org.apache.activemq.command.Activ import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.XATransactionId; @@ -569,5 +570,9 @@ public class TempKahaDBStore extends Tem throw new IllegalArgumentException("Not in the valid destination format"); } } + + public long getLastProducerSequenceId(ProducerId id) { + return -1; + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/Visitor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/Visitor.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/Visitor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/Visitor.java Thu Jul 8 14:23:21 2010 @@ -21,6 +21,7 @@ import java.io.IOException; import 
org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; import org.apache.activemq.store.kahadb.data.KahaCommitCommand; import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; +import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand; import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; @@ -52,5 +53,8 @@ public class Visitor { public void visit(KahaSubscriptionCommand kahaUpdateSubscriptionCommand) throws IOException { } + + public void visit(KahaProducerAuditCommand kahaProducerAuditCommand) throws IOException { + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java Thu Jul 8 14:23:21 2010 @@ -27,6 +27,7 @@ import org.apache.activemq.broker.Connec import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.ProducerId; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.ProxyMessageStore; @@ -201,4 +202,9 @@ public class MemoryPersistenceAdapter im createTransactionStore(); } } + + public long getLastProducerSequenceId(ProducerId id) { + // memory map does duplicate suppression + return -1; + } } Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Thu Jul 8 14:23:21 2010 @@ -727,7 +727,7 @@ public class FailoverTransport implement for (Iterator iter2 = tmpMap.values().iterator(); iter2.hasNext();) { Command command = iter2.next(); if (LOG.isTraceEnabled()) { - LOG.trace("restore, replay: " + command); + LOG.trace("restore requestMap, replay: " + command); } t.oneway(command); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java Thu Jul 8 14:23:21 2010 @@ -19,6 +19,7 @@ package org.apache.activemq.util; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.Serializable; /** * Simple BitArray to enable setting multiple boolean values efficently Used @@ -27,7 +28,10 @@ import java.io.IOException; * * @version $Revision: 1.1.1.1 $ */ -public class BitArray { +public class BitArray implements Serializable { + + private static final long serialVersionUID = 1L; + static final int LONG_SIZE = 64; static final int INT_SIZE = 32; static final 
int SHORT_SIZE = 16; @@ -113,6 +117,14 @@ public class BitArray { this.bits = bits; } + private void writeObject(java.io.ObjectOutputStream out) throws IOException { + writeToStream(out); + } + + private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { + readFromStream(in); + } + /** * write the bits to an output stream * Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java Thu Jul 8 14:23:21 2010 @@ -16,6 +16,7 @@ */ package org.apache.activemq.util; +import java.io.Serializable; import java.util.LinkedList; /** @@ -23,8 +24,9 @@ import java.util.LinkedList; * * @version $Revision: 1.1.1.1 $ */ -public class BitArrayBin { +public class BitArrayBin implements Serializable { + private static final long serialVersionUID = 1L; private LinkedList list; private int maxNumberOfArrays; private int firstIndex = -1; @@ -162,4 +164,22 @@ public class BitArrayBin { } return answer; } + + public long getLastSetIndex() { + long result = -1; + + if (firstIndex >=0) { + result = firstIndex; + BitArray last = null; + for (int lastBitArrayIndex = maxNumberOfArrays -1; lastBitArrayIndex >= 0; lastBitArrayIndex--) { + last = list.get(lastBitArrayIndex); + if (last != null) { + result += last.length() -1; + result += lastBitArrayIndex * BitArray.LONG_SIZE; + break; + } + } + } + return result; + } } Modified: activemq/trunk/activemq-core/src/main/proto/journal-data.proto URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/proto/journal-data.proto?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/proto/journal-data.proto (original) +++ activemq/trunk/activemq-core/src/main/proto/journal-data.proto Thu Jul 8 14:23:21 2010 @@ -29,6 +29,7 @@ enum KahaEntryType { KAHA_ROLLBACK_COMMAND = 5; KAHA_REMOVE_DESTINATION_COMMAND = 6; KAHA_SUBSCRIPTION_COMMAND = 7; + KAHA_PRODUCER_AUDIT_COMMAND = 8; } message KahaTraceCommand { @@ -109,6 +110,18 @@ message KahaSubscriptionCommand { optional bytes subscriptionInfo = 4; } +message KahaProducerAuditCommand { + // We make use of the wonky comment style bellow because the following options + // are not valid for protoc, but they are valid for the ActiveMQ proto compiler. + // In the ActiveMQ proto compiler, comments terminate with the pipe character: | + + //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand"; + //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException"; + //| option java_type_method = "KahaEntryType"; + + required bytes audit = 1; +} + message KahaDestination { enum DestinationType { QUEUE = 0; Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java Thu Jul 8 14:23:21 2010 @@ -152,6 +152,7 @@ public class PerDestinationStoreLimitTes Thread.sleep(1000); // the producer is blocked once the done 
flag stays true if (done.get()) { + LOG.info("Blocked...."); break; } done.set(true); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java Thu Jul 8 14:23:21 2010 @@ -66,5 +66,6 @@ public class ConnectionInfoTest extends info.setManageable(false); info.setClientMaster(true); info.setFaultTolerant(false); + info.setFailoverReconnect(true); } } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java Thu Jul 8 14:23:21 2010 @@ -17,6 +17,7 @@ package org.apache.activemq.store; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -120,6 +121,23 @@ public abstract class StoreOrderTest { } @Test + public void testCompositeSendReceiveAfterRestart() throws Exception { + destination = new ActiveMQQueue("StoreOrderTest,SecondStoreOrderTest"); + enqueueOneMessage(); + + LOG.info("restart broker"); + stopBroker(); + broker = createRestartedBroker(); + dumpMessages(); + 
initConnection(); + destination = new ActiveMQQueue("StoreOrderTest"); + assertNotNull("got one message from first dest", receiveOne()); + dumpMessages(); + destination = new ActiveMQQueue("SecondStoreOrderTest"); + assertNotNull("got one message from second dest", receiveOne()); + } + + @Test public void validateUnorderedTxCommit() throws Exception { Executor executor = Executors.newCachedThreadPool(); @@ -247,6 +265,7 @@ public abstract class StoreOrderTest { PolicyEntry defaultEntry = new PolicyEntry(); defaultEntry.setMemoryLimit(1024*3); defaultEntry.setCursorMemoryHighWaterMark(68); + defaultEntry.setExpireMessagesPeriod(0); map.setDefaultEntry(defaultEntry); brokerService.setDestinationPolicy(map); } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java?rev=961783&r1=961782&r2=961783&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java Thu Jul 8 14:23:21 2010 @@ -28,6 +28,11 @@ public class JDBCPersistenceAdapterTest protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws IOException { JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); + + // explicitly enable audit as it is now off by default + // due to org.apache.activemq.broker.ProducerBrokerExchange.canDispatch(Message) + jdbc.setEnableAudit(true); + brokerService.setSchedulerSupport(false); brokerService.setPersistenceAdapter(jdbc); jdbc.setBrokerService(brokerService); @@ -56,6 +61,5 @@ public class JDBCPersistenceAdapterTest if (!failed) { fail("Should have failed with audit turned off"); } - } - + } }