Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 1451 invoked from network); 5 Jan 2009 20:49:13 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 5 Jan 2009 20:49:13 -0000 Received: (qmail 44717 invoked by uid 500); 5 Jan 2009 20:49:13 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 44663 invoked by uid 500); 5 Jan 2009 20:49:13 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 44654 invoked by uid 99); 5 Jan 2009 20:49:13 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 05 Jan 2009 12:49:13 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 05 Jan 2009 20:49:01 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id C64B223888A6; Mon, 5 Jan 2009 12:48:39 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r731704 [1/2] - in /activemq/trunk: activemq-core/ activemq-core/src/main/java/org/apache/activemq/store/kahadb/ activemq-core/src/main/proto/ activemq-core/src/test/java/org/apache/activemq/store/ activemq-core/src/test/java/org/apache/act... Date: Mon, 05 Jan 2009 20:48:39 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090105204839.C64B223888A6@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Mon Jan 5 12:48:38 2009 New Revision: 731704 URL: http://svn.apache.org/viewvc?rev=731704&view=rev Log: Moving stuff around so that activemq-core can depend on kahadb instead of kahadb depending on activemq-core. This step allows kahadb to be used as the default store implementation if desired. Will do that next. Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/JournalCommand.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/Visitor.java activemq/trunk/activemq-core/src/main/proto/ activemq/trunk/activemq-core/src/main/proto/journal-data.proto activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreBrokerTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreXARecoveryBrokerTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/PBMesssagesTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/KahaBulkLoadingTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreDurableTopicTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreQueueTest.java Removed: activemq/trunk/assembly/src/release/conf/ha-broker.xml activemq/trunk/assembly/src/release/conf/ha.xml activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/store/ activemq/trunk/kahadb/src/main/proto/ activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/ activemq/trunk/kahadb/src/test/java/org/apache/kahadb/store/ activemq/trunk/kahadb/src/test/resources/broker1/ activemq/trunk/kahadb/src/test/resources/broker2/ Modified: activemq/trunk/activemq-core/pom.xml activemq/trunk/kahadb/pom.xml Modified: activemq/trunk/activemq-core/pom.xml URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=731704&r1=731703&r2=731704&view=diff ============================================================================== --- activemq/trunk/activemq-core/pom.xml (original) +++ activemq/trunk/activemq-core/pom.xml Mon Jan 5 12:48:38 2009 @@ -77,7 +77,17 @@ false - + + org.apache.activemq + kahadb + false + + + org.apache.activemq.protobuf + activemq-protobuf + false + + @@ -458,6 +468,18 @@ + org.apache.activemq.protobuf + activemq-protobuf + + + + compile + + + + + + org.apache.maven.plugins maven-clean-plugin Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/JournalCommand.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/JournalCommand.java?rev=731704&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/JournalCommand.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/JournalCommand.java Mon Jan 5 12:48:38 2009 @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb; + +import java.io.IOException; + +import org.apache.activemq.store.kahadb.data.KahaEntryType; + +public interface JournalCommand extends org.apache.activemq.protobuf.Message { + + public void visit(Visitor visitor) throws IOException; + + public KahaEntryType type(); + +} Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=731704&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Mon Jan 5 12:48:38 2009 @@ -0,0 +1,581 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb; + +import java.io.DataInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; + +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTempQueue; +import org.apache.activemq.command.ActiveMQTempTopic; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.LocalTransactionId; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.SubscriptionInfo; +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.protobuf.Buffer; +import org.apache.activemq.store.AbstractMessageStore; +import org.apache.activemq.store.MessageRecoveryListener; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.TopicMessageStore; +import org.apache.activemq.store.TransactionRecoveryListener; +import org.apache.activemq.store.TransactionStore; +import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; +import org.apache.activemq.store.kahadb.data.KahaCommitCommand; +import org.apache.activemq.store.kahadb.data.KahaDestination; +import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId; +import org.apache.activemq.store.kahadb.data.KahaLocation; +import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; +import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; +import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; +import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; +import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; +import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; +import org.apache.activemq.store.kahadb.data.KahaXATransactionId; +import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; +import org.apache.activemq.usage.MemoryUsage; +import org.apache.activemq.usage.SystemUsage; +import org.apache.activemq.wireformat.WireFormat; +import org.apache.kahadb.journal.Location; +import org.apache.kahadb.page.Transaction; + +public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { + + private WireFormat wireFormat = new OpenWireFormat(); + + public void setBrokerName(String brokerName) { + } + public void setUsageManager(SystemUsage usageManager) { + } + + public TransactionStore createTransactionStore() throws IOException { + return new TransactionStore(){ + + public void commit(TransactionId txid, boolean wasPrepared) throws IOException { + store(new KahaCommitCommand().setTransactionInfo(createTransactionInfo(txid)), true); + } + public void prepare(TransactionId txid) throws IOException { + store(new KahaPrepareCommand().setTransactionInfo(createTransactionInfo(txid)), true); + } + public void rollback(TransactionId txid) throws IOException { + store(new KahaRollbackCommand().setTransactionInfo(createTransactionInfo(txid)), false); + } + public void recover(TransactionRecoveryListener listener) throws IOException { + for (Map.Entry> entry : preparedTransactions.entrySet()) { + XATransactionId xid = (XATransactionId)entry.getKey(); + ArrayList messageList = new ArrayList(); + ArrayList ackList = new ArrayList(); + + for (Operation op : entry.getValue()) { + if( op.getClass() == AddOpperation.class ) { + AddOpperation addOp = (AddOpperation)op; + Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addOp.getCommand().getMessage().newInput()) ); + messageList.add(msg); + } else { + RemoveOpperation rmOp = (RemoveOpperation)op; + MessageAck ack = (MessageAck)wireFormat.unmarshal( new DataInputStream(rmOp.getCommand().getAck().newInput()) ); + ackList.add(ack); + } + } + + Message[] addedMessages = new Message[messageList.size()]; + MessageAck[] acks = new MessageAck[ackList.size()]; + messageList.toArray(addedMessages); + ackList.toArray(acks); + listener.recover(xid, addedMessages, acks); + } + } + public void start() throws Exception { + } + public void stop() throws Exception { + } + }; + } + + public class KahaDBMessageStore extends AbstractMessageStore { + protected KahaDestination dest; + + public KahaDBMessageStore(ActiveMQDestination destination) { + super(destination); + this.dest = convert( destination ); + } + + public ActiveMQDestination getDestination() { + return destination; + } + + public void addMessage(ConnectionContext context, Message message) throws IOException { + KahaAddMessageCommand command = new KahaAddMessageCommand(); + command.setDestination(dest); + command.setMessageId(message.getMessageId().toString()); + command.setTransactionInfo( createTransactionInfo(message.getTransactionId()) ); + + org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); + command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); + + store(command, isSyncWrites() && message.isResponseRequired()); + + } + + public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { + KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); + command.setDestination(dest); + command.setMessageId(ack.getLastMessageId().toString()); + command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()) ); + store(command, isSyncWrites() && ack.isResponseRequired()); + } + + public void removeAllMessages(ConnectionContext context) throws IOException { + KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand(); + command.setDestination(dest); + store(command, true); + } + + public Message getMessage(MessageId identity) throws IOException { + final String key = identity.toString(); + + // Hopefully one day the page file supports concurrent read operations... but for now we must + // externally synchronize... + Location location; + synchronized(indexMutex) { + location = pageFile.tx().execute(new Transaction.CallableClosure(){ + public Location execute(Transaction tx) throws IOException { + StoredDestination sd = getStoredDestination(dest, tx); + Long sequence = sd.messageIdIndex.get(tx, key); + if( sequence ==null ) { + return null; + } + return sd.orderIndex.get(tx, sequence).location; + } + }); + } + if( location == null ) { + return null; + } + + return loadMessage(location); + } + + public int getMessageCount() throws IOException { + synchronized(indexMutex) { + return pageFile.tx().execute(new Transaction.CallableClosure(){ + public Integer execute(Transaction tx) throws IOException { + // Iterate through all index entries to get a count of messages in the destination. + StoredDestination sd = getStoredDestination(dest, tx); + int rc=0; + for (Iterator> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) { + iterator.next(); + rc++; + } + return rc; + } + }); + } + } + + public void recover(final MessageRecoveryListener listener) throws Exception { + synchronized(indexMutex) { + pageFile.tx().execute(new Transaction.Closure(){ + public void execute(Transaction tx) throws Exception { + StoredDestination sd = getStoredDestination(dest, tx); + for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) { + Entry entry = iterator.next(); + listener.recoverMessage( loadMessage(entry.getValue().location) ); + } + } + }); + } + } + + long cursorPos=0; + + public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { + synchronized(indexMutex) { + pageFile.tx().execute(new Transaction.Closure(){ + public void execute(Transaction tx) throws Exception { + StoredDestination sd = getStoredDestination(dest, tx); + Entry entry=null; + int counter = 0; + for (Iterator> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { + entry = iterator.next(); + listener.recoverMessage( loadMessage(entry.getValue().location ) ); + counter++; + if( counter >= maxReturned ) { + break; + } + } + if( entry!=null ) { + cursorPos = entry.getKey()+1; + } + } + }); + } + } + + public void resetBatching() { + cursorPos=0; + } + + public void setMemoryUsage(MemoryUsage memoeyUSage) { + } + public void start() throws Exception { + } + public void stop() throws Exception { + } + + } + + class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore { + public KahaDBTopicMessageStore(ActiveMQTopic destination) { + super(destination); + } + + public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException { + KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); + command.setDestination(dest); + command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName)); + command.setMessageId(messageId.toString()); + // We are not passed a transaction info.. so we can't participate in a transaction. + // Looks like a design issue with the TopicMessageStore interface. Also we can't recover the original ack + // to pass back to the XA recover method. + // command.setTransactionInfo(); + store(command, false); + } + + public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { + String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName()); + KahaSubscriptionCommand command = new KahaSubscriptionCommand(); + command.setDestination(dest); + command.setSubscriptionKey(subscriptionKey); + command.setRetroactive(retroactive); + org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo); + command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); + store(command, isSyncWrites() && true); + } + + public void deleteSubscription(String clientId, String subscriptionName) throws IOException { + KahaSubscriptionCommand command = new KahaSubscriptionCommand(); + command.setDestination(dest); + command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName)); + store(command, isSyncWrites() && true); + } + + public SubscriptionInfo[] getAllSubscriptions() throws IOException { + + final ArrayList subscriptions = new ArrayList(); + synchronized(indexMutex) { + pageFile.tx().execute(new Transaction.Closure(){ + public void execute(Transaction tx) throws IOException { + StoredDestination sd = getStoredDestination(dest, tx); + for (Iterator> iterator = sd.subscriptions.iterator(tx); iterator.hasNext();) { + Entry entry = iterator.next(); + SubscriptionInfo info = (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(entry.getValue().getSubscriptionInfo().newInput()) ); + subscriptions.add(info); + + } + } + }); + } + + SubscriptionInfo[]rc=new SubscriptionInfo[subscriptions.size()]; + subscriptions.toArray(rc); + return rc; + } + + public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { + final String subscriptionKey = subscriptionKey(clientId, subscriptionName); + synchronized(indexMutex) { + return pageFile.tx().execute(new Transaction.CallableClosure(){ + public SubscriptionInfo execute(Transaction tx) throws IOException { + StoredDestination sd = getStoredDestination(dest, tx); + KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey); + if( command ==null ) { + return null; + } + return (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(command.getSubscriptionInfo().newInput()) ); + } + }); + } + } + + public int getMessageCount(String clientId, String subscriptionName) throws IOException { + final String subscriptionKey = subscriptionKey(clientId, subscriptionName); + synchronized(indexMutex) { + return pageFile.tx().execute(new Transaction.CallableClosure(){ + public Integer execute(Transaction tx) throws IOException { + StoredDestination sd = getStoredDestination(dest, tx); + Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); + if ( cursorPos==null ) { + // The subscription might not exist. + return 0; + } + cursorPos += 1; + + int counter = 0; + for (Iterator> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { + iterator.next(); + counter++; + } + return counter; + } + }); + } + } + + public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception { + final String subscriptionKey = subscriptionKey(clientId, subscriptionName); + synchronized(indexMutex) { + pageFile.tx().execute(new Transaction.Closure(){ + public void execute(Transaction tx) throws Exception { + StoredDestination sd = getStoredDestination(dest, tx); + Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); + cursorPos += 1; + + for (Iterator> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { + Entry entry = iterator.next(); + listener.recoverMessage( loadMessage(entry.getValue().location ) ); + } + } + }); + } + } + + public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception { + final String subscriptionKey = subscriptionKey(clientId, subscriptionName); + synchronized(indexMutex) { + pageFile.tx().execute(new Transaction.Closure(){ + public void execute(Transaction tx) throws Exception { + StoredDestination sd = getStoredDestination(dest, tx); + Long cursorPos = sd.subscriptionCursors.get(subscriptionKey); + if( cursorPos == null ) { + cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); + cursorPos += 1; + } + + Entry entry=null; + int counter = 0; + for (Iterator> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { + entry = iterator.next(); + listener.recoverMessage( loadMessage(entry.getValue().location ) ); + counter++; + if( counter >= maxReturned ) { + break; + } + } + if( entry!=null ) { + sd.subscriptionCursors.put(subscriptionKey, cursorPos+1); + } + } + }); + } + } + + public void resetBatching(String clientId, String subscriptionName) { + try { + final String subscriptionKey = subscriptionKey(clientId, subscriptionName); + synchronized(indexMutex) { + pageFile.tx().execute(new Transaction.Closure(){ + public void execute(Transaction tx) throws IOException { + StoredDestination sd = getStoredDestination(dest, tx); + sd.subscriptionCursors.remove(subscriptionKey); + } + }); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + String subscriptionKey(String clientId, String subscriptionName){ + return clientId+":"+subscriptionName; + } + + public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { + return new KahaDBMessageStore(destination); + } + + public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { + return new KahaDBTopicMessageStore(destination); + } + + public void deleteAllMessages() throws IOException { + deleteAllMessages=true; + } + + + public Set getDestinations() { + try { + final HashSet rc = new HashSet(); + synchronized(indexMutex) { + pageFile.tx().execute(new Transaction.Closure(){ + public void execute(Transaction tx) throws IOException { + for (Iterator> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) { + Entry entry = iterator.next(); + rc.add(convert(entry.getKey())); + } + } + }); + } + return rc; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public long getLastMessageBrokerSequenceId() throws IOException { + return 0; + } + + public long size() { + if ( !started.get() ) { + return 0; + } + try { + return journal.getDiskSize() + pageFile.getDiskSize(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public void beginTransaction(ConnectionContext context) throws IOException { + throw new IOException("Not yet implemented."); + } + public void commitTransaction(ConnectionContext context) throws IOException { + throw new IOException("Not yet implemented."); + } + public void rollbackTransaction(ConnectionContext context) throws IOException { + throw new IOException("Not yet implemented."); + } + + public void checkpoint(boolean sync) throws IOException { + super.checkpointCleanup(false); + } + + + /////////////////////////////////////////////////////////////////// + // Internal helper methods. + /////////////////////////////////////////////////////////////////// + + /** + * @param location + * @return + * @throws IOException + */ + Message loadMessage(Location location) throws IOException { + KahaAddMessageCommand addMessage = (KahaAddMessageCommand)load(location); + Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addMessage.getMessage().newInput()) ); + return msg; + } + + /////////////////////////////////////////////////////////////////// + // Internal conversion methods. + /////////////////////////////////////////////////////////////////// + + KahaTransactionInfo createTransactionInfo(TransactionId txid) { + if( txid ==null ) { + return null; + } + KahaTransactionInfo rc = new KahaTransactionInfo(); + + // Link it up to the previous record that was part of the transaction. + ArrayList tx = inflightTransactions.get(txid); + if( tx!=null ) { + rc.setPreviousEntry(convert(tx.get(tx.size()-1).location)); + } + + if( txid.isLocalTransaction() ) { + LocalTransactionId t = (LocalTransactionId)txid; + KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId(); + kahaTxId.setConnectionId(t.getConnectionId().getValue()); + kahaTxId.setTransacitonId(t.getValue()); + rc.setLocalTransacitonId(kahaTxId); + } else { + XATransactionId t = (XATransactionId)txid; + KahaXATransactionId kahaTxId = new KahaXATransactionId(); + kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier())); + kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId())); + kahaTxId.setFormatId(t.getFormatId()); + rc.setXaTransacitonId(kahaTxId); + } + return rc; + } + + KahaLocation convert(Location location) { + KahaLocation rc = new KahaLocation(); + rc.setLogId(location.getDataFileId()); + rc.setOffset(location.getOffset()); + return rc; + } + + KahaDestination convert(ActiveMQDestination dest) { + KahaDestination rc = new KahaDestination(); + rc.setName(dest.getPhysicalName()); + switch( dest.getDestinationType() ) { + case ActiveMQDestination.QUEUE_TYPE: + rc.setType(DestinationType.QUEUE); + return rc; + case ActiveMQDestination.TOPIC_TYPE: + rc.setType(DestinationType.TOPIC); + return rc; + case ActiveMQDestination.TEMP_QUEUE_TYPE: + rc.setType(DestinationType.TEMP_QUEUE); + return rc; + case ActiveMQDestination.TEMP_TOPIC_TYPE: + rc.setType(DestinationType.TEMP_TOPIC); + return rc; + default: + return null; + } + } + + ActiveMQDestination convert(String dest) { + int p = dest.indexOf(":"); + if( p<0 ) { + throw new IllegalArgumentException("Not in the valid destination format"); + } + int type = Integer.parseInt(dest.substring(0, p)); + String name = dest.substring(p+1); + + switch( KahaDestination.DestinationType.valueOf(type) ) { + case QUEUE: + return new ActiveMQQueue(name); + case TOPIC: + return new ActiveMQTopic(name); + case TEMP_QUEUE: + return new ActiveMQTempQueue(name); + case TEMP_TOPIC: + return new ActiveMQTempTopic(name); + default: + throw new IllegalArgumentException("Not in the valid destination format"); + } + } + +} Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=731704&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Mon Jan 5 12:48:38 2009 @@ -0,0 +1,1243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.TreeMap; +import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.LocalTransactionId; +import org.apache.activemq.command.SubscriptionInfo; +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; +import org.apache.activemq.store.kahadb.data.KahaCommitCommand; +import org.apache.activemq.store.kahadb.data.KahaDestination; +import org.apache.activemq.store.kahadb.data.KahaEntryType; +import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId; +import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; +import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; +import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; +import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; +import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; +import org.apache.activemq.store.kahadb.data.KahaTraceCommand; +import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; +import org.apache.activemq.store.kahadb.data.KahaXATransactionId; +import org.apache.activemq.util.Callback; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.kahadb.index.BTreeIndex; +import org.apache.kahadb.index.BTreeVisitor; +import org.apache.kahadb.journal.Journal; +import org.apache.kahadb.journal.Location; +import org.apache.kahadb.page.Page; +import org.apache.kahadb.page.PageFile; +import org.apache.kahadb.page.Transaction; +import org.apache.kahadb.util.ByteSequence; +import org.apache.kahadb.util.DataByteArrayInputStream; +import org.apache.kahadb.util.DataByteArrayOutputStream; +import org.apache.kahadb.util.LockFile; +import org.apache.kahadb.util.LongMarshaller; +import org.apache.kahadb.util.Marshaller; +import org.apache.kahadb.util.StringMarshaller; + +public class MessageDatabase { + + private static final Log LOG = LogFactory.getLog(MessageDatabase.class); + private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; + + public static final int CLOSED_STATE = 1; + public static final int OPEN_STATE = 2; + + protected class Metadata { + protected Page page; + protected int state; + protected BTreeIndex destinations; + protected Location lastUpdate; + protected Location firstInProgressTransactionLocation; + + public void read(DataInput is) throws IOException { + state = is.readInt(); + destinations = new BTreeIndex(pageFile, is.readLong()); + if (is.readBoolean()) { + lastUpdate = LocationMarshaller.INSTANCE.readPayload(is); + } else { + lastUpdate = null; + } + if (is.readBoolean()) { + firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is); + } else { + firstInProgressTransactionLocation = null; + } + } + + public void write(DataOutput os) throws IOException { + os.writeInt(state); + os.writeLong(destinations.getPageId()); + + if (lastUpdate != null) { + os.writeBoolean(true); + LocationMarshaller.INSTANCE.writePayload(lastUpdate, os); + } else { + os.writeBoolean(false); + } + + if (firstInProgressTransactionLocation != null) { + os.writeBoolean(true); + LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os); + } else { + os.writeBoolean(false); + } + } + } + + class MetadataMarshaller implements Marshaller { + public Class getType() { + return Metadata.class; + } + + public Metadata readPayload(DataInput dataIn) throws IOException { + Metadata rc = new Metadata(); + rc.read(dataIn); + return rc; + } + + public void writePayload(Metadata object, DataOutput dataOut) throws IOException { + object.write(dataOut); + } + } + + protected PageFile pageFile; + protected Journal journal; + protected Metadata metadata = new Metadata(); + + protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller(); + + protected boolean failIfDatabaseIsLocked; + + protected boolean deleteAllMessages; + protected File directory; + protected Thread checkpointThread; + protected boolean syncWrites=true; + int checkpointInterval = 5*1000; + int cleanupInterval = 30*1000; + + protected AtomicBoolean started = new AtomicBoolean(); + protected AtomicBoolean opened = new AtomicBoolean(); + private LockFile lockFile; + + public MessageDatabase() { + } + + public void start() throws Exception { + if (started.compareAndSet(false, true)) { + load(); + } + } + + public void stop() throws Exception { + if (started.compareAndSet(true, false)) { + unload(); + } + } + + private void loadPageFile() throws IOException { + synchronized (indexMutex) { + final PageFile pageFile = getPageFile(); + pageFile.load(); + pageFile.tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + if (pageFile.getPageCount() == 0) { + // First time this is created.. Initialize the metadata + Page page = tx.allocate(); + assert page.getPageId() == 0; + page.set(metadata); + metadata.page = page; + metadata.state = CLOSED_STATE; + metadata.destinations = new BTreeIndex(pageFile, tx.allocate().getPageId()); + + tx.store(metadata.page, metadataMarshaller, true); + } else { + Page page = tx.load(0, metadataMarshaller); + metadata = page.get(); + metadata.page = page; + } + metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE); + metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller()); + metadata.destinations.load(tx); + } + }); + pageFile.flush(); + + // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted. + // Perhaps we should just keep an index of file + storedDestinations.clear(); + pageFile.tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + for (Iterator> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) { + Entry entry = iterator.next(); + StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null); + storedDestinations.put(entry.getKey(), sd); + } + } + }); + } + } + + /** + * @throws IOException + */ + public void open() throws IOException { + if( opened.compareAndSet(false, true) ) { + File lockFileName = new File(directory, "lock"); + lockFile = new LockFile(lockFileName, true); + if (failIfDatabaseIsLocked) { + lockFile.lock(); + } else { + while (true) { + try { + lockFile.lock(); + break; + } catch (IOException e) { + LOG.info("Database "+lockFileName+" is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY / 1000) + " seconds for the database to be unlocked."); + try { + Thread.sleep(DATABASE_LOCKED_WAIT_DELAY); + } catch (InterruptedException e1) { + } + } + } + } + + getJournal().start(); + + loadPageFile(); + + checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") { + public void run() { + try { + long lastCleanup = System.currentTimeMillis(); + long lastCheckpoint = System.currentTimeMillis(); + + // Sleep for a short time so we can periodically check + // to see if we need to exit this thread. + long sleepTime = Math.min(checkpointInterval, 500); + while (opened.get()) { + Thread.sleep(sleepTime); + long now = System.currentTimeMillis(); + if( now - lastCleanup >= cleanupInterval ) { + checkpointCleanup(true); + lastCleanup = now; + lastCheckpoint = now; + } else if( now - lastCheckpoint >= checkpointInterval ) { + checkpointCleanup(false); + lastCheckpoint = now; + } + } + } catch (InterruptedException e) { + // Looks like someone really wants us to exit this thread... + } + } + }; + checkpointThread.start(); + recover(); + } + } + + public void load() throws IOException { + + synchronized (indexMutex) { + open(); + + if (deleteAllMessages) { + journal.delete(); + + pageFile.unload(); + pageFile.delete(); + metadata = new Metadata(); + + LOG.info("Persistence store purged."); + deleteAllMessages = false; + + loadPageFile(); + } + store(new KahaTraceCommand().setMessage("LOADED " + new Date())); + + } + + } + + + public void close() throws IOException, InterruptedException { + if( opened.compareAndSet(true, false)) { + synchronized (indexMutex) { + pageFile.unload(); + metadata = new Metadata(); + } + journal.close(); + checkpointThread.join(); + lockFile.unlock(); + lockFile=null; + } + } + + public void unload() throws IOException, InterruptedException { + synchronized (indexMutex) { + if( pageFile.isLoaded() ) { + metadata.state = CLOSED_STATE; + metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation(); + + pageFile.tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + tx.store(metadata.page, metadataMarshaller, true); + } + }); + close(); + } + } + } + + /** + * @return + */ + private Location getFirstInProgressTxLocation() { + Location l = null; + if (!inflightTransactions.isEmpty()) { + l = inflightTransactions.values().iterator().next().get(0).getLocation(); + } + if (!preparedTransactions.isEmpty()) { + Location t = preparedTransactions.values().iterator().next().get(0).getLocation(); + if (l==null || t.compareTo(l) <= 0) { + l = t; + } + } + return l; + } + + /** + * Move all the messages that were in the journal into long term storage. We + * just replay and do a checkpoint. + * + * @throws IOException + * @throws IOException + * @throws InvalidLocationException + * @throws IllegalStateException + */ + private void recover() throws IllegalStateException, IOException { + long start = System.currentTimeMillis(); + + Location recoveryPosition = getRecoveryPosition(); + if( recoveryPosition ==null ) { + return; + } + + int redoCounter = 0; + LOG.info("Journal Recovery Started from: " + journal + " at " + recoveryPosition.getDataFileId() + ":" + recoveryPosition.getOffset()); + + while (recoveryPosition != null) { + JournalCommand message = load(recoveryPosition); + metadata.lastUpdate = recoveryPosition; + process(message, recoveryPosition); + redoCounter++; + recoveryPosition = journal.getNextLocation(recoveryPosition); + } + long end = System.currentTimeMillis(); + LOG.info("Replayed " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds."); + } + + private Location nextRecoveryPosition; + private Location lastRecoveryPosition; + + public void incrementalRecover() throws IOException { + if( nextRecoveryPosition == null ) { + if( lastRecoveryPosition==null ) { + nextRecoveryPosition = getRecoveryPosition(); + } else { + nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); + } + } + while (nextRecoveryPosition != null) { + lastRecoveryPosition = nextRecoveryPosition; + metadata.lastUpdate = lastRecoveryPosition; + JournalCommand message = load(lastRecoveryPosition); + process(message, lastRecoveryPosition); + nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); + } + } + + public Location getLastUpdatePosition() throws IOException { + return metadata.lastUpdate; + } + + private Location getRecoveryPosition() throws IOException { + + // If we need to recover the transactions.. + if (metadata.firstInProgressTransactionLocation != null) { + return metadata.firstInProgressTransactionLocation; + } + + // Perhaps there were no transactions... + if( metadata.lastUpdate!=null) { + // Start replay at the record after the last one recorded in the index file. + return journal.getNextLocation(metadata.lastUpdate); + } + + // This loads the first position. + return journal.getNextLocation(null); + } + + protected void checkpointCleanup(final boolean cleanup) { + try { + synchronized (indexMutex) { + if( !opened.get() ) { + return; + } + pageFile.tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + checkpointUpdate(tx, cleanup); + } + }); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + + public void checkpoint(Callback closure) throws Exception { + synchronized (indexMutex) { + pageFile.tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + checkpointUpdate(tx, false); + } + }); + pageFile.flush(); + closure.execute(); + } + } + + // ///////////////////////////////////////////////////////////////// + // Methods call by the broker to update and query the store. + // ///////////////////////////////////////////////////////////////// + public Location store(JournalCommand data) throws IOException { + return store(data, false); + } + + /** + * All updated are are funneled through this method. The updates a converted + * to a JournalMessage which is logged to the journal and then the data from + * the JournalMessage is used to update the index just like it would be done + * durring a recovery process. + */ + public Location store(JournalCommand data, boolean sync) throws IOException { + int size = data.serializedSizeFramed(); + DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); + os.writeByte(data.type().getNumber()); + data.writeFramed(os); + Location location = journal.write(os.toByteSequence(), sync); + process(data, location); + metadata.lastUpdate = location; + return location; + } + + /** + * Loads a previously stored JournalMessage + * + * @param location + * @return + * @throws IOException + */ + public JournalCommand load(Location location) throws IOException { + ByteSequence data = journal.read(location); + DataByteArrayInputStream is = new DataByteArrayInputStream(data); + byte readByte = is.readByte(); + KahaEntryType type = KahaEntryType.valueOf(readByte); + if( type == null ) { + throw new IOException("Could not load journal record. Invalid location: "+location); + } + JournalCommand message = (JournalCommand)type.createMessage(); + message.mergeFramed(is); + return message; + } + + // ///////////////////////////////////////////////////////////////// + // Journaled record processing methods. Once the record is journaled, + // these methods handle applying the index updates. These may be called + // from the recovery method too so they need to be idempotent + // ///////////////////////////////////////////////////////////////// + + private void process(JournalCommand data, final Location location) throws IOException { + data.visit(new Visitor() { + @Override + public void visit(KahaAddMessageCommand command) throws IOException { + process(command, location); + } + + @Override + public void visit(KahaRemoveMessageCommand command) throws IOException { + process(command, location); + } + + @Override + public void visit(KahaPrepareCommand command) throws IOException { + process(command, location); + } + + @Override + public void visit(KahaCommitCommand command) throws IOException { + process(command, location); + } + + @Override + public void visit(KahaRollbackCommand command) throws IOException { + process(command, location); + } + + @Override + public void visit(KahaRemoveDestinationCommand command) throws IOException { + process(command, location); + } + + @Override + public void visit(KahaSubscriptionCommand command) throws IOException { + process(command, location); + } + }); + } + + private void process(final KahaAddMessageCommand command, final Location location) throws IOException { + if (command.hasTransactionInfo()) { + synchronized (indexMutex) { + ArrayList inflightTx = getInflightTx(command.getTransactionInfo(), location); + inflightTx.add(new AddOpperation(command, location)); + } + } else { + synchronized (indexMutex) { + pageFile.tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + upadateIndex(tx, command, location); + } + }); + } + } + } + + protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException { + if (command.hasTransactionInfo()) { + synchronized (indexMutex) { + ArrayList inflightTx = getInflightTx(command.getTransactionInfo(), location); + inflightTx.add(new RemoveOpperation(command, location)); + } + } else { + synchronized (indexMutex) { + pageFile.tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + updateIndex(tx, command, location); + } + }); + } + } + + } + + protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException { + synchronized (indexMutex) { + pageFile.tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + updateIndex(tx, command, location); + } + }); + } + } + + protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException { + synchronized (indexMutex) { + pageFile.tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + updateIndex(tx, command, location); + } + }); + } + } + + protected void process(KahaCommitCommand command, Location location) throws IOException { + TransactionId key = key(command.getTransactionInfo()); + synchronized (indexMutex) { + ArrayList inflightTx = inflightTransactions.remove(key); + if (inflightTx == null) { + inflightTx = preparedTransactions.remove(key); + } + if (inflightTx == null) { + return; + } + + final ArrayList messagingTx = inflightTx; + pageFile.tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + for (Operation op : messagingTx) { + op.execute(tx); + } + } + }); + } + } + + protected void process(KahaPrepareCommand command, Location location) { + synchronized (indexMutex) { + TransactionId key = key(command.getTransactionInfo()); + ArrayList tx = inflightTransactions.remove(key); + if (tx != null) { + preparedTransactions.put(key, tx); + } + } + } + + protected void process(KahaRollbackCommand command, Location location) { + synchronized (indexMutex) { + TransactionId key = key(command.getTransactionInfo()); + ArrayList tx = inflightTransactions.remove(key); + if (tx == null) { + preparedTransactions.remove(key); + } + } + } + + // ///////////////////////////////////////////////////////////////// + // These methods do the actual index updates. + // ///////////////////////////////////////////////////////////////// + + protected final Object indexMutex = new Object(); + private final HashSet journalFilesBeingReplicated = new HashSet(); + + private void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException { + StoredDestination sd = getStoredDestination(command.getDestination(), tx); + + // Skip adding the message to the index if this is a topic and there are + // no subscriptions. + if (sd.subscriptions != null && sd.ackPositions.isEmpty()) { + return; + } + + // Add the message. + long id = sd.nextMessageId++; + Long previous = sd.locationIndex.put(tx, location, id); + if( previous == null ) { + sd.messageIdIndex.put(tx, command.getMessageId(), id); + sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location)); + } else { + // restore the previous value.. Looks like this was a redo of a previously + // added message. We don't want to assing it a new id as the other indexes would + // be wrong.. + sd.locationIndex.put(tx, location, previous); + } + + } + + private void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException { + StoredDestination sd = getStoredDestination(command.getDestination(), tx); + if (!command.hasSubscriptionKey()) { + + // In the queue case we just remove the message from the index.. + Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId()); + if (sequenceId != null) { + MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); + sd.locationIndex.remove(tx, keys.location); + } + } else { + // In the topic case we need remove the message once it's been acked + // by all the subs + Long sequence = sd.messageIdIndex.get(tx, command.getMessageId()); + + // Make sure it's a valid message id... + if (sequence != null) { + String subscriptionKey = command.getSubscriptionKey(); + Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, sequence); + + // The following method handles deleting un-referenced messages. + removeAckLocation(tx, sd, subscriptionKey, prev); + + // Add it to the new location set. + addAckLocation(sd, sequence, subscriptionKey); + } + + } + } + + private void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException { + StoredDestination sd = getStoredDestination(command.getDestination(), tx); + sd.orderIndex.clear(tx); + sd.orderIndex.unload(tx); + tx.free(sd.orderIndex.getPageId()); + + sd.locationIndex.clear(tx); + sd.locationIndex.unload(tx); + tx.free(sd.locationIndex.getPageId()); + + sd.messageIdIndex.clear(tx); + sd.messageIdIndex.unload(tx); + tx.free(sd.messageIdIndex.getPageId()); + + if (sd.subscriptions != null) { + sd.subscriptions.clear(tx); + sd.subscriptions.unload(tx); + tx.free(sd.subscriptions.getPageId()); + + sd.subscriptionAcks.clear(tx); + sd.subscriptionAcks.unload(tx); + tx.free(sd.subscriptionAcks.getPageId()); + } + + String key = key(command.getDestination()); + storedDestinations.remove(key); + metadata.destinations.remove(tx, key); + } + + private void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException { + StoredDestination sd = getStoredDestination(command.getDestination(), tx); + + // If set then we are creating it.. otherwise we are destroying the sub + if (command.hasSubscriptionInfo()) { + String subscriptionKey = command.getSubscriptionKey(); + sd.subscriptions.put(tx, subscriptionKey, command); + long ackLocation=-1; + if (!command.getRetroactive()) { + ackLocation = sd.nextMessageId-1; + } + + sd.subscriptionAcks.put(tx, subscriptionKey, ackLocation); + addAckLocation(sd, ackLocation, subscriptionKey); + } else { + // delete the sub... + String subscriptionKey = command.getSubscriptionKey(); + sd.subscriptions.remove(tx, subscriptionKey); + Long prev = sd.subscriptionAcks.remove(tx, subscriptionKey); + if( prev!=null ) { + removeAckLocation(tx, sd, subscriptionKey, prev); + } + } + + } + + /** + * @param tx + * @throws IOException + */ + private void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException { + + LOG.debug("Checkpoint started."); + + metadata.state = OPEN_STATE; + metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation(); + tx.store(metadata.page, metadataMarshaller, true); + pageFile.flush(); + + if( cleanup ) { + // Find empty journal files to remove. + final HashSet inUseFiles = new HashSet(); + for (StoredDestination sd : storedDestinations.values()) { + + // Use a visitor to cut down the number of pages that we load + sd.locationIndex.visit(tx, new BTreeVisitor() { + int last=-1; + public boolean isInterestedInKeysBetween(Location first, Location second) { + if( second!=null ) { + if( last+1 == second.getDataFileId() ) { + last++; + inUseFiles.add(last); + } + if( last == second.getDataFileId() ) { + return false; + } + } + return true; + } + + public void visit(List keys, List values) { + for (int i = 0; i < keys.size(); i++) { + if( last != keys.get(i).getDataFileId() ) { + inUseFiles.add(keys.get(i).getDataFileId()); + last = keys.get(i).getDataFileId(); + } + } + + } + + }); + } + inUseFiles.addAll(journalFilesBeingReplicated); + Location l = metadata.lastUpdate; + if( metadata.firstInProgressTransactionLocation!=null ) { + l = metadata.firstInProgressTransactionLocation; + } + + LOG.debug("In use files: "+inUseFiles+", lastUpdate: "+l); + journal.consolidateDataFilesNotIn(inUseFiles, l==null?null:l.getDataFileId()); + } + + LOG.debug("Checkpoint done."); + } + + public HashSet getJournalFilesBeingReplicated() { + return journalFilesBeingReplicated; + } + + // ///////////////////////////////////////////////////////////////// + // StoredDestination related implementation methods. + // ///////////////////////////////////////////////////////////////// + + + private final HashMap storedDestinations = new HashMap(); + + class StoredSubscription { + SubscriptionInfo subscriptionInfo; + String lastAckId; + Location lastAckLocation; + Location cursor; + } + + static class MessageKeys { + final String messageId; + final Location location; + + public MessageKeys(String messageId, Location location) { + this.messageId=messageId; + this.location=location; + } + + @Override + public String toString() { + return "["+messageId+","+location+"]"; + } + } + + static protected class MessageKeysMarshaller implements Marshaller { + static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller(); + + public Class getType() { + return MessageKeys.class; + } + + public MessageKeys readPayload(DataInput dataIn) throws IOException { + return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn)); + } + + public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException { + dataOut.writeUTF(object.messageId); + LocationMarshaller.INSTANCE.writePayload(object.location, dataOut); + } + } + + static class StoredDestination { + long nextMessageId; + BTreeIndex orderIndex; + BTreeIndex locationIndex; + BTreeIndex messageIdIndex; + + // These bits are only set for Topics + BTreeIndex subscriptions; + BTreeIndex subscriptionAcks; + HashMap subscriptionCursors; + TreeMap> ackPositions; + } + + protected class StoredDestinationMarshaller implements Marshaller { + public Class getType() { + return StoredDestination.class; + } + + public StoredDestination readPayload(DataInput dataIn) throws IOException { + StoredDestination value = new StoredDestination(); + value.orderIndex = new BTreeIndex(pageFile, dataIn.readLong()); + value.locationIndex = new BTreeIndex(pageFile, dataIn.readLong()); + value.messageIdIndex = new BTreeIndex(pageFile, dataIn.readLong()); + + if (dataIn.readBoolean()) { + value.subscriptions = new BTreeIndex(pageFile, dataIn.readLong()); + value.subscriptionAcks = new BTreeIndex(pageFile, dataIn.readLong()); + } + return value; + } + + public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException { + dataOut.writeLong(value.orderIndex.getPageId()); + dataOut.writeLong(value.locationIndex.getPageId()); + dataOut.writeLong(value.messageIdIndex.getPageId()); + if (value.subscriptions != null) { + dataOut.writeBoolean(true); + dataOut.writeLong(value.subscriptions.getPageId()); + dataOut.writeLong(value.subscriptionAcks.getPageId()); + } else { + dataOut.writeBoolean(false); + } + } + } + + static class LocationMarshaller implements Marshaller { + final static LocationMarshaller INSTANCE = new LocationMarshaller(); + + public Class getType() { + return Location.class; + } + + public Location readPayload(DataInput dataIn) throws IOException { + Location rc = new Location(); + rc.setDataFileId(dataIn.readInt()); + rc.setOffset(dataIn.readInt()); + return rc; + } + + public void writePayload(Location object, DataOutput dataOut) throws IOException { + dataOut.writeInt(object.getDataFileId()); + dataOut.writeInt(object.getOffset()); + } + } + + static class KahaSubscriptionCommandMarshaller implements Marshaller { + final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller(); + + public Class getType() { + return KahaSubscriptionCommand.class; + } + + public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException { + KahaSubscriptionCommand rc = new KahaSubscriptionCommand(); + rc.mergeFramed((InputStream)dataIn); + return rc; + } + + public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException { + object.writeFramed((OutputStream)dataOut); + } + } + + protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException { + String key = key(destination); + StoredDestination rc = storedDestinations.get(key); + if (rc == null) { + boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC; + rc = loadStoredDestination(tx, key, topic); + // Cache it. We may want to remove/unload destinations from the + // cache that are not used for a while + // to reduce memory usage. + storedDestinations.put(key, rc); + } + return rc; + } + + /** + * @param tx + * @param key + * @param topic + * @return + * @throws IOException + */ + private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException { + // Try to load the existing indexes.. + StoredDestination rc = metadata.destinations.get(tx, key); + if (rc == null) { + // Brand new destination.. allocate indexes for it. + rc = new StoredDestination(); + rc.orderIndex = new BTreeIndex(pageFile, tx.allocate()); + rc.locationIndex = new BTreeIndex(pageFile, tx.allocate()); + rc.messageIdIndex = new BTreeIndex(pageFile, tx.allocate()); + + if (topic) { + rc.subscriptions = new BTreeIndex(pageFile, tx.allocate()); + rc.subscriptionAcks = new BTreeIndex(pageFile, tx.allocate()); + } + metadata.destinations.put(tx, key, rc); + } + + // Configure the marshalers and load. + rc.orderIndex.setKeyMarshaller(LongMarshaller.INSTANCE); + rc.orderIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); + rc.orderIndex.load(tx); + + // Figure out the next key using the last entry in the destination. + Entry lastEntry = rc.orderIndex.getLast(tx); + if( lastEntry!=null ) { + rc.nextMessageId = lastEntry.getKey()+1; + } + + rc.locationIndex.setKeyMarshaller(LocationMarshaller.INSTANCE); + rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE); + rc.locationIndex.load(tx); + + rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE); + rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE); + rc.messageIdIndex.load(tx); + + // If it was a topic... + if (topic) { + + rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE); + rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE); + rc.subscriptions.load(tx); + + rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE); + rc.subscriptionAcks.setValueMarshaller(LongMarshaller.INSTANCE); + rc.subscriptionAcks.load(tx); + + rc.ackPositions = new TreeMap>(); + rc.subscriptionCursors = new HashMap(); + + for (Iterator> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) { + Entry entry = iterator.next(); + addAckLocation(rc, entry.getValue(), entry.getKey()); + } + + } + return rc; + } + + /** + * @param sd + * @param messageSequence + * @param subscriptionKey + */ + private void addAckLocation(StoredDestination sd, Long messageSequence, String subscriptionKey) { + HashSet hs = sd.ackPositions.get(messageSequence); + if (hs == null) { + hs = new HashSet(); + sd.ackPositions.put(messageSequence, hs); + } + hs.add(subscriptionKey); + } + + /** + * @param tx + * @param sd + * @param subscriptionKey + * @param sequenceId + * @throws IOException + */ + private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long sequenceId) throws IOException { + // Remove the sub from the previous location set.. + if (sequenceId != null) { + HashSet hs = sd.ackPositions.get(sequenceId); + if (hs != null) { + hs.remove(subscriptionKey); + if (hs.isEmpty()) { + HashSet firstSet = sd.ackPositions.values().iterator().next(); + sd.ackPositions.remove(sequenceId); + + // Did we just empty out the first set in the + // ordered list of ack locations? Then it's time to + // delete some messages. + if (hs == firstSet) { + + // Find all the entries that need to get deleted. + ArrayList> deletes = new ArrayList>(); + for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) { + Entry entry = iterator.next(); + if (entry.getKey().compareTo(sequenceId) <= 0) { + // We don't do the actually delete while we are + // iterating the BTree since + // iterating would fail. + deletes.add(entry); + } + } + + // Do the actual deletes. + for (Entry entry : deletes) { + sd.locationIndex.remove(tx, entry.getValue().location); + sd.messageIdIndex.remove(tx,entry.getValue().messageId); + sd.orderIndex.remove(tx,entry.getKey()); + } + } + } + } + } + } + + private String key(KahaDestination destination) { + return destination.getType().getNumber() + ":" + destination.getName(); + } + + // ///////////////////////////////////////////////////////////////// + // Transaction related implementation methods. + // ///////////////////////////////////////////////////////////////// + protected final LinkedHashMap> inflightTransactions = new LinkedHashMap>(); + protected final LinkedHashMap> preparedTransactions = new LinkedHashMap>(); + + private ArrayList getInflightTx(KahaTransactionInfo info, Location location) { + TransactionId key = key(info); + ArrayList tx = inflightTransactions.get(key); + if (tx == null) { + tx = new ArrayList(); + inflightTransactions.put(key, tx); + } + return tx; + } + + private TransactionId key(KahaTransactionInfo transactionInfo) { + if (transactionInfo.hasLocalTransacitonId()) { + KahaLocalTransactionId tx = transactionInfo.getLocalTransacitonId(); + LocalTransactionId rc = new LocalTransactionId(); + rc.setConnectionId(new ConnectionId(tx.getConnectionId())); + rc.setValue(tx.getTransacitonId()); + return rc; + } else { + KahaXATransactionId tx = transactionInfo.getXaTransacitonId(); + XATransactionId rc = new XATransactionId(); + rc.setBranchQualifier(tx.getBranchQualifier().toByteArray()); + rc.setGlobalTransactionId(tx.getGlobalTransactionId().toByteArray()); + rc.setFormatId(tx.getFormatId()); + return rc; + } + } + + abstract class Operation { + final Location location; + + public Operation(Location location) { + this.location = location; + } + + public Location getLocation() { + return location; + } + + abstract public void execute(Transaction tx) throws IOException; + } + + class AddOpperation extends Operation { + final KahaAddMessageCommand command; + + public AddOpperation(KahaAddMessageCommand command, Location location) { + super(location); + this.command = command; + } + + public void execute(Transaction tx) throws IOException { + upadateIndex(tx, command, location); + } + + public KahaAddMessageCommand getCommand() { + return command; + } + } + + class RemoveOpperation extends Operation { + final KahaRemoveMessageCommand command; + + public RemoveOpperation(KahaRemoveMessageCommand command, Location location) { + super(location); + this.command = command; + } + + public void execute(Transaction tx) throws IOException { + updateIndex(tx, command, location); + } + + public KahaRemoveMessageCommand getCommand() { + return command; + } + } + + // ///////////////////////////////////////////////////////////////// + // Initialization related implementation methods. + // ///////////////////////////////////////////////////////////////// + + private PageFile createPageFile() { + PageFile pf = new PageFile(directory, "db"); + return pf; + } + + private Journal createJournal() { + Journal manager = new Journal(); + manager.setDirectory(directory); + manager.setMaxFileLength(1024 * 1024 * 20); + manager.setUseNio(false); + return manager; + } + + public File getDirectory() { + return directory; + } + + public void setDirectory(File directory) { + this.directory = directory; + } + + public boolean isDeleteAllMessages() { + return deleteAllMessages; + } + + public void setDeleteAllMessages(boolean deleteAllMessages) { + this.deleteAllMessages = deleteAllMessages; + } + + public boolean isSyncWrites() { + return syncWrites; + } + + public void setSyncWrites(boolean syncWrites) { + this.syncWrites = syncWrites; + } + + public int getCheckpointInterval() { + return checkpointInterval; + } + + public void setCheckpointInterval(int checkpointInterval) { + this.checkpointInterval = checkpointInterval; + } + + public int getCleanupInterval() { + return cleanupInterval; + } + + public void setCleanupInterval(int cleanupInterval) { + this.cleanupInterval = cleanupInterval; + } + + public PageFile getPageFile() { + if (pageFile == null) { + pageFile = createPageFile(); + } + return pageFile; + } + + public Journal getJournal() { + if (journal == null) { + journal = createJournal(); + } + return journal; + } + + public boolean isFailIfDatabaseIsLocked() { + return failIfDatabaseIsLocked; + } + + public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { + this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; + } +} Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/Visitor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/Visitor.java?rev=731704&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/Visitor.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/Visitor.java Mon Jan 5 12:48:38 2009 @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb; + +import java.io.IOException; + +import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; +import org.apache.activemq.store.kahadb.data.KahaCommitCommand; +import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; +import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; +import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; +import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; +import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; +import org.apache.activemq.store.kahadb.data.KahaTraceCommand; + +public class Visitor { + + public void visit(KahaTraceCommand command) { + } + + public void visit(KahaRollbackCommand command) throws IOException { + } + + public void visit(KahaRemoveMessageCommand command) throws IOException { + } + + public void visit(KahaPrepareCommand command) throws IOException { + } + + public void visit(KahaCommitCommand command) throws IOException { + } + + public void visit(KahaAddMessageCommand command) throws IOException { + } + + public void visit(KahaRemoveDestinationCommand command) throws IOException { + } + + public void visit(KahaSubscriptionCommand kahaUpdateSubscriptionCommand) throws IOException { + } + +}