Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 1943 invoked from network); 8 Mar 2007 14:21:47 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 8 Mar 2007 14:21:47 -0000 Received: (qmail 17048 invoked by uid 500); 8 Mar 2007 14:21:55 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 17014 invoked by uid 500); 8 Mar 2007 14:21:55 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 16934 invoked by uid 99); 8 Mar 2007 14:21:54 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Mar 2007 06:21:54 -0800 X-ASF-Spam-Status: No, hits=-99.5 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Mar 2007 06:21:35 -0800 Received: by eris.apache.org (Postfix, from userid 65534) id 5125C1A985D; Thu, 8 Mar 2007 06:20:48 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r516048 [9/14] - in /activemq/trunk: activemq-book/src/docbkx/ activemq-core/src/main/java/org/apache/activemq/ activemq-core/src/main/java/org/apache/activemq/blob/ activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/ act... Date: Thu, 08 Mar 2007 14:20:38 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070308142048.5125C1A985D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/XATransactionIdMarshaller.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/XATransactionIdMarshaller.java?view=diff&rev=516048&r1=516047&r2=516048 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/XATransactionIdMarshaller.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/XATransactionIdMarshaller.java Thu Mar 8 06:20:29 2007 @@ -1 +1,138 @@ -/** * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.openwire.v3; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.activemq.openwire.*; import org.apache.activ emq.command.*; /** * Marshalling code for Open Wire Format for XATransactionIdMarshaller * * * NOTE!: This file is auto generated - do not modify! * if you need to make a change, please see the modify the groovy scripts in the * under src/gram/script and then use maven openwire:generate to regenerate * this file. * * @version $Revision$ */ public class XATransactionIdMarshaller extends TransactionIdMarshaller { /** * Return the type of Data Structure we marshal * @return short representation of the type data structure */ public byte getDataStructureType() { return XATransactionId.DATA_STRUCTURE_TYPE; } /** * @return a new object instance */ public DataStructure createObject() { return new XATransactionId(); } /** * Un-marshal an object instance from the data input stream * * @param o the object to un-marshal * @param dataIn the data input stream to build the object from * @throws IOException */ public void tightUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn, BooleanStream bs) throws IOException { super.tightUnmarshal(wireFormat, o, dataIn, bs); XATransactionId info = (XATransactionId)o; info.setFormatId(dataIn.readInt()); info.setGlobalTransactionId(tightUnmarshalByteArray(dataIn, bs)); info.setBranchQualifier(tightUnmarshalByteArray(dataIn, bs)); } /** * Write the booleans that this object uses to a BooleanStream */ public int tightMarshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) throws IOException { XATransactionId info = (XATransactionId)o; int rc = super.tightMarshal1(wireFormat, o, bs); rc += tightMarshalByteArray1(info.getGlobalTransactionId(), bs); rc += tightMarshalByteArray1(info.getBranchQualifier(), bs); return rc + 4; } /** * Write a obje ct instance to data output stream * * @param o the instance to be marshaled * @param dataOut the output stream * @throws IOException thrown if an error occurs */ public void tightMarshal2(OpenWireFormat wireFormat, Object o, DataOutput dataOut, BooleanStream bs) throws IOException { super.tightMarshal2(wireFormat, o, dataOut, bs); XATransactionId info = (XATransactionId)o; dataOut.writeInt(info.getFormatId()); tightMarshalByteArray2(info.getGlobalTransactionId(), dataOut, bs); tightMarshalByteArray2(info.getBranchQualifier(), dataOut, bs); } /** * Un-marshal an object instance from the data input stream * * @param o the object to un-marshal * @param dataIn the data input stream to build the object from * @throws IOException */ public void looseUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn) throws IOException { super.looseUnmarshal(wireFo rmat, o, dataIn); XATransactionId info = (XATransactionId)o; info.setFormatId(dataIn.readInt()); info.setGlobalTransactionId(looseUnmarshalByteArray(dataIn)); info.setBranchQualifier(looseUnmarshalByteArray(dataIn)); } /** * Write the booleans that this object uses to a BooleanStream */ public void looseMarshal(OpenWireFormat wireFormat, Object o, DataOutput dataOut) throws IOException { XATransactionId info = (XATransactionId)o; super.looseMarshal(wireFormat, o, dataOut); dataOut.writeInt(info.getFormatId()); looseMarshalByteArray(wireFormat, info.getGlobalTransactionId(), dataOut); looseMarshalByteArray(wireFormat, info.getBranchQualifier(), dataOut); } } \ No newline at end of file +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.openwire.v3; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.activemq.openwire.*; +import org.apache.activemq.command.*; + + + +/** + * Marshalling code for Open Wire Format for XATransactionIdMarshaller + * + * + * NOTE!: This file is auto generated - do not modify! + * if you need to make a change, please see the modify the groovy scripts in the + * under src/gram/script and then use maven openwire:generate to regenerate + * this file. + * + * @version $Revision$ + */ +public class XATransactionIdMarshaller extends TransactionIdMarshaller { + + /** + * Return the type of Data Structure we marshal + * @return short representation of the type data structure + */ + public byte getDataStructureType() { + return XATransactionId.DATA_STRUCTURE_TYPE; + } + + /** + * @return a new object instance + */ + public DataStructure createObject() { + return new XATransactionId(); + } + + /** + * Un-marshal an object instance from the data input stream + * + * @param o the object to un-marshal + * @param dataIn the data input stream to build the object from + * @throws IOException + */ + public void tightUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn, BooleanStream bs) throws IOException { + super.tightUnmarshal(wireFormat, o, dataIn, bs); + + XATransactionId info = (XATransactionId)o; + info.setFormatId(dataIn.readInt()); + info.setGlobalTransactionId(tightUnmarshalByteArray(dataIn, bs)); + info.setBranchQualifier(tightUnmarshalByteArray(dataIn, bs)); + + } + + + /** + * Write the booleans that this object uses to a BooleanStream + */ + public int tightMarshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) throws IOException { + + XATransactionId info = (XATransactionId)o; + + int rc = super.tightMarshal1(wireFormat, o, bs); + rc += tightMarshalByteArray1(info.getGlobalTransactionId(), bs); + rc += tightMarshalByteArray1(info.getBranchQualifier(), bs); + + return rc + 4; + } + + /** + * Write a object instance to data output stream + * + * @param o the instance to be marshaled + * @param dataOut the output stream + * @throws IOException thrown if an error occurs + */ + public void tightMarshal2(OpenWireFormat wireFormat, Object o, DataOutput dataOut, BooleanStream bs) throws IOException { + super.tightMarshal2(wireFormat, o, dataOut, bs); + + XATransactionId info = (XATransactionId)o; + dataOut.writeInt(info.getFormatId()); + tightMarshalByteArray2(info.getGlobalTransactionId(), dataOut, bs); + tightMarshalByteArray2(info.getBranchQualifier(), dataOut, bs); + + } + + /** + * Un-marshal an object instance from the data input stream + * + * @param o the object to un-marshal + * @param dataIn the data input stream to build the object from + * @throws IOException + */ + public void looseUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn) throws IOException { + super.looseUnmarshal(wireFormat, o, dataIn); + + XATransactionId info = (XATransactionId)o; + info.setFormatId(dataIn.readInt()); + info.setGlobalTransactionId(looseUnmarshalByteArray(dataIn)); + info.setBranchQualifier(looseUnmarshalByteArray(dataIn)); + + } + + + /** + * Write the booleans that this object uses to a BooleanStream + */ + public void looseMarshal(OpenWireFormat wireFormat, Object o, DataOutput dataOut) throws IOException { + + XATransactionId info = (XATransactionId)o; + + super.looseMarshal(wireFormat, o, dataOut); + dataOut.writeInt(info.getFormatId()); + looseMarshalByteArray(wireFormat, info.getGlobalTransactionId(), dataOut); + looseMarshalByteArray(wireFormat, info.getBranchQualifier(), dataOut); + + } +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/XATransactionIdMarshaller.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/AuthenticationUser.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/AuthenticationUser.java?view=diff&rev=516048&r1=516047&r2=516048 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/AuthenticationUser.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/AuthenticationUser.java Thu Mar 8 06:20:29 2007 @@ -1,64 +1,64 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.security; - - -/** - * A helper object used to configure simple authentiaction plugin - * - * @org.apache.xbean.XBean - * - * @version $Revision - */ -public class AuthenticationUser { - - String username; - String password; - String groups; - - - - public AuthenticationUser(String username, String password, String groups) { - this.username = username; - this.password = password; - this.groups = groups; - } - - - public String getGroups() { - return groups; - } - public void setGroups(String groups) { - this.groups = groups; - } - public String getPassword() { - return password; - } - public void setPassword(String password) { - this.password = password; - } - public String getUsername() { - return username; - } - public void setUsername(String username) { - this.username = username; - } - - - -} +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.security; + + +/** + * A helper object used to configure simple authentiaction plugin + * + * @org.apache.xbean.XBean + * + * @version $Revision + */ +public class AuthenticationUser { + + String username; + String password; + String groups; + + + + public AuthenticationUser(String username, String password, String groups) { + this.username = username; + this.password = password; + this.groups = groups; + } + + + public String getGroups() { + return groups; + } + public void setGroups(String groups) { + this.groups = groups; + } + public String getPassword() { + return password; + } + public void setPassword(String password) { + this.password = password; + } + public String getUsername() { + return username; + } + public void setUsername(String username) { + this.username = username; + } + + + +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/AuthenticationUser.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/TempDestinationAuthorizationEntry.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/TempDestinationAuthorizationEntry.java?view=diff&rev=516048&r1=516047&r2=516048 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/TempDestinationAuthorizationEntry.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/TempDestinationAuthorizationEntry.java Thu Mar 8 06:20:29 2007 @@ -1,45 +1,45 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.security; - -import org.apache.activemq.filter.DestinationMapEntry; -import org.apache.activemq.jaas.GroupPrincipal; - -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import java.util.StringTokenizer; - -/** - * Represents an entry in a {@link DefaultAuthorizationMap} for assigning - * different operations (read, write, admin) of user roles to - * a temporary destination - * - * @org.apache.xbean.XBean - * - * @version $Revision: 426366 $ - */ -public class TempDestinationAuthorizationEntry extends AuthorizationEntry { - - - public void afterPropertiesSet() throws Exception { - //we don't need to check if destination is specified since - //the TempDestinationAuthorizationEntry should map to all temp destinations - } - -} +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.security; + +import org.apache.activemq.filter.DestinationMapEntry; +import org.apache.activemq.jaas.GroupPrincipal; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.StringTokenizer; + +/** + * Represents an entry in a {@link DefaultAuthorizationMap} for assigning + * different operations (read, write, admin) of user roles to + * a temporary destination + * + * @org.apache.xbean.XBean + * + * @version $Revision: 426366 $ + */ +public class TempDestinationAuthorizationEntry extends AuthorizationEntry { + + + public void afterPropertiesSet() throws Exception { + //we don't need to check if destination is specified since + //the TempDestinationAuthorizationEntry should map to all temp destinations + } + +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/TempDestinationAuthorizationEntry.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java?view=diff&rev=516048&r1=516047&r2=516048 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java Thu Mar 8 06:20:29 2007 @@ -1,82 +1,82 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.store; - -import java.io.IOException; - -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.command.MessageId; - -/** - * Represents a message store which is used by the persistent - * implementations - * - * @version $Revision: 1.5 $ - */ -public interface ReferenceStore extends MessageStore { - - public class ReferenceData { - long expiration; - int fileId; - int offset; - - public long getExpiration() { - return expiration; - } - public void setExpiration(long expiration) { - this.expiration = expiration; - } - public int getFileId() { - return fileId; - } - public void setFileId(int file) { - this.fileId = file; - } - public int getOffset() { - return offset; - } - public void setOffset(int offset) { - this.offset = offset; - } - - @Override - public String toString() { - return "ReferenceData fileId="+fileId+", offset="+offset+", expiration="+expiration; - } - } - - /** - * Adds a message reference to the message store - */ - public void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException; - - /** - * Looks up a message using either the String messageID or the messageNumber. Implementations are encouraged to fill - * in the missing key if its easy to do so. - */ - public ReferenceData getMessageReference(MessageId identity) throws IOException; - - /** - * @return true if it supports external batch control - */ - public boolean supportsExternalBatchControl(); - - public void setBatch(MessageId startAfter); - -} +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.store; + +import java.io.IOException; + +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.MessageId; + +/** + * Represents a message store which is used by the persistent + * implementations + * + * @version $Revision: 1.5 $ + */ +public interface ReferenceStore extends MessageStore { + + public class ReferenceData { + long expiration; + int fileId; + int offset; + + public long getExpiration() { + return expiration; + } + public void setExpiration(long expiration) { + this.expiration = expiration; + } + public int getFileId() { + return fileId; + } + public void setFileId(int file) { + this.fileId = file; + } + public int getOffset() { + return offset; + } + public void setOffset(int offset) { + this.offset = offset; + } + + @Override + public String toString() { + return "ReferenceData fileId="+fileId+", offset="+offset+", expiration="+expiration; + } + } + + /** + * Adds a message reference to the message store + */ + public void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException; + + /** + * Looks up a message using either the String messageID or the messageNumber. Implementations are encouraged to fill + * in the missing key if its easy to do so. + */ + public ReferenceData getMessageReference(MessageId identity) throws IOException; + + /** + * @return true if it supports external batch control + */ + public boolean supportsExternalBatchControl(); + + public void setBatch(MessageId startAfter); + +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java?view=diff&rev=516048&r1=516047&r2=516048 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java Thu Mar 8 06:20:29 2007 @@ -1,45 +1,45 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store; - -import java.io.IOException; -import java.util.Set; - -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; - -/** - * Adapter to the actual persistence mechanism used with ActiveMQ - * - * @version $Revision: 1.3 $ - */ -public interface ReferenceStoreAdapter extends PersistenceAdapter { - - /** - * Factory method to create a new queue message store with the given destination name - */ - public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException; - - /** - * Factory method to create a new topic message store with the given destination name - */ - public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException; - - public Set getReferenceFileIdsInUse() throws IOException; - -} +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store; + +import java.io.IOException; +import java.util.Set; + +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; + +/** + * Adapter to the actual persistence mechanism used with ActiveMQ + * + * @version $Revision: 1.3 $ + */ +public interface ReferenceStoreAdapter extends PersistenceAdapter { + + /** + * Factory method to create a new queue message store with the given destination name + */ + public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException; + + /** + * Factory method to create a new topic message store with the given destination name + */ + public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException; + + public Set getReferenceFileIdsInUse() throws IOException; + +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java?view=diff&rev=516048&r1=516047&r2=516048 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java Thu Mar 8 06:20:29 2007 @@ -1,137 +1,137 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store; - -import java.io.IOException; - -import javax.jms.JMSException; - -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.command.SubscriptionInfo; - -/** - * A MessageStore for durable topic subscriptions - * - * @version $Revision: 1.4 $ - */ -public interface TopicReferenceStore extends ReferenceStore, TopicMessageStore { - /** - * Stores the last acknowledged messgeID for the given subscription so that we can recover and commence dispatching - * messages from the last checkpoint - * - * @param context - * @param clientId - * @param subscriptionName - * @param messageId - * @param subscriptionPersistentId - * @throws IOException - */ - public void acknowledge(ConnectionContext context,String clientId,String subscriptionName,MessageId messageId) - throws IOException; - - /** - * @param clientId - * @param subscriptionName - * @param sub - * @throws IOException - * @throws JMSException - */ - public void deleteSubscription(String clientId,String subscriptionName) throws IOException; - - /** - * For the new subscription find the last acknowledged message ID and then find any new messages since then and - * dispatch them to the subscription.

e.g. if we dispatched some messages to a new durable topic subscriber, - * then went down before acknowledging any messages, we need to know the correct point from which to recover from. - * - * @param clientId - * @param subscriptionName - * @param listener - * @param subscription - * - * @throws Exception - */ - public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener) - throws Exception; - - /** - * For an active subscription - retrieve messages from the store for the subscriber after the lastMessageId - * messageId

- * - * @param clientId - * @param subscriptionName - * @param maxReturned - * @param listener - * - * @throws Exception - */ - public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned, - MessageRecoveryListener listener) throws Exception; - - /** - * A hint to the Store to reset any batching state for a durable subsriber - * @param clientId - * @param subscriptionName - * - */ - public void resetBatching(String clientId,String subscriptionName); - - - /** - * Get the number of messages ready to deliver from the store to a durable subscriber - * @param clientId - * @param subscriberName - * @return the outstanding message count - * @throws IOException - */ - public int getMessageCount(String clientId,String subscriberName) throws IOException; - - /** - * Finds the subscriber entry for the given consumer info - * - * @param clientId - * @param subscriptionName - * @return the SubscriptionInfo - * @throws IOException - */ - public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException; - - /** - * Lists all the durable subscirptions for a given destination. - * - * @return an array SubscriptionInfos - * @throws IOException - */ - public SubscriptionInfo[] getAllSubscriptions() throws IOException; - - /** - * Inserts the subscriber info due to a subscription change

If this is a new subscription and the retroactive - * is false, then the last message sent to the topic should be set as the last message acknowledged by they new - * subscription. Otherwise, if retroactive is true, then create the subscription without it having an acknowledged - * message so that on recovery, all message recorded for the topic get replayed. - * - * @param clientId - * @param subscriptionName - * @param selector - * @param retroactive - * @throws IOException - * - */ - public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive) - throws IOException; -} +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store; + +import java.io.IOException; + +import javax.jms.JMSException; + +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.SubscriptionInfo; + +/** + * A MessageStore for durable topic subscriptions + * + * @version $Revision: 1.4 $ + */ +public interface TopicReferenceStore extends ReferenceStore, TopicMessageStore { + /** + * Stores the last acknowledged messgeID for the given subscription so that we can recover and commence dispatching + * messages from the last checkpoint + * + * @param context + * @param clientId + * @param subscriptionName + * @param messageId + * @param subscriptionPersistentId + * @throws IOException + */ + public void acknowledge(ConnectionContext context,String clientId,String subscriptionName,MessageId messageId) + throws IOException; + + /** + * @param clientId + * @param subscriptionName + * @param sub + * @throws IOException + * @throws JMSException + */ + public void deleteSubscription(String clientId,String subscriptionName) throws IOException; + + /** + * For the new subscription find the last acknowledged message ID and then find any new messages since then and + * dispatch them to the subscription.

e.g. if we dispatched some messages to a new durable topic subscriber, + * then went down before acknowledging any messages, we need to know the correct point from which to recover from. + * + * @param clientId + * @param subscriptionName + * @param listener + * @param subscription + * + * @throws Exception + */ + public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener) + throws Exception; + + /** + * For an active subscription - retrieve messages from the store for the subscriber after the lastMessageId + * messageId

+ * + * @param clientId + * @param subscriptionName + * @param maxReturned + * @param listener + * + * @throws Exception + */ + public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned, + MessageRecoveryListener listener) throws Exception; + + /** + * A hint to the Store to reset any batching state for a durable subsriber + * @param clientId + * @param subscriptionName + * + */ + public void resetBatching(String clientId,String subscriptionName); + + + /** + * Get the number of messages ready to deliver from the store to a durable subscriber + * @param clientId + * @param subscriberName + * @return the outstanding message count + * @throws IOException + */ + public int getMessageCount(String clientId,String subscriberName) throws IOException; + + /** + * Finds the subscriber entry for the given consumer info + * + * @param clientId + * @param subscriptionName + * @return the SubscriptionInfo + * @throws IOException + */ + public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException; + + /** + * Lists all the durable subscirptions for a given destination. + * + * @return an array SubscriptionInfos + * @throws IOException + */ + public SubscriptionInfo[] getAllSubscriptions() throws IOException; + + /** + * Inserts the subscriber info due to a subscription change

If this is a new subscription and the retroactive + * is false, then the last message sent to the topic should be set as the last message acknowledged by they new + * subscription. Otherwise, if retroactive is true, then create the subscription without it having an acknowledged + * message so that on recovery, all message recorded for the topic get replayed. + * + * @param clientId + * @param subscriptionName + * @param selector + * @param retroactive + * @throws IOException + * + */ + public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive) + throws IOException; +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/package.html ------------------------------------------------------------------------------ svn:eol-style = native Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java?view=diff&rev=516048&r1=516047&r2=516048 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java Thu Mar 8 06:20:29 2007 @@ -1,40 +1,40 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadaptor; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.activemq.kaha.Marshaller; - - -/** - * Marshall an Integer - * @version $Revision: 1.10 $ - */ -public class IntegerMarshaller implements Marshaller { - - public void writePayload(Integer object,DataOutput dataOut) throws IOException{ - dataOut.writeInt(object.intValue()); - } - - public Integer readPayload(DataInput dataIn) throws IOException{ - return dataIn.readInt(); - } -} +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadaptor; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.activemq.kaha.Marshaller; + + +/** + * Marshall an Integer + * @version $Revision: 1.10 $ + */ +public class IntegerMarshaller implements Marshaller { + + public void writePayload(Integer object,DataOutput dataOut) throws IOException{ + dataOut.writeInt(object.intValue()); + } + + public Integer readPayload(DataInput dataIn) throws IOException{ + return dataIn.readInt(); + } +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?view=diff&rev=516048&r1=516047&r2=516048 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Thu Mar 8 06:20:29 2007 @@ -1,183 +1,183 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package org.apache.activemq.store.kahadaptor; - -import java.io.IOException; -import java.util.Set; -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.kaha.MapContainer; -import org.apache.activemq.kaha.StoreEntry; -import org.apache.activemq.memory.UsageManager; -import org.apache.activemq.store.MessageRecoveryListener; -import org.apache.activemq.store.ReferenceStore; - -public class KahaReferenceStore implements ReferenceStore{ - - protected final ActiveMQDestination destination; - protected final MapContainer messageContainer; - protected KahaReferenceStoreAdapter adapter; - private StoreEntry batchEntry=null; - - public KahaReferenceStore(KahaReferenceStoreAdapter adapter,MapContainer container,ActiveMQDestination destination) throws IOException{ - this.adapter = adapter; - this.messageContainer=container; - this.destination=destination; - } - - public void start(){ - } - - public void stop(){ - } - - protected MessageId getMessageId(Object object){ - return new MessageId(((ReferenceRecord)object).getMessageId()); - } - - public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{ - throw new RuntimeException("Use addMessageReference instead"); - } - - public synchronized Message getMessage(MessageId identity) throws IOException{ - throw new RuntimeException("Use addMessageReference instead"); - } - - protected void recover(MessageRecoveryListener listener,Object msg) throws Exception{ - ReferenceRecord record=(ReferenceRecord)msg; - listener.recoverMessageReference(new MessageId(record.getMessageId())); - } - - public synchronized void recover(MessageRecoveryListener listener) throws Exception{ - for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){ - ReferenceRecord record=messageContainer.getValue(entry); - recover(listener,new MessageId(record.getMessageId())); - } - listener.finished(); - } - - public synchronized void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{ - StoreEntry entry=batchEntry; - if(entry==null){ - entry=messageContainer.getFirst(); - }else{ - entry=messageContainer.refresh(entry); - if (entry != null) { - entry=messageContainer.getNext(entry); - } - } - if(entry!=null){ - int count=0; - do{ - Object msg=messageContainer.getValue(entry); - if(msg!=null){ - recover(listener,msg); - count++; - } - batchEntry=entry; - entry=messageContainer.getNext(entry); - }while(entry!=null&&count messageContainer; + protected KahaReferenceStoreAdapter adapter; + private StoreEntry batchEntry=null; + + public KahaReferenceStore(KahaReferenceStoreAdapter adapter,MapContainer container,ActiveMQDestination destination) throws IOException{ + this.adapter = adapter; + this.messageContainer=container; + this.destination=destination; + } + + public void start(){ + } + + public void stop(){ + } + + protected MessageId getMessageId(Object object){ + return new MessageId(((ReferenceRecord)object).getMessageId()); + } + + public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{ + throw new RuntimeException("Use addMessageReference instead"); + } + + public synchronized Message getMessage(MessageId identity) throws IOException{ + throw new RuntimeException("Use addMessageReference instead"); + } + + protected void recover(MessageRecoveryListener listener,Object msg) throws Exception{ + ReferenceRecord record=(ReferenceRecord)msg; + listener.recoverMessageReference(new MessageId(record.getMessageId())); + } + + public synchronized void recover(MessageRecoveryListener listener) throws Exception{ + for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){ + ReferenceRecord record=messageContainer.getValue(entry); + recover(listener,new MessageId(record.getMessageId())); + } + listener.finished(); + } + + public synchronized void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{ + StoreEntry entry=batchEntry; + if(entry==null){ + entry=messageContainer.getFirst(); + }else{ + entry=messageContainer.refresh(entry); + if (entry != null) { + entry=messageContainer.getNext(entry); + } + } + if(entry!=null){ + int count=0; + do{ + Object msg=messageContainer.getValue(entry); + if(msg!=null){ + recover(listener,msg); + count++; + } + batchEntry=entry; + entry=messageContainer.getNext(entry); + }while(entry!=null&&countrecordReferences = new HashMap(); - private boolean storeValid; - - - public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{ - throw new RuntimeException("Use createQueueReferenceStore instead"); - } - - public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException{ - throw new RuntimeException("Use createTopicReferenceStore instead"); - } - - @Override - public void start() throws Exception{ - super.start(); - Store store=getStore(); - boolean empty=store.getMapContainerIds().isEmpty(); - stateMap=store.getMapContainer("state",STORE_STATE); - stateMap.load(); - if(!empty){ - - AtomicBoolean status=(AtomicBoolean)stateMap.get(STORE_STATE); - if(status!=null){ - storeValid=status.get(); - } - - if(storeValid){ - if(stateMap.containsKey(RECORD_REFERENCES)){ - recordReferences=(Map)stateMap.get(RECORD_REFERENCES); - } - }else { - /* - log.warn("Store Not shutdown cleanly - clearing out unsafe records ..."); - Set set = store.getListContainerIds(); - for (ContainerId cid:set) { - if (!cid.getDataContainerName().equals(STORE_STATE)) { - store.deleteListContainer(cid); - } - } - set = store.getMapContainerIds(); - for (ContainerId cid:set) { - if (!cid.getDataContainerName().equals(STORE_STATE)) { - store.deleteMapContainer(cid); - } - } - */ - buildReferenceFileIdsInUse(); - } - - } - stateMap.put(STORE_STATE,new AtomicBoolean()); - } - - @Override - public void stop() throws Exception { - stateMap.put(RECORD_REFERENCES,recordReferences); - stateMap.put(STORE_STATE,new AtomicBoolean(true)); - super.stop(); - } - - - public boolean isStoreValid() { - return storeValid; - } - - - public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException { - ReferenceStore rc=(ReferenceStore)queues.get(destination); - if(rc==null){ - rc=new KahaReferenceStore(this,getMapReferenceContainer(destination,"queue-data"),destination); - messageStores.put(destination,rc); -// if(transactionStore!=null){ -// rc=transactionStore.proxy(rc); -// } - queues.put(destination,rc); - } - return rc; - } - - public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException { - TopicReferenceStore rc=(TopicReferenceStore)topics.get(destination); - if(rc==null){ - Store store=getStore(); - MapContainer messageContainer=getMapReferenceContainer(destination,"topic-data"); - MapContainer subsContainer=getSubsMapContainer(destination.toString()+"-Subscriptions","blob"); - ListContainer ackContainer=store.getListContainer(destination.toString(),"topic-acks"); - ackContainer.setMarshaller(new TopicSubAckMarshaller()); - rc=new KahaTopicReferenceStore(store,this,messageContainer,ackContainer,subsContainer,destination); - messageStores.put(destination,rc); -// if(transactionStore!=null){ -// rc=transactionStore.proxy(rc); -// } - topics.put(destination,rc); - } - return rc; - } - - public void buildReferenceFileIdsInUse() throws IOException { - - recordReferences = new HashMap(); - - Set destinations = getDestinations(); - for (ActiveMQDestination destination : destinations) { - if( destination.isQueue() ) { - KahaReferenceStore store = (KahaReferenceStore) createQueueReferenceStore((ActiveMQQueue) destination); - store.addReferenceFileIdsInUse(); - } else { - KahaTopicReferenceStore store = (KahaTopicReferenceStore) createTopicReferenceStore((ActiveMQTopic) destination); - store.addReferenceFileIdsInUse(); - } - } - } - - - protected MapContainer getMapReferenceContainer(Object id,String containerName) throws IOException{ - Store store=getStore(); - MapContainer container=store.getMapContainer(id,containerName); - container.setKeyMarshaller(new MessageIdMarshaller()); - container.setValueMarshaller(new ReferenceRecordMarshaller()); - container.load(); - return container; - } - - synchronized void addInterestInRecordFile(int recordNumber) { - Integer key = new Integer(recordNumber); - AtomicInteger rr = recordReferences.get(key); - if (rr == null) { - rr = new AtomicInteger(); - recordReferences.put(key,rr); - } - rr.incrementAndGet(); - } - - synchronized void removeInterestInRecordFile(int recordNumber) { - Integer key = new Integer(recordNumber); - AtomicInteger rr = recordReferences.get(key); - if (rr != null && rr.decrementAndGet() <= 0) { - recordReferences.remove(key); - } - } - - /** - * @return - * @throws IOException - * @see org.apache.activemq.store.ReferenceStoreAdapter#getReferenceFileIdsInUse() - */ - public Set getReferenceFileIdsInUse() throws IOException{ - return recordReferences.keySet(); - } - - - -} +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadaptor; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.kaha.ListContainer; +import org.apache.activemq.kaha.MapContainer; +import org.apache.activemq.kaha.MessageIdMarshaller; +import org.apache.activemq.kaha.Store; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.ReferenceStore; +import org.apache.activemq.store.ReferenceStoreAdapter; +import org.apache.activemq.store.TopicMessageStore; +import org.apache.activemq.store.TopicReferenceStore; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter { + private static final Log log = LogFactory.getLog(KahaPersistenceAdapter.class); + private static final String STORE_STATE = "store-state"; + private static final String RECORD_REFERENCES = "record-references"; + private MapContainer stateMap; + private MaprecordReferences = new HashMap(); + private boolean storeValid; + + + public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{ + throw new RuntimeException("Use createQueueReferenceStore instead"); + } + + public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException{ + throw new RuntimeException("Use createTopicReferenceStore instead"); + } + + @Override + public void start() throws Exception{ + super.start(); + Store store=getStore(); + boolean empty=store.getMapContainerIds().isEmpty(); + stateMap=store.getMapContainer("state",STORE_STATE); + stateMap.load(); + if(!empty){ + + AtomicBoolean status=(AtomicBoolean)stateMap.get(STORE_STATE); + if(status!=null){ + storeValid=status.get(); + } + + if(storeValid){ + if(stateMap.containsKey(RECORD_REFERENCES)){ + recordReferences=(Map)stateMap.get(RECORD_REFERENCES); + } + }else { + /* + log.warn("Store Not shutdown cleanly - clearing out unsafe records ..."); + Set set = store.getListContainerIds(); + for (ContainerId cid:set) { + if (!cid.getDataContainerName().equals(STORE_STATE)) { + store.deleteListContainer(cid); + } + } + set = store.getMapContainerIds(); + for (ContainerId cid:set) { + if (!cid.getDataContainerName().equals(STORE_STATE)) { + store.deleteMapContainer(cid); + } + } + */ + buildReferenceFileIdsInUse(); + } + + } + stateMap.put(STORE_STATE,new AtomicBoolean()); + } + + @Override + public void stop() throws Exception { + stateMap.put(RECORD_REFERENCES,recordReferences); + stateMap.put(STORE_STATE,new AtomicBoolean(true)); + super.stop(); + } + + + public boolean isStoreValid() { + return storeValid; + } + + + public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException { + ReferenceStore rc=(ReferenceStore)queues.get(destination); + if(rc==null){ + rc=new KahaReferenceStore(this,getMapReferenceContainer(destination,"queue-data"),destination); + messageStores.put(destination,rc); +// if(transactionStore!=null){ +// rc=transactionStore.proxy(rc); +// } + queues.put(destination,rc); + } + return rc; + } + + public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException { + TopicReferenceStore rc=(TopicReferenceStore)topics.get(destination); + if(rc==null){ + Store store=getStore(); + MapContainer messageContainer=getMapReferenceContainer(destination,"topic-data"); + MapContainer subsContainer=getSubsMapContainer(destination.toString()+"-Subscriptions","blob"); + ListContainer ackContainer=store.getListContainer(destination.toString(),"topic-acks"); + ackContainer.setMarshaller(new TopicSubAckMarshaller()); + rc=new KahaTopicReferenceStore(store,this,messageContainer,ackContainer,subsContainer,destination); + messageStores.put(destination,rc); +// if(transactionStore!=null){ +// rc=transactionStore.proxy(rc); +// } + topics.put(destination,rc); + } + return rc; + } + + public void buildReferenceFileIdsInUse() throws IOException { + + recordReferences = new HashMap(); + + Set destinations = getDestinations(); + for (ActiveMQDestination destination : destinations) { + if( destination.isQueue() ) { + KahaReferenceStore store = (KahaReferenceStore) createQueueReferenceStore((ActiveMQQueue) destination); + store.addReferenceFileIdsInUse(); + } else { + KahaTopicReferenceStore store = (KahaTopicReferenceStore) createTopicReferenceStore((ActiveMQTopic) destination); + store.addReferenceFileIdsInUse(); + } + } + } + + + protected MapContainer getMapReferenceContainer(Object id,String containerName) throws IOException{ + Store store=getStore(); + MapContainer container=store.getMapContainer(id,containerName); + container.setKeyMarshaller(new MessageIdMarshaller()); + container.setValueMarshaller(new ReferenceRecordMarshaller()); + container.load(); + return container; + } + + synchronized void addInterestInRecordFile(int recordNumber) { + Integer key = new Integer(recordNumber); + AtomicInteger rr = recordReferences.get(key); + if (rr == null) { + rr = new AtomicInteger(); + recordReferences.put(key,rr); + } + rr.incrementAndGet(); + } + + synchronized void removeInterestInRecordFile(int recordNumber) { + Integer key = new Integer(recordNumber); + AtomicInteger rr = recordReferences.get(key); + if (rr != null && rr.decrementAndGet() <= 0) { + recordReferences.remove(key); + } + } + + /** + * @return + * @throws IOException + * @see org.apache.activemq.store.ReferenceStoreAdapter#getReferenceFileIdsInUse() + */ + public Set getReferenceFileIdsInUse() throws IOException{ + return recordReferences.keySet(); + } + + + +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java ------------------------------------------------------------------------------ svn:eol-style = native