Author: rajdavies
Date: Fri Jun 29 07:07:46 2007
New Revision: 551902
URL: http://svn.apache.org/viewvc?view=rev&rev=551902
Log:
Added dup detection capability
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
(with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java (with
props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IdGenerator.java
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java?view=auto&rev=551902
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
(added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
Fri Jun 29 07:07:46 2007
@@ -0,0 +1,130 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership.
The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq;
+
+import java.util.LinkedHashMap;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.util.BitArrayBin;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.LRUCache;
+
+/**
+ * Provides basic audit functions for Messages: remembers, per producer, which
+ * sequence numbers have already been seen so that duplicates can be detected.
+ * <p>
+ * One {@link BitArrayBin} is kept per producer key. The key is the id seed string
+ * for {@link #isDuplicate(String)} and the {@link ProducerId} for the
+ * {@link MessageReference} based methods. The bins live in an {@link LRUCache}
+ * (presumably evicting the least recently used producer once the configured
+ * maximum is exceeded - its implementation is not shown here).
+ * <p>
+ * Every method that touches the cache is synchronized on this instance.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public class ActiveMQMessageAudit {
+
+    private static final int DEFAULT_WINDOW_SIZE = 1024;
+    private static final int MAXIMUM_PRODUCER_COUNT = 128;
+    private final int windowSize;
+    private final LinkedHashMap<Object, BitArrayBin> map;
+
+    /**
+     * Default Constructor windowSize = 1024, maximumNumberOfProducersToTrack = 128
+     */
+    public ActiveMQMessageAudit() {
+        this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT);
+    }
+
+    /**
+     * Construct a MessageAudit
+     *
+     * @param windowSize range of ids to track
+     * @param maximumNumberOfProducersToTrack number of producers expected in the system
+     */
+    public ActiveMQMessageAudit(int windowSize, final int maximumNumberOfProducersToTrack) {
+        this.windowSize = windowSize;
+        this.map = new LRUCache<Object, BitArrayBin>(maximumNumberOfProducersToTrack,
+                maximumNumberOfProducersToTrack, 0.75f, true);
+    }
+
+    /**
+     * Checks if this message has been seen before, based on its JMS message id.
+     *
+     * @param message the message to check
+     * @return true if the message is a duplicate
+     * @throws JMSException if the message id cannot be read
+     */
+    public boolean isDuplicateMessage(Message message) throws JMSException {
+        return isDuplicate(message.getJMSMessageID());
+    }
+
+    /**
+     * Checks whether this messageId has been seen before and records it as seen.
+     *
+     * @param id the message id; its seed and sequence are extracted with {@link IdGenerator}
+     * @return true if the message is a duplicate; false when the id is null or
+     *         carries no sequence number
+     */
+    public synchronized boolean isDuplicate(String id) {
+        String seed = IdGenerator.getSeedFromId(id);
+        if (seed == null) {
+            return false;
+        }
+        long index = IdGenerator.getSequenceFromId(id);
+        if (index < 0) {
+            // No usable sequence: nothing can be recorded, so do not allocate a bin
+            // that would only take up a slot in the producer cache.
+            return false;
+        }
+        return binFor(seed).setBit(index, true);
+    }
+
+    /**
+     * Checks if this message has been seen before and records it as seen.
+     *
+     * @param message the message reference to check
+     * @return true if the message is a duplicate; false when the reference has no
+     *         message id or no producer id
+     */
+    public synchronized boolean isDuplicateMessageReference(final MessageReference message) {
+        MessageId id = message.getMessageId();
+        if (id == null) {
+            return false;
+        }
+        ProducerId pid = id.getProducerId();
+        if (pid == null) {
+            return false;
+        }
+        return binFor(pid).setBit(id.getProducerSequenceId(), true);
+    }
+
+    /**
+     * Unmarks this message as being received, so that a later redelivery is not
+     * reported as a duplicate. Does nothing if the producer is not being tracked.
+     *
+     * @param message the message reference to forget
+     */
+    public synchronized void rollbackMessageReference(final MessageReference message) {
+        MessageId id = message.getMessageId();
+        if (id == null) {
+            return;
+        }
+        ProducerId pid = id.getProducerId();
+        if (pid == null) {
+            return;
+        }
+        BitArrayBin bab = map.get(pid);
+        if (bab != null) {
+            bab.setBit(id.getProducerSequenceId(), false);
+        }
+    }
+
+    /**
+     * Returns the bin tracking the given producer key, creating and caching a new
+     * one on first use. Callers must hold the lock on this instance.
+     */
+    private BitArrayBin binFor(Object key) {
+        BitArrayBin bab = map.get(key);
+        if (bab == null) {
+            bab = new BitArrayBin(windowSize);
+            map.put(key, bab);
+        }
+        return bab;
+    }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java?view=auto&rev=551902
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java Fri
Jun 29 07:07:46 2007
@@ -0,0 +1,152 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Simple BitArray to enable setting multiple boolean values efficiently. Used
+ * instead of BitSet because BitSet does not allow for efficient serialization.
+ * Will store up to 64 boolean values, packed into a single long.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public class BitArray {
+    static final int LONG_SIZE = 64;
+    static final int INT_SIZE = 32;
+    static final int SHORT_SIZE = 16;
+    static final int BYTE_SIZE = 8;
+    /** BIT_VALUES[i] is the mask that has only bit i set. */
+    private static final long[] BIT_VALUES = new long[LONG_SIZE];
+    static {
+        for (int i = 0; i < LONG_SIZE; i++) {
+            BIT_VALUES[i] = 1L << i;
+        }
+    }
+    private long bits;
+    private int length;
+
+    /**
+     * @return the highest index ever passed to {@link #set(int, boolean)} plus one
+     *         (or the length last read from a stream)
+     */
+    public int length() {
+        return length;
+    }
+
+    /**
+     * @return the long containing the bits
+     */
+    public long getBits() {
+        return bits;
+    }
+
+    /**
+     * set the boolean value at the index
+     *
+     * @param index position of the bit, 0 to 63
+     * @param flag the new value
+     * @return the old value held at this index
+     * @throws ArrayIndexOutOfBoundsException if index is outside 0 to 63; the
+     *         array is left unchanged in that case
+     */
+    public boolean set(int index, boolean flag) {
+        // Look the mask up first so that an invalid index fails before any state changes.
+        long mask = BIT_VALUES[index];
+        length = Math.max(length, index + 1);
+        boolean oldValue = (bits & mask) != 0;
+        if (flag) {
+            bits |= mask;
+        } else if (oldValue) {
+            bits &= ~mask;
+        }
+        return oldValue;
+    }
+
+    /**
+     * @param index position of the bit, 0 to 63
+     * @return the boolean value at this index
+     * @throws ArrayIndexOutOfBoundsException if index is outside 0 to 63
+     */
+    public boolean get(int index) {
+        return (bits & BIT_VALUES[index]) != 0;
+    }
+
+    /**
+     * reset all the bit values to false; the recorded length is left as it is
+     */
+    public void reset() {
+        bits = 0;
+    }
+
+    /**
+     * reset all the bits to the value supplied; the recorded length is left as it is
+     *
+     * @param bits the new bit pattern
+     */
+    public void reset(long bits) {
+        this.bits = bits;
+    }
+
+    /**
+     * write the bits to an output stream: one byte holding the length, followed by
+     * the bits in the smallest of byte, short, int or long that can hold
+     * {@code length} bits
+     *
+     * @param dataOut the stream to write to
+     * @throws IOException if the stream cannot be written
+     */
+    public void writeToStream(DataOutput dataOut) throws IOException {
+        dataOut.writeByte(length);
+        if (length <= BYTE_SIZE) {
+            dataOut.writeByte((int) bits);
+        } else if (length <= SHORT_SIZE) {
+            dataOut.writeShort((short) bits);
+        } else if (length <= INT_SIZE) {
+            dataOut.writeInt((int) bits);
+        } else {
+            dataOut.writeLong(bits);
+        }
+    }
+
+    /**
+     * read the bits from an input stream, in the format produced by
+     * {@link #writeToStream(DataOutput)}
+     *
+     * @param dataIn the stream to read from
+     * @throws IOException if the stream cannot be read
+     */
+    public void readFromStream(DataInput dataIn) throws IOException {
+        length = dataIn.readByte();
+        // The narrow encodings must be read as unsigned values: a signed read would
+        // sign-extend a set top bit into all of the higher bits of the long.
+        if (length <= BYTE_SIZE) {
+            bits = dataIn.readUnsignedByte();
+        } else if (length <= SHORT_SIZE) {
+            bits = dataIn.readUnsignedShort();
+        } else if (length <= INT_SIZE) {
+            bits = dataIn.readInt() & 0xFFFFFFFFL;
+        } else {
+            bits = dataIn.readLong();
+        }
+    }
+}
\ No newline at end of file
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java?view=auto&rev=551902
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java Fri
Jun 29 07:07:46 2007
@@ -0,0 +1,139 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership.
The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.util;
+
+import java.util.LinkedList;
+
+/**
+ * Holder for many bitArrays - used for message audit.
+ * <p>
+ * The bins form a sliding window over a sequence of long indexes. Each bin is a
+ * {@link BitArray} covering {@link BitArray#LONG_SIZE} consecutive indexes, and
+ * the window initially starts at index 0. Setting a bit beyond the end of the
+ * window slides the window forward, dropping the oldest bins. Indexes that have
+ * fallen off the front of the window are reported as set.
+ * <p>
+ * Instances are not synchronized.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public class BitArrayBin {
+
+    private final LinkedList<BitArray> list;
+    private final int maxNumberOfArrays;
+    /** Absolute index represented by bit 0 of the first bin; never negative. */
+    private long firstIndex;
+
+    /**
+     * Create a BitArrayBin to a certain window size (number of messages to keep)
+     *
+     * @param windowSize number of consecutive indexes the window must cover at least
+     */
+    public BitArrayBin(int windowSize) {
+        maxNumberOfArrays = Math.max(((windowSize + 1) / BitArray.LONG_SIZE) + 1, 1);
+        list = new LinkedList<BitArray>();
+        for (int i = 0; i < maxNumberOfArrays; i++) {
+            list.add(new BitArray());
+        }
+    }
+
+    /**
+     * Set a bit, sliding the window forward when a bit beyond its end is set to true.
+     *
+     * @param index absolute index of the bit
+     * @param value the new value
+     * @return the previous value of the bit; true when the index lies before the
+     *         start of the window (it can no longer be tracked, so it is assumed
+     *         to have been set)
+     */
+    public boolean setBit(long index, boolean value) {
+        if (index < firstIndex) {
+            // Also covers negative indexes, since firstIndex is never negative.
+            return true;
+        }
+        long bin = (index - firstIndex) / BitArray.LONG_SIZE;
+        if (bin >= maxNumberOfArrays) {
+            if (!value) {
+                // Clearing a bit that was never recorded: nothing to do, and no
+                // reason to evict older bins for it.
+                return false;
+            }
+            slide(bin - maxNumberOfArrays + 1);
+            bin = maxNumberOfArrays - 1;
+        }
+        // Sliding moves firstIndex by whole bins, so the offset inside the bin is stable.
+        int offset = (int) ((index - firstIndex) % BitArray.LONG_SIZE);
+        return list.get((int) bin).set(offset, value);
+    }
+
+    /**
+     * Get the boolean value at the index. This does not modify the window.
+     *
+     * @param index absolute index of the bit
+     * @return true if the bit is set or the index lies before the start of the
+     *         window; false if it is clear or lies beyond the end of the window
+     */
+    public boolean getBit(long index) {
+        if (index < firstIndex) {
+            // gone passed range for previous bins so assume set
+            return true;
+        }
+        long bin = (index - firstIndex) / BitArray.LONG_SIZE;
+        if (bin >= maxNumberOfArrays) {
+            return false;
+        }
+        int offset = (int) ((index - firstIndex) % BitArray.LONG_SIZE);
+        return list.get((int) bin).get(offset);
+    }
+
+    /**
+     * Slide the window forward by the given number of bins, discarding the oldest
+     * bins and appending empty ones.
+     *
+     * @param bins number of bins to advance by, greater than zero
+     */
+    private void slide(long bins) {
+        // Replacing every bin once already empties the window; more is pointless.
+        int replaced = (int) Math.min(bins, (long) maxNumberOfArrays);
+        for (int i = 0; i < replaced; i++) {
+            list.removeFirst();
+            list.add(new BitArray());
+        }
+        firstIndex += bins * BitArray.LONG_SIZE;
+    }
+}
\ No newline at end of file
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
------------------------------------------------------------------------------
svn:executable = *
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IdGenerator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IdGenerator.java?view=diff&rev=551902&r1=551901&r2=551902
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IdGenerator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IdGenerator.java Fri
Jun 29 07:07:46 2007
@@ -108,5 +108,65 @@
result = result.replace('.', '-');
return result;
}
+
+ /**
+  * From a generated id - return the seed, i.e. the id without its trailing count.
+  * The seed keeps the final ':' separator. An id that has no ':' after its first
+  * character, or that ends with ':', is returned unchanged.
+  *
+  * @param id the generated identifier, may be null
+  * @return the seed, or null if id is null
+  */
+ public static String getSeedFromId(String id) {
+     if (id == null) {
+         return null;
+     }
+     int lastColon = id.lastIndexOf(':');
+     boolean hasCount = lastColon > 0 && lastColon + 1 < id.length();
+     return hasCount ? id.substring(0, lastColon + 1) : id;
+ }
+
+ /**
+  * From a generated id - return the generator count, i.e. the number that
+  * follows the last ':' of the id.
+  *
+  * @param id the generated identifier, may be null
+  * @return the count, or -1 if id is null, has no ':' after its first character,
+  *         or ends with ':'
+  * @throws NumberFormatException if the text after the last ':' is not a valid long
+  */
+ public static long getSequenceFromId(String id) {
+     if (id == null) {
+         return -1;
+     }
+     int lastColon = id.lastIndexOf(':');
+     if (lastColon <= 0 || lastColon + 1 >= id.length()) {
+         return -1;
+     }
+     return Long.parseLong(id.substring(lastColon + 1));
+ }
+
+ /**
+  * Does a proper compare on the ids: first by seed (as strings), then by the
+  * numeric generator count, so that e.g. "x:10" sorts after "x:9".
+  *
+  * @param id1 the first generated id
+  * @param id2 the second generated id
+  * @return 0 if equal, a positive value if id1 is greater than id2, a negative
+  *         value if it is smaller; -1 if either id is null
+  * @throws NumberFormatException if the seeds are equal and a count is not a valid long
+  */
+ public static int compare(String id1, String id2) {
+     String seed1 = IdGenerator.getSeedFromId(id1);
+     String seed2 = IdGenerator.getSeedFromId(id2);
+     if (seed1 == null || seed2 == null) {
+         return -1;
+     }
+     int result = seed1.compareTo(seed2);
+     if (result != 0) {
+         return result;
+     }
+     long count1 = IdGenerator.getSequenceFromId(id1);
+     long count2 = IdGenerator.getSequenceFromId(id2);
+     // Compare explicitly: casting the long difference to int can flip the sign
+     // or collapse a non-zero difference to 0.
+     return count1 < count2 ? -1 : (count1 > count2 ? 1 : 0);
+ }
}
|