activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r516048 [9/14] - in /activemq/trunk: activemq-book/src/docbkx/ activemq-core/src/main/java/org/apache/activemq/ activemq-core/src/main/java/org/apache/activemq/blob/ activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/ act...
Date Thu, 08 Mar 2007 14:20:38 GMT
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/XATransactionIdMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/XATransactionIdMarshaller.java?view=diff&rev=516048&r1=516047&r2=516048
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/XATransactionIdMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/XATransactionIdMarshaller.java Thu Mar  8 06:20:29 2007
@@ -1 +1,138 @@
-/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.openwire.v3;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.activemq.openwire.*;
import org.apache.activemq.command.*;



/**
 * Marshalling code for Open Wire Format for XATransactionIdMarshaller
 *
 *
 * NOTE!: This file is auto generated - do not modify!
 *        if you need to make a change, please see the modify the groovy scripts in the
 *        under src/gram/script and then use maven openwire:generate to regenerate 
 *        this file.
 *
 * @version $Revision$
 */
public class XATransactionIdMarshaller extends TransactionIdMarshaller {

    /**
     * Return the type of Data Structure we marshal
     * @return short representation of the type data structure
     */
    public byte getDataStructureType() {
        return XATransactionId.DATA_STRUCTURE_TYPE;
    }
    
    /**
     * @return a new object instance
     */
    public DataStructure createObject() {
        return new XATransactionId();
    }

    /**
     * Un-marshal an object instance from the data input stream
     *
     * @param o the object to un-marshal
     * @param dataIn the data input stream to build the object from
     * @throws IOException
     */
    public void tightUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn, BooleanStream bs) throws IOException {
        super.tightUnmarshal(wireFormat, o, dataIn, bs);

        XATransactionId info = (XATransactionId)o;
        info.setFormatId(dataIn.readInt());
        info.setGlobalTransactionId(tightUnmarshalByteArray(dataIn, bs));
        info.setBranchQualifier(tightUnmarshalByteArray(dataIn, bs));

    }


    /**
     * Write the booleans that this object uses to a BooleanStream
     */
    public int tightMarshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) throws IOException {

        XATransactionId info = (XATransactionId)o;

        int rc = super.tightMarshal1(wireFormat, o, bs);
        rc += tightMarshalByteArray1(info.getGlobalTransactionId(), bs);
        rc += tightMarshalByteArray1(info.getBranchQualifier(), bs);

        return rc + 4;
    }

    /**
     * Write a object instance to data output stream
     *
     * @param o the instance to be marshaled
     * @param dataOut the output stream
     * @throws IOException thrown if an error occurs
     */
    public void tightMarshal2(OpenWireFormat wireFormat, Object o, DataOutput dataOut, BooleanStream bs) throws IOException {
        super.tightMarshal2(wireFormat, o, dataOut, bs);

        XATransactionId info = (XATransactionId)o;
        dataOut.writeInt(info.getFormatId());
        tightMarshalByteArray2(info.getGlobalTransactionId(), dataOut, bs);
        tightMarshalByteArray2(info.getBranchQualifier(), dataOut, bs);

    }

    /**
     * Un-marshal an object instance from the data input stream
     *
     * @param o the object to un-marshal
     * @param dataIn the data input stream to build the object from
     * @throws IOException
     */
    public void looseUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn) throws IOException {
        super.looseUnmarshal(wireFormat, o, dataIn);

        XATransactionId info = (XATransactionId)o;
        info.setFormatId(dataIn.readInt());
        info.setGlobalTransactionId(looseUnmarshalByteArray(dataIn));
        info.setBranchQualifier(looseUnmarshalByteArray(dataIn));

    }


    /**
     * Write the booleans that this object uses to a BooleanStream
     */
    public void looseMarshal(OpenWireFormat wireFormat, Object o, DataOutput dataOut) throws IOException {

        XATransactionId info = (XATransactionId)o;

        super.looseMarshal(wireFormat, o, dataOut);
        dataOut.writeInt(info.getFormatId());
        looseMarshalByteArray(wireFormat, info.getGlobalTransactionId(), dataOut);
        looseMarshalByteArray(wireFormat, info.getBranchQualifier(), dataOut);

    }
}
\ No newline at end of file
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.openwire.v3;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.activemq.openwire.*;
+import org.apache.activemq.command.*;
+
+
+
+/**
+ * Marshalling code for Open Wire Format for XATransactionIdMarshaller
+ *
+ *
+ * NOTE!: This file is auto generated - do not modify!
+ *        if you need to make a change, please see the modify the groovy scripts in the
+ *        under src/gram/script and then use maven openwire:generate to regenerate 
+ *        this file.
+ *
+ * @version $Revision$
+ */
+public class XATransactionIdMarshaller extends TransactionIdMarshaller {
+
+    /**
+     * Return the type of Data Structure we marshal
+     * @return short representation of the type data structure
+     */
+    public byte getDataStructureType() {
+        return XATransactionId.DATA_STRUCTURE_TYPE;
+    }
+    
+    /**
+     * @return a new object instance
+     */
+    public DataStructure createObject() {
+        return new XATransactionId();
+    }
+
+    /**
+     * Un-marshal an object instance from the data input stream
+     *
+     * @param o the object to un-marshal
+     * @param dataIn the data input stream to build the object from
+     * @throws IOException
+     */
+    public void tightUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn, BooleanStream bs) throws IOException {
+        super.tightUnmarshal(wireFormat, o, dataIn, bs);
+
+        XATransactionId info = (XATransactionId)o;
+        info.setFormatId(dataIn.readInt());
+        info.setGlobalTransactionId(tightUnmarshalByteArray(dataIn, bs));
+        info.setBranchQualifier(tightUnmarshalByteArray(dataIn, bs));
+
+    }
+
+
+    /**
+     * Write the booleans that this object uses to a BooleanStream
+     */
+    public int tightMarshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) throws IOException {
+
+        XATransactionId info = (XATransactionId)o;
+
+        int rc = super.tightMarshal1(wireFormat, o, bs);
+        rc += tightMarshalByteArray1(info.getGlobalTransactionId(), bs);
+        rc += tightMarshalByteArray1(info.getBranchQualifier(), bs);
+
+        return rc + 4;
+    }
+
+    /**
+     * Write a object instance to data output stream
+     *
+     * @param o the instance to be marshaled
+     * @param dataOut the output stream
+     * @throws IOException thrown if an error occurs
+     */
+    public void tightMarshal2(OpenWireFormat wireFormat, Object o, DataOutput dataOut, BooleanStream bs) throws IOException {
+        super.tightMarshal2(wireFormat, o, dataOut, bs);
+
+        XATransactionId info = (XATransactionId)o;
+        dataOut.writeInt(info.getFormatId());
+        tightMarshalByteArray2(info.getGlobalTransactionId(), dataOut, bs);
+        tightMarshalByteArray2(info.getBranchQualifier(), dataOut, bs);
+
+    }
+
+    /**
+     * Un-marshal an object instance from the data input stream
+     *
+     * @param o the object to un-marshal
+     * @param dataIn the data input stream to build the object from
+     * @throws IOException
+     */
+    public void looseUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn) throws IOException {
+        super.looseUnmarshal(wireFormat, o, dataIn);
+
+        XATransactionId info = (XATransactionId)o;
+        info.setFormatId(dataIn.readInt());
+        info.setGlobalTransactionId(looseUnmarshalByteArray(dataIn));
+        info.setBranchQualifier(looseUnmarshalByteArray(dataIn));
+
+    }
+
+
+    /**
+     * Write the booleans that this object uses to a BooleanStream
+     */
+    public void looseMarshal(OpenWireFormat wireFormat, Object o, DataOutput dataOut) throws IOException {
+
+        XATransactionId info = (XATransactionId)o;
+
+        super.looseMarshal(wireFormat, o, dataOut);
+        dataOut.writeInt(info.getFormatId());
+        looseMarshalByteArray(wireFormat, info.getGlobalTransactionId(), dataOut);
+        looseMarshalByteArray(wireFormat, info.getBranchQualifier(), dataOut);
+
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/XATransactionIdMarshaller.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/AuthenticationUser.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/AuthenticationUser.java?view=diff&rev=516048&r1=516047&r2=516048
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/AuthenticationUser.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/AuthenticationUser.java Thu Mar  8 06:20:29 2007
@@ -1,64 +1,64 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.security;
-
-
-/**
- * A helper object used to configure simple authentiaction plugin
- *  
- * @org.apache.xbean.XBean
- * 
- * @version $Revision
- */
-public class AuthenticationUser {
-
-	String username;
-	String password;
-	String groups;
-	
-	
-	
-	public AuthenticationUser(String username, String password, String groups) {
-		this.username = username;
-		this.password = password;
-		this.groups = groups;
-	}
-	
-	
-	public String getGroups() {
-		return groups;
-	}
-	public void setGroups(String groups) {
-		this.groups = groups;
-	}
-	public String getPassword() {
-		return password;
-	}
-	public void setPassword(String password) {
-		this.password = password;
-	}
-	public String getUsername() {
-		return username;
-	}
-	public void setUsername(String username) {
-		this.username = username;
-	}
-	
-	
-	
-}
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.security;
+
+
+/**
+ * A helper object used to configure simple authentiaction plugin
+ *  
+ * @org.apache.xbean.XBean
+ * 
+ * @version $Revision
+ */
+public class AuthenticationUser {
+
+	String username;
+	String password;
+	String groups;
+	
+	
+	
+	public AuthenticationUser(String username, String password, String groups) {
+		this.username = username;
+		this.password = password;
+		this.groups = groups;
+	}
+	
+	
+	public String getGroups() {
+		return groups;
+	}
+	public void setGroups(String groups) {
+		this.groups = groups;
+	}
+	public String getPassword() {
+		return password;
+	}
+	public void setPassword(String password) {
+		this.password = password;
+	}
+	public String getUsername() {
+		return username;
+	}
+	public void setUsername(String username) {
+		this.username = username;
+	}
+	
+	
+	
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/AuthenticationUser.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/TempDestinationAuthorizationEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/TempDestinationAuthorizationEntry.java?view=diff&rev=516048&r1=516047&r2=516048
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/TempDestinationAuthorizationEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/TempDestinationAuthorizationEntry.java Thu Mar  8 06:20:29 2007
@@ -1,45 +1,45 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.security;
-
-import org.apache.activemq.filter.DestinationMapEntry;
-import org.apache.activemq.jaas.GroupPrincipal;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.StringTokenizer;
-
-/**
- * Represents an entry in a {@link DefaultAuthorizationMap} for assigning
- * different operations (read, write, admin) of user roles to
- * a temporary destination
- * 
- * @org.apache.xbean.XBean
- * 
- * @version $Revision: 426366 $
- */
-public class TempDestinationAuthorizationEntry extends AuthorizationEntry {
-
-  
-    public void afterPropertiesSet() throws Exception {
-       //we don't need to check if destination is specified since
-       //the TempDestinationAuthorizationEntry  should map to all temp destinations	
-    }    
-
-}
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.security;
+
+import org.apache.activemq.filter.DestinationMapEntry;
+import org.apache.activemq.jaas.GroupPrincipal;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+/**
+ * Represents an entry in a {@link DefaultAuthorizationMap} for assigning
+ * different operations (read, write, admin) of user roles to
+ * a temporary destination
+ * 
+ * @org.apache.xbean.XBean
+ * 
+ * @version $Revision: 426366 $
+ */
+public class TempDestinationAuthorizationEntry extends AuthorizationEntry {
+
+  
+    public void afterPropertiesSet() throws Exception {
+       //we don't need to check if destination is specified since
+       //the TempDestinationAuthorizationEntry  should map to all temp destinations	
+    }    
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/TempDestinationAuthorizationEntry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java?view=diff&rev=516048&r1=516047&r2=516048
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java Thu Mar  8 06:20:29 2007
@@ -1,82 +1,82 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.store;
-
-import java.io.IOException;
-
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.MessageId;
-
-/**
- * Represents a message store which is used by the persistent 
- * implementations
- * 
- * @version $Revision: 1.5 $
- */
-public interface ReferenceStore extends MessageStore {
-
-	public class ReferenceData {
-		long expiration;
-		int fileId;
-		int offset;
-		
-		public long getExpiration() {
-			return expiration;
-		}
-		public void setExpiration(long expiration) {
-			this.expiration = expiration;
-		}
-		public int getFileId() {
-			return fileId;
-		}
-		public void setFileId(int file) {
-			this.fileId = file;
-		}
-		public int getOffset() {
-			return offset;
-		}
-		public void setOffset(int offset) {
-			this.offset = offset;
-		}
-		
-		@Override
-		public String toString() {
-			return "ReferenceData fileId="+fileId+", offset="+offset+", expiration="+expiration;
-		}
-	}
-	
-    /**
-     * Adds a message reference to the message store
-     */
-    public void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException;
-
-    /**
-     * Looks up a message using either the String messageID or the messageNumber. Implementations are encouraged to fill
-     * in the missing key if its easy to do so.
-     */
-    public ReferenceData getMessageReference(MessageId identity) throws IOException;
-    
-    /**
-     * @return true if it supports external batch control
-     */
-    public boolean supportsExternalBatchControl();
-    
-    public void setBatch(MessageId startAfter);
-    
-}
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store;
+
+import java.io.IOException;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.MessageId;
+
+/**
+ * Represents a message store which is used by the persistent 
+ * implementations
+ * 
+ * @version $Revision: 1.5 $
+ */
+public interface ReferenceStore extends MessageStore {
+
+	public class ReferenceData {
+		long expiration;
+		int fileId;
+		int offset;
+		
+		public long getExpiration() {
+			return expiration;
+		}
+		public void setExpiration(long expiration) {
+			this.expiration = expiration;
+		}
+		public int getFileId() {
+			return fileId;
+		}
+		public void setFileId(int file) {
+			this.fileId = file;
+		}
+		public int getOffset() {
+			return offset;
+		}
+		public void setOffset(int offset) {
+			this.offset = offset;
+		}
+		
+		@Override
+		public String toString() {
+			return "ReferenceData fileId="+fileId+", offset="+offset+", expiration="+expiration;
+		}
+	}
+	
+    /**
+     * Adds a message reference to the message store
+     */
+    public void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException;
+
+    /**
+     * Looks up a message using either the String messageID or the messageNumber. Implementations are encouraged to fill
+     * in the missing key if its easy to do so.
+     */
+    public ReferenceData getMessageReference(MessageId identity) throws IOException;
+    
+    /**
+     * @return true if it supports external batch control
+     */
+    public boolean supportsExternalBatchControl();
+    
+    public void setBatch(MessageId startAfter);
+    
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java?view=diff&rev=516048&r1=516047&r2=516048
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java Thu Mar  8 06:20:29 2007
@@ -1,45 +1,45 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store;
-
-import java.io.IOException;
-import java.util.Set;
-
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-
-/**
- * Adapter to the actual persistence mechanism used with ActiveMQ
- *
- * @version $Revision: 1.3 $
- */
-public interface ReferenceStoreAdapter extends PersistenceAdapter {
-
-    /**
-     * Factory method to create a new queue message store with the given destination name
-     */
-    public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException;
-
-    /**
-     * Factory method to create a new topic message store with the given destination name
-     */
-    public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException;
-
-	public Set<Integer> getReferenceFileIdsInUse() throws IOException;
-
-}
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+
+/**
+ * Adapter to the actual persistence mechanism used with ActiveMQ
+ *
+ * @version $Revision: 1.3 $
+ */
+public interface ReferenceStoreAdapter extends PersistenceAdapter {
+
+    /**
+     * Factory method to create a new queue message store with the given destination name
+     */
+    public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException;
+
+    /**
+     * Factory method to create a new topic message store with the given destination name
+     */
+    public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException;
+
+	public Set<Integer> getReferenceFileIdsInUse() throws IOException;
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java?view=diff&rev=516048&r1=516047&r2=516048
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java Thu Mar  8 06:20:29 2007
@@ -1,137 +1,137 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store;
-
-import java.io.IOException;
-
-import javax.jms.JMSException;
-
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.SubscriptionInfo;
-
-/**
- * A MessageStore for durable topic subscriptions
- * 
- * @version $Revision: 1.4 $
- */
-public interface TopicReferenceStore extends ReferenceStore, TopicMessageStore {
-    /**
-     * Stores the last acknowledged messgeID for the given subscription so that we can recover and commence dispatching
-     * messages from the last checkpoint
-     * 
-     * @param context
-     * @param clientId
-     * @param subscriptionName
-     * @param messageId
-     * @param subscriptionPersistentId
-     * @throws IOException
-     */
-    public void acknowledge(ConnectionContext context,String clientId,String subscriptionName,MessageId messageId)
-                    throws IOException;
-
-    /**
-     * @param clientId
-     * @param subscriptionName
-     * @param sub
-     * @throws IOException
-     * @throws JMSException
-     */
-    public void deleteSubscription(String clientId,String subscriptionName) throws IOException;
-
-    /**
-     * For the new subscription find the last acknowledged message ID and then find any new messages since then and
-     * dispatch them to the subscription. <p/> e.g. if we dispatched some messages to a new durable topic subscriber,
-     * then went down before acknowledging any messages, we need to know the correct point from which to recover from.
-     * 
-     * @param clientId
-     * @param subscriptionName
-     * @param listener
-     * @param subscription
-     * 
-     * @throws Exception
-     */
-    public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
-                    throws Exception;
-
-    /**
-     * For an active subscription - retrieve messages from the store for the subscriber after the lastMessageId
-     * messageId <p/>
-     * 
-     * @param clientId
-     * @param subscriptionName
-     * @param maxReturned
-     * @param listener
-     * 
-     * @throws Exception
-     */
-    public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
-                    MessageRecoveryListener listener) throws Exception;
-
-    /**
-     * A hint to the Store to reset any batching state for a durable subsriber
-     * @param clientId 
-     * @param subscriptionName 
-     *
-     */
-    public void resetBatching(String clientId,String subscriptionName);
-    
-    
-    /**
-     * Get the number of messages ready to deliver from the store to a durable subscriber
-     * @param clientId
-     * @param subscriberName
-     * @return the outstanding message count
-     * @throws IOException
-     */
-    public int getMessageCount(String clientId,String subscriberName) throws IOException;
-    
-    /**
-     * Finds the subscriber entry for the given consumer info
-     * 
-     * @param clientId
-     * @param subscriptionName
-     * @return the SubscriptionInfo
-     * @throws IOException
-     */
-    public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException;
-
-    /**
-     * Lists all the durable subscirptions for a given destination.
-     * 
-     * @return an array SubscriptionInfos
-     * @throws IOException
-     */
-    public SubscriptionInfo[] getAllSubscriptions() throws IOException;
-
-    /**
-     * Inserts the subscriber info due to a subscription change <p/> If this is a new subscription and the retroactive
-     * is false, then the last message sent to the topic should be set as the last message acknowledged by they new
-     * subscription. Otherwise, if retroactive is true, then create the subscription without it having an acknowledged
-     * message so that on recovery, all message recorded for the topic get replayed.
-     * 
-     * @param clientId
-     * @param subscriptionName
-     * @param selector
-     * @param retroactive
-     * @throws IOException
-     * 
-     */
-    public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
-                    throws IOException;	
-}
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import java.io.IOException;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.SubscriptionInfo;
+
+/**
+ * A reference store for a topic, combining {@link ReferenceStore} with the
+ * durable-subscription operations of {@link TopicMessageStore}.
+ *
+ * @version $Revision: 1.4 $
+ */
+public interface TopicReferenceStore extends ReferenceStore, TopicMessageStore {
+
+    /**
+     * Records the last acknowledged message id of a subscription, so that recovery
+     * can resume dispatching from that checkpoint.
+     *
+     * @param context the connection context of the acknowledging client
+     * @param clientId the client id owning the subscription
+     * @param subscriptionName the name of the subscription
+     * @param messageId the id of the acknowledged message
+     * @throws IOException if the acknowledgement cannot be stored
+     */
+    void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId)
+            throws IOException;
+
+    /**
+     * Deletes a subscription.
+     *
+     * @param clientId the client id owning the subscription
+     * @param subscriptionName the name of the subscription
+     * @throws IOException if the subscription cannot be deleted
+     */
+    void deleteSubscription(String clientId, String subscriptionName) throws IOException;
+
+    /**
+     * Finds the last acknowledged message id of the subscription and dispatches every
+     * message stored since then to the listener. <p/> For example, if messages were
+     * dispatched to a new durable topic subscriber and the broker went down before any
+     * of them was acknowledged, this determines the point to recover from.
+     *
+     * @param clientId the client id owning the subscription
+     * @param subscriptionName the name of the subscription
+     * @param listener receives the recovered messages
+     * @throws Exception if recovery fails
+     */
+    void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
+            throws Exception;
+
+    /**
+     * For an active subscription, retrieves the next messages from the store that
+     * follow the last message already handed to the subscriber.
+     *
+     * @param clientId the client id owning the subscription
+     * @param subscriptionName the name of the subscription
+     * @param maxReturned the maximum number of messages to hand to the listener
+     * @param listener receives the recovered messages
+     * @throws Exception if recovery fails
+     */
+    void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
+            MessageRecoveryListener listener) throws Exception;
+
+    /**
+     * Hints to the store that any batching state kept for a durable subscriber
+     * should be reset.
+     *
+     * @param clientId the client id owning the subscription
+     * @param subscriptionName the name of the subscription
+     */
+    void resetBatching(String clientId, String subscriptionName);
+
+    /**
+     * Returns the number of messages ready to be delivered from the store to a
+     * durable subscriber.
+     *
+     * @param clientId the client id owning the subscription
+     * @param subscriberName the name of the subscriber
+     * @return the outstanding message count
+     * @throws IOException if the count cannot be determined
+     */
+    int getMessageCount(String clientId, String subscriberName) throws IOException;
+
+    /**
+     * Looks up the subscription entry for the given client id and subscription name.
+     *
+     * @param clientId the client id owning the subscription
+     * @param subscriptionName the name of the subscription
+     * @return the SubscriptionInfo
+     * @throws IOException if the lookup fails
+     */
+    SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException;
+
+    /**
+     * Lists all the durable subscriptions for the destination of this store.
+     *
+     * @return an array of SubscriptionInfos
+     * @throws IOException if the subscriptions cannot be read
+     */
+    SubscriptionInfo[] getAllSubscriptions() throws IOException;
+
+    /**
+     * Inserts the subscriber info due to a subscription change. <p/> For a new,
+     * non-retroactive subscription the last message sent to the topic should be set as
+     * the last message acknowledged by the new subscription. For a retroactive
+     * subscription no acknowledged message is recorded, so that on recovery all
+     * messages recorded for the topic get replayed.
+     *
+     * @param clientId the client id owning the subscription
+     * @param subscriptionName the name of the subscription
+     * @param selector the selector of the subscription
+     * @param retroactive whether the subscription is retroactive
+     * @throws IOException if the subscription cannot be stored
+     */
+    void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive)
+            throws IOException;
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java?view=diff&rev=516048&r1=516047&r2=516048
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java Thu Mar  8 06:20:29 2007
@@ -1,40 +1,40 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadaptor;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.activemq.kaha.Marshaller;
-
-
-/**
- * Marshall an Integer
- * @version $Revision: 1.10 $
- */
-public class IntegerMarshaller implements Marshaller<Integer> {
-   
-    public void writePayload(Integer object,DataOutput dataOut) throws IOException{
-       dataOut.writeInt(object.intValue());
-    }
-
-    public Integer readPayload(DataInput dataIn) throws IOException{
-        return dataIn.readInt();
-    }
-}
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadaptor;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.activemq.kaha.Marshaller;
+
+
+/**
+ * {@link Marshaller} for {@link Integer} payloads: each value is written with
+ * {@link DataOutput#writeInt(int)} and read back with {@link DataInput#readInt()}.
+ * @version $Revision: 1.10 $
+ */
+public class IntegerMarshaller implements Marshaller<Integer> {
+
+    /**
+     * Writes the int value of the payload.
+     *
+     * @param object the value to write
+     * @param dataOut the output to write to
+     * @throws IOException if writing fails
+     */
+    public void writePayload(Integer object, DataOutput dataOut) throws IOException {
+        final int value = object.intValue();
+        dataOut.writeInt(value);
+    }
+
+    /**
+     * Reads one int and returns it boxed.
+     *
+     * @param dataIn the input to read from
+     * @return the value read
+     * @throws IOException if reading fails
+     */
+    public Integer readPayload(DataInput dataIn) throws IOException {
+        final int value = dataIn.readInt();
+        return Integer.valueOf(value);
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?view=diff&rev=516048&r1=516047&r2=516048
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Thu Mar  8 06:20:29 2007
@@ -1,183 +1,183 @@
-/**
- * 
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.activemq.store.kahadaptor;
-
-import java.io.IOException;
-import java.util.Set;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.kaha.MapContainer;
-import org.apache.activemq.kaha.StoreEntry;
-import org.apache.activemq.memory.UsageManager;
-import org.apache.activemq.store.MessageRecoveryListener;
-import org.apache.activemq.store.ReferenceStore;
-
-public class KahaReferenceStore implements ReferenceStore{
-
-    protected final ActiveMQDestination destination;
-    protected final MapContainer<MessageId,ReferenceRecord> messageContainer;
-    protected KahaReferenceStoreAdapter adapter;
-    private StoreEntry batchEntry=null;
-
-    public KahaReferenceStore(KahaReferenceStoreAdapter adapter,MapContainer container,ActiveMQDestination destination) throws IOException{
-        this.adapter = adapter;
-        this.messageContainer=container;
-        this.destination=destination;
-    }
-
-    public void start(){
-    }
-
-    public void stop(){
-    }
-
-    protected MessageId getMessageId(Object object){
-        return new MessageId(((ReferenceRecord)object).getMessageId());
-    }
-
-    public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
-        throw new RuntimeException("Use addMessageReference instead");
-    }
-
-    public synchronized Message getMessage(MessageId identity) throws IOException{
-        throw new RuntimeException("Use addMessageReference instead");
-    }
-
-    protected void recover(MessageRecoveryListener listener,Object msg) throws Exception{
-        ReferenceRecord record=(ReferenceRecord)msg;
-        listener.recoverMessageReference(new MessageId(record.getMessageId()));
-    }
-
-    public synchronized void recover(MessageRecoveryListener listener) throws Exception{
-        for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
-            ReferenceRecord record=messageContainer.getValue(entry);
-            recover(listener,new MessageId(record.getMessageId()));
-        }
-        listener.finished();
-    }
-
-    public synchronized void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
-        StoreEntry entry=batchEntry;
-        if(entry==null){
-            entry=messageContainer.getFirst();
-        }else{
-            entry=messageContainer.refresh(entry);
-            if (entry != null) {
-            entry=messageContainer.getNext(entry);
-            }
-        }
-        if(entry!=null){
-            int count=0;
-            do{
-                Object msg=messageContainer.getValue(entry);
-                if(msg!=null){
-                    recover(listener,msg);
-                    count++;
-                }
-                batchEntry=entry;
-                entry=messageContainer.getNext(entry);
-            }while(entry!=null&&count<maxReturned&&listener.hasSpace());
-        }
-        listener.finished();
-    }
-
-    public void addMessageReference(ConnectionContext context,MessageId messageId,ReferenceData data)
-            throws IOException{
-        ReferenceRecord record=new ReferenceRecord(messageId.toString(),data);
-        messageContainer.put(messageId,record);
-        addInterest(record);
-    }
-
-    public ReferenceData getMessageReference(MessageId identity) throws IOException{
-        ReferenceRecord result=messageContainer.get(identity);
-        if(result==null)
-            return null;
-        return result.getData();
-    }
-
-    public void addReferenceFileIdsInUse(){
-        for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
-            ReferenceRecord msg=(ReferenceRecord)messageContainer.getValue(entry);
-            addInterest(msg);
-        }
-    }
-
-    public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
-        removeMessage(ack.getLastMessageId());
-    }
-
-    public synchronized void removeMessage(MessageId msgId) throws IOException{
-        StoreEntry entry=messageContainer.getEntry(msgId);
-        if(entry!=null){
-            ReferenceRecord rr=messageContainer.remove(msgId);
-            if(rr!=null){
-                removeInterest(rr);
-                if(messageContainer.isEmpty()||(batchEntry!=null&&batchEntry.equals(entry))){
-                    resetBatching();
-                }
-            }
-        }
-    }
-
-    public synchronized void removeAllMessages(ConnectionContext context) throws IOException{
-        messageContainer.clear();
-    }
-
-    public ActiveMQDestination getDestination(){
-        return destination;
-    }
-
-    public synchronized void delete(){
-        messageContainer.clear();
-    }
-
-    public void resetBatching(){
-        batchEntry=null;
-    }
-
-    public int getMessageCount(){
-        return messageContainer.size();
-    }
-
-    public void setUsageManager(UsageManager usageManager){
-    }
-
-    public boolean isSupportForCursors(){
-        return true;
-    }
-
-    
-    public boolean supportsExternalBatchControl(){
-        return true;
-    }
-    
-    void removeInterest(ReferenceRecord rr) {
-        adapter.removeInterestInRecordFile(rr.getData().getFileId());
-    }
-    
-    void addInterest(ReferenceRecord rr) {
-        adapter.addInterestInRecordFile(rr.getData().getFileId());
-    }
-
-    /**
-     * @param startAfter
-     * @see org.apache.activemq.store.ReferenceStore#setBatch(org.apache.activemq.command.MessageId)
-     */
-    public void setBatch(MessageId startAfter){        
-    }
-}
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.store.kahadaptor;
+
+import java.io.IOException;
+import java.util.Set;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.kaha.MapContainer;
+import org.apache.activemq.kaha.StoreEntry;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.ReferenceStore;
+
+/**
+ * {@link ReferenceStore} that keeps one {@link ReferenceRecord} per message id in a
+ * {@link MapContainer} and tells its {@link KahaReferenceStoreAdapter} which record file
+ * each stored reference points into.
+ */
+public class KahaReferenceStore implements ReferenceStore{
+
+    protected final ActiveMQDestination destination;
+    protected final MapContainer<MessageId,ReferenceRecord> messageContainer;
+    protected KahaReferenceStoreAdapter adapter;
+    // last entry handed out by recoverNextMessages; null means "start from the first entry"
+    private StoreEntry batchEntry=null;
+
+    /**
+     * @param adapter the adapter notified about interest in record files
+     * @param container the container holding the reference records, keyed by message id
+     * @param destination the destination this store belongs to
+     * @throws IOException declared for callers; not thrown by this constructor itself
+     */
+    public KahaReferenceStore(KahaReferenceStoreAdapter adapter,MapContainer container,ActiveMQDestination destination) throws IOException{
+        this.adapter = adapter;
+        this.messageContainer=container;
+        this.destination=destination;
+    }
+
+    /** No-op. */
+    public void start(){
+    }
+
+    /** No-op. */
+    public void stop(){
+    }
+
+    /**
+     * @param object a {@link ReferenceRecord}
+     * @return a new MessageId built from the message id held by the record
+     */
+    protected MessageId getMessageId(Object object){
+        return new MessageId(((ReferenceRecord)object).getMessageId());
+    }
+
+    /**
+     * Not supported by a reference store.
+     *
+     * @throws RuntimeException always; use {@link #addMessageReference} instead
+     */
+    public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
+        throw new RuntimeException("Use addMessageReference instead");
+    }
+
+    /**
+     * Not supported by a reference store.
+     *
+     * @throws RuntimeException always; use {@link #getMessageReference} instead
+     */
+    public synchronized Message getMessage(MessageId identity) throws IOException{
+        // the read-side counterpart is getMessageReference, not addMessageReference
+        throw new RuntimeException("Use getMessageReference instead");
+    }
+
+    /**
+     * Passes the message id of a single record to the listener.
+     *
+     * @param listener receives the recovered message reference
+     * @param msg the record to recover; must be a {@link ReferenceRecord}
+     * @throws Exception if the listener fails
+     */
+    protected void recover(MessageRecoveryListener listener,Object msg) throws Exception{
+        ReferenceRecord record=(ReferenceRecord)msg;
+        listener.recoverMessageReference(new MessageId(record.getMessageId()));
+    }
+
+    /**
+     * Recovers every reference in the container, in container order, then signals
+     * the listener that recovery has finished.
+     *
+     * @param listener receives the recovered message references
+     * @throws Exception if the listener fails
+     */
+    public synchronized void recover(MessageRecoveryListener listener) throws Exception{
+        for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
+            ReferenceRecord record=messageContainer.getValue(entry);
+            // skip entries without a value, as recoverNextMessages does
+            if(record!=null){
+                // pass the record itself: recover(listener,Object) casts its argument to
+                // ReferenceRecord, so passing a MessageId would fail with ClassCastException
+                recover(listener,record);
+            }
+        }
+        listener.finished();
+    }
+
+    /**
+     * Recovers the next batch of references, continuing after the entry reached by the
+     * previous call (or from the first entry if there is none). Stops when the container
+     * is exhausted, maxReturned references were recovered, or the listener has no space.
+     *
+     * @param maxReturned the maximum number of references to recover in this call
+     * @param listener receives the recovered message references
+     * @throws Exception if the listener fails
+     */
+    public synchronized void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
+        StoreEntry entry=batchEntry;
+        if(entry==null){
+            entry=messageContainer.getFirst();
+        }else{
+            // re-read the remembered entry before stepping past it
+            entry=messageContainer.refresh(entry);
+            if(entry!=null){
+                entry=messageContainer.getNext(entry);
+            }
+        }
+        if(entry!=null){
+            int count=0;
+            do{
+                Object msg=messageContainer.getValue(entry);
+                if(msg!=null){
+                    recover(listener,msg);
+                    count++;
+                }
+                // remember how far we got so the next call continues from here
+                batchEntry=entry;
+                entry=messageContainer.getNext(entry);
+            }while(entry!=null&&count<maxReturned&&listener.hasSpace());
+        }
+        listener.finished();
+    }
+
+    /**
+     * Stores a reference under the given message id and registers interest in the
+     * record file it points into.
+     *
+     * @param context the connection context (not used here)
+     * @param messageId the id of the referenced message
+     * @param data the reference data to store
+     * @throws IOException declared by the interface
+     */
+    public void addMessageReference(ConnectionContext context,MessageId messageId,ReferenceData data)
+            throws IOException{
+        ReferenceRecord record=new ReferenceRecord(messageId.toString(),data);
+        messageContainer.put(messageId,record);
+        addInterest(record);
+    }
+
+    /**
+     * @param identity the id of the referenced message
+     * @return the stored reference data, or null if there is no record for the id
+     * @throws IOException declared by the interface
+     */
+    public ReferenceData getMessageReference(MessageId identity) throws IOException{
+        ReferenceRecord result=messageContainer.get(identity);
+        if(result==null)
+            return null;
+        return result.getData();
+    }
+
+    /**
+     * Registers interest with the adapter for the record file of every reference
+     * currently in the container.
+     */
+    public void addReferenceFileIdsInUse(){
+        for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
+            ReferenceRecord msg=(ReferenceRecord)messageContainer.getValue(entry);
+            addInterest(msg);
+        }
+    }
+
+    /**
+     * Removes the reference for the last message id of the given ack.
+     *
+     * @param context the connection context (not used here)
+     * @param ack the acknowledgement naming the message to remove
+     * @throws IOException declared by the interface
+     */
+    public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
+        removeMessage(ack.getLastMessageId());
+    }
+
+    /**
+     * Removes the reference stored under the given id, drops interest in its record
+     * file, and resets the batch cursor if the container became empty or the cursor
+     * pointed at the removed entry.
+     *
+     * @param msgId the id of the message whose reference is removed
+     * @throws IOException declared by the interface
+     */
+    public synchronized void removeMessage(MessageId msgId) throws IOException{
+        StoreEntry entry=messageContainer.getEntry(msgId);
+        if(entry!=null){
+            ReferenceRecord rr=messageContainer.remove(msgId);
+            if(rr!=null){
+                removeInterest(rr);
+                if(messageContainer.isEmpty()||(batchEntry!=null&&batchEntry.equals(entry))){
+                    resetBatching();
+                }
+            }
+        }
+    }
+
+    /**
+     * Removes every reference from the container and resets the batch cursor.
+     *
+     * @param context the connection context (not used here)
+     * @throws IOException declared by the interface
+     */
+    public synchronized void removeAllMessages(ConnectionContext context) throws IOException{
+        messageContainer.clear();
+        // the cursor would otherwise keep pointing at an entry that no longer exists;
+        // removeMessage resets it in the same situation (container empty)
+        resetBatching();
+    }
+
+    /**
+     * @return the destination this store belongs to
+     */
+    public ActiveMQDestination getDestination(){
+        return destination;
+    }
+
+    /**
+     * Clears the container.
+     */
+    public synchronized void delete(){
+        messageContainer.clear();
+    }
+
+    /**
+     * Forgets the batch cursor, so the next recoverNextMessages call starts from the
+     * first entry.
+     */
+    public void resetBatching(){
+        batchEntry=null;
+    }
+
+    /**
+     * @return the number of references in the container
+     */
+    public int getMessageCount(){
+        return messageContainer.size();
+    }
+
+    /** No-op; the usage manager is ignored. */
+    public void setUsageManager(UsageManager usageManager){
+    }
+
+    /**
+     * @return always true
+     */
+    public boolean isSupportForCursors(){
+        return true;
+    }
+
+    /**
+     * @return always true
+     */
+    public boolean supportsExternalBatchControl(){
+        return true;
+    }
+
+    /** Tells the adapter that the record file of the given reference has one user less. */
+    void removeInterest(ReferenceRecord rr) {
+        adapter.removeInterestInRecordFile(rr.getData().getFileId());
+    }
+
+    /** Tells the adapter that the record file of the given reference has one more user. */
+    void addInterest(ReferenceRecord rr) {
+        adapter.addInterestInRecordFile(rr.getData().getFileId());
+    }
+
+    /**
+     * No-op in this implementation.
+     *
+     * @param startAfter
+     * @see org.apache.activemq.store.ReferenceStore#setBatch(org.apache.activemq.command.MessageId)
+     */
+    public void setBatch(MessageId startAfter){
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?view=diff&rev=516048&r1=516047&r2=516048
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Thu Mar  8 06:20:29 2007
@@ -1,199 +1,199 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadaptor;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.kaha.ListContainer;
-import org.apache.activemq.kaha.MapContainer;
-import org.apache.activemq.kaha.MessageIdMarshaller;
-import org.apache.activemq.kaha.Store;
-import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.store.ReferenceStore;
-import org.apache.activemq.store.ReferenceStoreAdapter;
-import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.store.TopicReferenceStore;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter {
-    private static final Log log = LogFactory.getLog(KahaPersistenceAdapter.class);
-   private static final String STORE_STATE = "store-state";
-   private static final String RECORD_REFERENCES = "record-references";
-    private MapContainer stateMap;
-	private Map<Integer,AtomicInteger>recordReferences = new HashMap<Integer,AtomicInteger>();
-    private boolean storeValid;
-
-	
-    public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
-    	throw new RuntimeException("Use createQueueReferenceStore instead");
-    }
-
-    public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException{
-    	throw new RuntimeException("Use createTopicReferenceStore instead");
-    }
-    
-    @Override
-    public void start() throws Exception{
-        super.start();
-        Store store=getStore();
-        boolean empty=store.getMapContainerIds().isEmpty();
-        stateMap=store.getMapContainer("state",STORE_STATE);
-        stateMap.load();
-        if(!empty){
-            
-            AtomicBoolean status=(AtomicBoolean)stateMap.get(STORE_STATE);
-            if(status!=null){
-                storeValid=status.get();
-            }
-           
-            if(storeValid){
-                if(stateMap.containsKey(RECORD_REFERENCES)){
-                    recordReferences=(Map<Integer,AtomicInteger>)stateMap.get(RECORD_REFERENCES);
-                }
-            }else {
-                /*
-                log.warn("Store Not shutdown cleanly - clearing out unsafe records ...");
-                Set<ContainerId> set = store.getListContainerIds();
-                for (ContainerId cid:set) {
-                    if (!cid.getDataContainerName().equals(STORE_STATE)) {
-                        store.deleteListContainer(cid);
-                    }
-                }
-                set = store.getMapContainerIds();
-                for (ContainerId cid:set) {
-                    if (!cid.getDataContainerName().equals(STORE_STATE)) {
-                        store.deleteMapContainer(cid);
-                    }
-                }
-                */
-                buildReferenceFileIdsInUse();
-            }
-            
-        }
-        stateMap.put(STORE_STATE,new AtomicBoolean());
-    }
-    
-    @Override
-    public void stop() throws Exception {
-        stateMap.put(RECORD_REFERENCES,recordReferences);
-        stateMap.put(STORE_STATE,new AtomicBoolean(true));
-        super.stop();        
-    }
-    
-    
-    public boolean isStoreValid() {
-        return storeValid;
-    }
-    
-
-	public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException {
-		ReferenceStore rc=(ReferenceStore)queues.get(destination);
-        if(rc==null){
-            rc=new KahaReferenceStore(this,getMapReferenceContainer(destination,"queue-data"),destination);
-            messageStores.put(destination,rc);
-//            if(transactionStore!=null){
-//                rc=transactionStore.proxy(rc);
-//            }
-            queues.put(destination,rc);
-        }
-        return rc;
-	}
-
-	public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException {
-		TopicReferenceStore rc=(TopicReferenceStore)topics.get(destination);
-        if(rc==null){
-            Store store=getStore();
-            MapContainer messageContainer=getMapReferenceContainer(destination,"topic-data");
-            MapContainer subsContainer=getSubsMapContainer(destination.toString()+"-Subscriptions","blob");
-            ListContainer ackContainer=store.getListContainer(destination.toString(),"topic-acks");
-            ackContainer.setMarshaller(new TopicSubAckMarshaller());
-            rc=new KahaTopicReferenceStore(store,this,messageContainer,ackContainer,subsContainer,destination);
-            messageStores.put(destination,rc);
-//            if(transactionStore!=null){
-//                rc=transactionStore.proxy(rc);
-//            }
-            topics.put(destination,rc);
-        }
-        return rc;
-	}
-
-	public void buildReferenceFileIdsInUse() throws IOException {
-		
-        recordReferences = new HashMap<Integer,AtomicInteger>();
-		
-		Set<ActiveMQDestination> destinations = getDestinations();
-		for (ActiveMQDestination destination : destinations) {
-			if( destination.isQueue() ) {
-				KahaReferenceStore store = (KahaReferenceStore) createQueueReferenceStore((ActiveMQQueue) destination);
-				store.addReferenceFileIdsInUse();
-			} else {
-				KahaTopicReferenceStore store = (KahaTopicReferenceStore) createTopicReferenceStore((ActiveMQTopic) destination);
-				store.addReferenceFileIdsInUse();
-			}
-        }		
-	}
-    
-        
-    protected MapContainer<MessageId,ReferenceRecord> getMapReferenceContainer(Object id,String containerName) throws IOException{
-        Store store=getStore();
-        MapContainer<MessageId, ReferenceRecord> container=store.getMapContainer(id,containerName);
-        container.setKeyMarshaller(new MessageIdMarshaller());
-        container.setValueMarshaller(new ReferenceRecordMarshaller());        
-        container.load();
-        return container;
-    }
-    
-    synchronized void addInterestInRecordFile(int recordNumber) {
-        Integer key = new Integer(recordNumber);
-        AtomicInteger rr = recordReferences.get(key);
-        if (rr == null) {
-            rr = new AtomicInteger();
-            recordReferences.put(key,rr);
-        }
-        rr.incrementAndGet();
-    }
-    
-    synchronized void removeInterestInRecordFile(int recordNumber) {
-        Integer key = new Integer(recordNumber);
-        AtomicInteger rr = recordReferences.get(key);
-        if (rr != null && rr.decrementAndGet() <= 0) {
-            recordReferences.remove(key);
-        }
-    }
-
-    /**
-     * @return
-     * @throws IOException
-     * @see org.apache.activemq.store.ReferenceStoreAdapter#getReferenceFileIdsInUse()
-     */
-    public Set<Integer> getReferenceFileIdsInUse() throws IOException{
-        return recordReferences.keySet();
-    }
-
-    
-	
-}
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadaptor;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.kaha.ListContainer;
+import org.apache.activemq.kaha.MapContainer;
+import org.apache.activemq.kaha.MessageIdMarshaller;
+import org.apache.activemq.kaha.Store;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.ReferenceStore;
+import org.apache.activemq.store.ReferenceStoreAdapter;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TopicReferenceStore;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter {
+    // Logger is keyed to this class so its output is attributed to the
+    // reference-store adapter rather than to the superclass.
+    private static final Log log = LogFactory.getLog(KahaReferenceStoreAdapter.class);
+    // Key of the clean-shutdown flag inside stateMap; start() also passes it as that container's name.
+    private static final String STORE_STATE = "store-state";
+    // Key under which stop() persists the recordReferences map inside stateMap.
+    private static final String RECORD_REFERENCES = "record-references";
+    // Container holding the adapter's persisted state; assigned in start().
+    private MapContainer stateMap;
+    // Reference count per record file id; removeInterestInRecordFile drops an
+    // entry once its count reaches zero.
+    private Map<Integer,AtomicInteger> recordReferences = new HashMap<Integer,AtomicInteger>();
+    // Clean-shutdown flag as read by start(); keeps its default (false) until then.
+    private boolean storeValid;
+
+	
+    /**
+     * Not supported by this adapter: always throws, directing callers to
+     * {@code createQueueReferenceStore}.
+     *
+     * @param destination ignored
+     * @return never returns normally
+     * @throws RuntimeException always
+     */
+    public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
+    	throw new RuntimeException("Use createQueueReferenceStore instead");
+    }
+
+    /**
+     * Not supported by this adapter: always throws, directing callers to
+     * {@code createTopicReferenceStore}.
+     *
+     * @param destination ignored
+     * @return never returns normally
+     * @throws RuntimeException always
+     */
+    public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException{
+    	throw new RuntimeException("Use createTopicReferenceStore instead");
+    }
+    
+    /**
+     * Starts the underlying adapter, then loads the persisted state container and
+     * decides whether the previously stored reference counts can be reused.
+     * <p>
+     * If the store already had map containers and its {@code STORE_STATE} flag is
+     * true, the reference counts saved under {@code RECORD_REFERENCES} are restored
+     * when present. If the flag is missing or false, the counts are rebuilt with
+     * {@link #buildReferenceFileIdsInUse()}. In every case the flag is then written
+     * back as false; {@link #stop()} is what sets it to true.
+     *
+     * @throws Exception if the superclass start, the store access or the rebuild fails
+     */
+    @Override
+    public void start() throws Exception{
+        super.start();
+        Store store=getStore();
+        // Sampled before the state container is requested below - presumably so a
+        // container created by that request does not make a new store look used (confirm).
+        boolean empty=store.getMapContainerIds().isEmpty();
+        stateMap=store.getMapContainer("state",STORE_STATE);
+        stateMap.load();
+        if(!empty){
+            
+            // A missing flag leaves storeValid at its current value.
+            AtomicBoolean status=(AtomicBoolean)stateMap.get(STORE_STATE);
+            if(status!=null){
+                storeValid=status.get();
+            }
+           
+            if(storeValid){
+                // Reuse the persisted counts; if none were saved the current map is kept.
+                if(stateMap.containsKey(RECORD_REFERENCES)){
+                    recordReferences=(Map<Integer,AtomicInteger>)stateMap.get(RECORD_REFERENCES);
+                }
+            }else {
+                /*
+                log.warn("Store Not shutdown cleanly - clearing out unsafe records ...");
+                Set<ContainerId> set = store.getListContainerIds();
+                for (ContainerId cid:set) {
+                    if (!cid.getDataContainerName().equals(STORE_STATE)) {
+                        store.deleteListContainer(cid);
+                    }
+                }
+                set = store.getMapContainerIds();
+                for (ContainerId cid:set) {
+                    if (!cid.getDataContainerName().equals(STORE_STATE)) {
+                        store.deleteMapContainer(cid);
+                    }
+                }
+                */
+                // Flag false or absent: recompute the counts from the destinations
+                // instead of trusting persisted ones.
+                buildReferenceFileIdsInUse();
+            }
+            
+        }
+        // new AtomicBoolean() holds false, so the flag stays false unless stop() runs.
+        stateMap.put(STORE_STATE,new AtomicBoolean());
+    }
+    
+    /**
+     * Persists the current reference counts under {@code RECORD_REFERENCES}, sets
+     * the {@code STORE_STATE} flag to true, and then stops the superclass.
+     * {@code stateMap} is only assigned in {@link #start()}, so it must have run first.
+     *
+     * @throws Exception if writing the state or the superclass stop fails
+     */
+    @Override
+    public void stop() throws Exception {
+        stateMap.put(RECORD_REFERENCES,recordReferences);
+        stateMap.put(STORE_STATE,new AtomicBoolean(true));
+        super.stop();        
+    }
+    
+    
+    /**
+     * @return the clean-shutdown flag as last read by {@link #start()}; false if
+     *         start() has not read a true flag from the state container
+     */
+    public boolean isStoreValid() {
+        return storeValid;
+    }
+    
+
+	/**
+	 * Returns the reference store registered for the given queue, creating and
+	 * registering a new {@code KahaReferenceStore} on first request.
+	 *
+	 * @param destination queue whose reference store is wanted
+	 * @return the cached store, or the newly created one
+	 * @throws IOException if the backing "queue-data" container cannot be obtained
+	 */
+	public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException {
+		ReferenceStore existing = (ReferenceStore) queues.get(destination);
+		if (existing != null) {
+			return existing;
+		}
+		MapContainer<MessageId,ReferenceRecord> references = getMapReferenceContainer(destination, "queue-data");
+		ReferenceStore created = new KahaReferenceStore(this, references, destination);
+		// Register under both lookups; no transaction-store proxy is applied.
+		messageStores.put(destination, created);
+		queues.put(destination, created);
+		return created;
+	}
+
+	/**
+	 * Returns the reference store registered for the given topic, creating and
+	 * registering a new {@code KahaTopicReferenceStore} on first request.
+	 * <p>
+	 * A new store is backed by three containers: message references
+	 * ("topic-data"), subscriptions (keyed by the destination name plus
+	 * "-Subscriptions") and acknowledgements ("topic-acks").
+	 *
+	 * @param destination topic whose reference store is wanted
+	 * @return the cached store, or the newly created one
+	 * @throws IOException if one of the backing containers cannot be obtained
+	 */
+	public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException {
+		TopicReferenceStore existing = (TopicReferenceStore) topics.get(destination);
+		if (existing != null) {
+			return existing;
+		}
+		Store kaha = getStore();
+		MapContainer references = getMapReferenceContainer(destination, "topic-data");
+		MapContainer subscriptions = getSubsMapContainer(destination.toString() + "-Subscriptions", "blob");
+		ListContainer acks = kaha.getListContainer(destination.toString(), "topic-acks");
+		acks.setMarshaller(new TopicSubAckMarshaller());
+		TopicReferenceStore created =
+			new KahaTopicReferenceStore(kaha, this, references, acks, subscriptions, destination);
+		// Register under both lookups; no transaction-store proxy is applied.
+		messageStores.put(destination, created);
+		topics.put(destination, created);
+		return created;
+	}
+
+	/**
+	 * Discards the current reference-count table and asks the reference store of
+	 * every known destination to report the record files it uses.
+	 * <p>
+	 * Stores are obtained through the create*ReferenceStore methods, so a store is
+	 * created for any destination that does not have one yet.
+	 *
+	 * @throws IOException if a store or its container cannot be obtained
+	 */
+	public void buildReferenceFileIdsInUse() throws IOException {
+		// Start from an empty table; each store's addReferenceFileIdsInUse() is
+		// expected to register its file ids again (its body is not visible here).
+		recordReferences = new HashMap<Integer,AtomicInteger>();
+		Set<ActiveMQDestination> known = getDestinations();
+		for (ActiveMQDestination dest : known) {
+			if (!dest.isQueue()) {
+				ActiveMQTopic topic = (ActiveMQTopic) dest;
+				((KahaTopicReferenceStore) createTopicReferenceStore(topic)).addReferenceFileIdsInUse();
+				continue;
+			}
+			ActiveMQQueue queue = (ActiveMQQueue) dest;
+			((KahaReferenceStore) createQueueReferenceStore(queue)).addReferenceFileIdsInUse();
+		}
+	}
+    
+        
+    /**
+     * Obtains the map container identified by {@code id} and {@code containerName}
+     * from the store, installs the message-id key marshaller and the
+     * reference-record value marshaller, and loads it.
+     *
+     * @param id            container id handed to the store
+     * @param containerName container name handed to the store
+     * @return the loaded container
+     * @throws IOException if the store or the container cannot be accessed
+     */
+    protected MapContainer<MessageId,ReferenceRecord> getMapReferenceContainer(Object id,String containerName) throws IOException{
+        MapContainer<MessageId,ReferenceRecord> references = getStore().getMapContainer(id, containerName);
+        // Marshallers are installed before load(), as in the original ordering.
+        references.setKeyMarshaller(new MessageIdMarshaller());
+        references.setValueMarshaller(new ReferenceRecordMarshaller());
+        references.load();
+        return references;
+    }
+    
+    /**
+     * Records one more reference to the given record file, creating its counter
+     * on first use.
+     *
+     * @param recordNumber id of the record file being referenced
+     */
+    synchronized void addInterestInRecordFile(int recordNumber) {
+        // valueOf may reuse cached boxes instead of allocating one per call.
+        Integer key = Integer.valueOf(recordNumber);
+        AtomicInteger rr = recordReferences.get(key);
+        if (rr == null) {
+            rr = new AtomicInteger();
+            recordReferences.put(key,rr);
+        }
+        rr.incrementAndGet();
+    }
+    
+    /**
+     * Drops one reference to the given record file and removes its counter once
+     * the count reaches zero or below. An unknown file id is ignored.
+     *
+     * @param recordNumber id of the record file no longer referenced
+     */
+    synchronized void removeInterestInRecordFile(int recordNumber) {
+        // valueOf may reuse cached boxes instead of allocating one per call.
+        Integer key = Integer.valueOf(recordNumber);
+        AtomicInteger rr = recordReferences.get(key);
+        if (rr != null && rr.decrementAndGet() <= 0) {
+            recordReferences.remove(key);
+        }
+    }
+
+    /**
+     * Returns the ids of the record files that currently have an entry in the
+     * reference-count table.
+     * <p>
+     * The result is a snapshot taken while holding this adapter's monitor, the
+     * same lock used by addInterestInRecordFile and removeInterestInRecordFile.
+     * Handing out the live key set of the underlying HashMap would let callers
+     * iterate it while those methods modify the map, and would let callers
+     * remove entries from the table.
+     *
+     * @return a new set holding the file ids referenced at the time of the call
+     * @throws IOException per the interface signature
+     * @see org.apache.activemq.store.ReferenceStoreAdapter#getReferenceFileIdsInUse()
+     */
+    public synchronized Set<Integer> getReferenceFileIdsInUse() throws IOException{
+        // Fully qualified so the file's import block does not need to change.
+        return new java.util.HashSet<Integer>(recordReferences.keySet());
+    }
+
+    
+	
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message