activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r551902 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ActiveMQMessageAudit.java util/BitArray.java util/BitArrayBin.java util/IdGenerator.java
Date Fri, 29 Jun 2007 14:07:47 GMT
Author: rajdavies
Date: Fri Jun 29 07:07:46 2007
New Revision: 551902

URL: http://svn.apache.org/viewvc?view=rev&rev=551902
Log:
Added dup detection capability

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
  (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java   (with
props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java 
 (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IdGenerator.java

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java?view=auto&rev=551902
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
(added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
Fri Jun 29 07:07:46 2007
@@ -0,0 +1,130 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership.
The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq;
+
+import java.util.LinkedHashMap;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.util.BitArrayBin;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.LRUCache;
+
+/**
+ * Provides basic audit functions for Messages
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class ActiveMQMessageAudit{
+
+    private static final int DEFAULT_WINDOW_SIZE=1024;
+    private static final int MAXIMUM_PRODUCER_COUNT=128;
+    private int windowSize;
+    private LinkedHashMap<Object,BitArrayBin> map;
+
+    /**
+     * Default Constructor windowSize = 1024, maximumNumberOfProducersToTrack = 128
+     */
+    public ActiveMQMessageAudit(){
+        this(DEFAULT_WINDOW_SIZE,MAXIMUM_PRODUCER_COUNT);
+    }
+
+    /**
+     * Construct a MessageAudit
+     * 
+     * @param windowSize range of ids to track
+     * @param maximumNumberOfProducersToTrack number of producers expected in the system
+     */
+    public ActiveMQMessageAudit(int windowSize,final int maximumNumberOfProducersToTrack){
+        this.windowSize=windowSize;
+        map=new LRUCache<Object,BitArrayBin>(maximumNumberOfProducersToTrack,maximumNumberOfProducersToTrack,0.75f,true);
+    }
+
+    /**
+     * Checks if this message has beeb seen before
+     * 
+     * @param message
+     * @return true if the message is a duplicate
+     * @throws JMSException
+     */
+    public boolean isDuplicateMessage(Message message) throws JMSException{
+        return isDuplicate(message.getJMSMessageID());
+    }
+
+    /**
+     * checks whether this messageId has been seen before and adds this messageId to the
list
+     * 
+     * @param id
+     * @return true if the message is a duplicate
+     */
+    public synchronized boolean isDuplicate(String id){
+        boolean answer=false;
+        String seed=IdGenerator.getSeedFromId(id);
+        if(seed!=null){
+            BitArrayBin bab=map.get(seed);
+            if(bab==null){
+                bab=new BitArrayBin(windowSize);
+                map.put(seed,bab);
+            }
+            long index=IdGenerator.getSequenceFromId(id);
+            if(index>=0){
+                answer=bab.setBit(index,true);
+            }
+        }
+        return answer;
+    }
+
+    /**
+     * Checks if this message has beeb seen before
+     * 
+     * @param message
+     * @return true if the message is a duplicate
+     */
+    public synchronized boolean isDuplicateMessageReference(final MessageReference message){
+        boolean answer=false;
+        MessageId id=message.getMessageId();
+        if(id!=null){
+            ProducerId pid=id.getProducerId();
+            if(pid!=null){
+                BitArrayBin bab=map.get(pid);
+                if(bab==null){
+                    bab=new BitArrayBin(windowSize);
+                    map.put(pid,bab);
+                }
+                answer=bab.setBit(id.getProducerSequenceId(),true);
+            }
+        }
+        return answer;
+    }
+    
+    /**
+     * uun mark this messager as being received
+     * @param message
+     */
+    public synchronized void rollbackMessageReference(final MessageReference message){
+        MessageId id=message.getMessageId();
+        if(id!=null){
+            ProducerId pid=id.getProducerId();
+            if(pid!=null){
+                BitArrayBin bab=map.get(pid);
+                if(bab!=null){
+                    bab.setBit(id.getProducerSequenceId(),false);
+                }
+            }
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java?view=auto&rev=551902
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java Fri
Jun 29 07:07:46 2007
@@ -0,0 +1,152 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Simple BitArray to enable setting multiple boolean values efficently Used instead of BitSet
because BitSet does not
+ * allow for efficent serialization.
+ * Will store up to 64 boolean values
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class BitArray {
+    static final int LONG_SIZE = 64;
+    static final int INT_SIZE = 32;
+    static final int SHORT_SIZE = 16;
+    static final int BYTE_SIZE = 8;
+    private static final long[] BIT_VALUES = {0x0000000000000001L, 0x0000000000000002L, 0x0000000000000004L,
+                                              0x0000000000000008L, 0x0000000000000010L, 0x0000000000000020L,
0x0000000000000040L, 0x0000000000000080L,
+                                              0x0000000000000100L, 0x0000000000000200L, 0x0000000000000400L,
0x0000000000000800L, 0x0000000000001000L,
+                                              0x0000000000002000L, 0x0000000000004000L, 0x0000000000008000L,
0x0000000000010000L, 0x0000000000020000L,
+                                              0x0000000000040000L, 0x0000000000080000L, 0x0000000000100000L,
0x0000000000200000L, 0x0000000000400000L,
+                                              0x0000000000800000L, 0x0000000001000000L, 0x0000000002000000L,
0x0000000004000000L, 0x0000000008000000L,
+                                              0x0000000010000000L, 0x0000000020000000L, 0x0000000040000000L,
0x0000000080000000L, 0x0000000100000000L,
+                                              0x0000000200000000L, 0x0000000400000000L, 0x0000000800000000L,
0x0000001000000000L, 0x0000002000000000L,
+                                              0x0000004000000000L, 0x0000008000000000L, 0x0000010000000000L,
0x0000020000000000L, 0x0000040000000000L,
+                                              0x0000080000000000L, 0x0000100000000000L, 0x0000200000000000L,
0x0000400000000000L, 0x0000800000000000L,
+                                              0x0001000000000000L, 0x0002000000000000L, 0x0004000000000000L,
0x0008000000000000L, 0x0010000000000000L,
+                                              0x0020000000000000L, 0x0040000000000000L, 0x0080000000000000L,
0x0100000000000000L, 0x0200000000000000L,
+                                              0x0400000000000000L, 0x0800000000000000L, 0x1000000000000000L,
0x2000000000000000L, 0x4000000000000000L,
+                                              0x8000000000000000L};
+    private long bits;
+    private int length;
+
+    /**
+     * @return the length of bits set
+     */
+    public int length() {
+        return length;
+    }
+
+    /**
+     * @return the long containing the bits
+     */
+    public long getBits() {
+        return bits;
+    }
+
+    /**
+     * set the boolean value at the index
+     *
+     * @param index
+     * @param flag
+     * @return the old value held at this index
+     */
+    public boolean set(int index, boolean flag) {
+        length = Math.max(length, index + 1);
+        boolean oldValue = (bits & BIT_VALUES[index]) != 0;
+        if (flag) {
+            bits |= BIT_VALUES[index];
+        }
+        else if (oldValue) {
+            bits &= ~(BIT_VALUES[index]);
+        }
+        return oldValue;
+    }
+
+    /**
+     * @param index
+     * @return the boolean value at this index
+     */
+    public boolean get(int index) {
+        return (bits & BIT_VALUES[index]) != 0;
+    }
+    
+    /**
+     * reset all the bit values to false
+     */
+    public void reset(){
+        bits = 0;
+    }
+    
+    /**
+     * reset all the bits to the value supplied
+     * @param bits
+     */
+    public void reset(long bits){
+        this.bits = bits;
+    }
+
+    /**
+     * write the bits to an output stream
+     *
+     * @param dataOut
+     * @throws IOException
+     */
+    public void writeToStream(DataOutput dataOut) throws IOException {
+        dataOut.writeByte(length);
+        if (length <= BYTE_SIZE) {
+            dataOut.writeByte((int) bits);
+        }
+        else if (length <= SHORT_SIZE) {
+            dataOut.writeShort((short) bits);
+        }
+        else if (length <= INT_SIZE) {
+            dataOut.writeInt((int) bits);
+        }
+        else {
+            dataOut.writeLong(bits);
+        }
+    }
+
+    /**
+     * read the bits from an input stream
+     *
+     * @param dataIn
+     * @throws IOException
+     */
+    public void readFromStream(DataInput dataIn) throws IOException {
+        length = dataIn.readByte();
+        if (length <= BYTE_SIZE) {
+            bits = dataIn.readByte();
+        }
+        else if (length <= SHORT_SIZE) {
+            bits = dataIn.readShort();
+        }
+        else if (length <= INT_SIZE) {
+            bits=dataIn.readInt();
+        }
+        else {
+            bits = dataIn.readLong();
+        }
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java?view=auto&rev=551902
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java Fri
Jun 29 07:07:46 2007
@@ -0,0 +1,139 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership.
The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.util;
+
+import java.util.LinkedList;
+
+/**
+ * Holder for many bitArrays - used for message audit
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class BitArrayBin{
+
+    private LinkedList<BitArray> list;
+    private int maxNumberOfArrays;
+    private int firstIndex=-1;
+    private int firstBin=-1;
+
+    /**
+     * Create a BitArrayBin to a certain window size (number of messages to keep)
+     * 
+     * @param windowSize
+     */
+    public BitArrayBin(int windowSize){
+        maxNumberOfArrays=((windowSize+1)/BitArray.LONG_SIZE)+1;
+        maxNumberOfArrays=Math.max(maxNumberOfArrays,1);
+        list=new LinkedList<BitArray>();
+        for(int i=0;i<maxNumberOfArrays;i++){
+            list.add(new BitArray());
+        }
+    }
+
+    /**
+     * Set a bit
+     * 
+     * @param index
+     * @param value
+     * @return true if set
+     */
+    public boolean setBit(long index,boolean value){
+        boolean answer=true;
+        BitArray ba=getBitArray(index);
+        if(ba!=null){
+            int offset=getOffset(index);
+            if(offset>=0){
+                answer=ba.set(offset,value);
+            }
+        }
+        return answer;
+    }
+
+    /**
+     * Get the boolean value at the index
+     * 
+     * @param index
+     * @return true/false
+     */
+    public boolean getBit(long index){
+        boolean answer=index>=firstIndex;
+        BitArray ba=getBitArray(index);
+        if(ba!=null){
+            int offset=getOffset(index);
+            if(offset>=0){
+                answer=ba.get(offset);
+                return answer;
+            }
+        }else{
+            // gone passed range for previous bins so assume set
+            answer=true;
+        }
+        return answer;
+    }
+
+    /**
+     * Get the BitArray for the index
+     * 
+     * @param index
+     * @return BitArray
+     */
+    private BitArray getBitArray(long index){
+        int bin=getBin(index);
+        BitArray answer=null;
+        if(bin>=0){
+            if(firstIndex<0){
+                firstIndex=0;
+            }
+            if(bin>=list.size()){
+                list.removeFirst();
+                firstIndex+=BitArray.LONG_SIZE;
+                list.add(new BitArray());
+                bin=list.size()-1;
+            }
+            answer=list.get(bin);
+        }
+        return answer;
+    }
+
+    /**
+     * Get the index of the bin from the total index
+     * 
+     * @param index
+     * @return the index of the bin
+     */
+    private int getBin(long index){
+        int answer=0;
+        if(firstBin<0){
+            firstBin=0;
+        }else if(firstIndex>=0){
+            answer=(int)((index-firstIndex)/BitArray.LONG_SIZE);
+        }
+        return answer;
+    }
+
+    /**
+     * Get the offset into a bin from the total index
+     * 
+     * @param index
+     * @return the relative offset into a bin
+     */
+    private int getOffset(long index){
+        int answer=0;
+        if(firstIndex>=0){
+            answer=(int)((index-firstIndex)-(BitArray.LONG_SIZE*getBin(index)));
+        }
+        return answer;
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IdGenerator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IdGenerator.java?view=diff&rev=551902&r1=551901&r2=551902
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IdGenerator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IdGenerator.java Fri
Jun 29 07:07:46 2007
@@ -108,5 +108,65 @@
         result = result.replace('.', '-');
         return result;
     }
+    
+    /**
+     * From a generated id - return the seed (i.e. minus the count)
+     *
+     * @param id the generated identifer
+     * @return the seed
+     */
+    public static String getSeedFromId(String id) {
+        String result = id;
+        if (id != null) {
+            int index = id.lastIndexOf(':');
+            if (index > 0 && (index + 1) < id.length()) {
+                result = id.substring(0, index + 1);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * From a generated id - return the generator count
+     *
+     * @param id
+     * @return the count
+     */
+    public static long getSequenceFromId(String id) {
+        long result = -1;
+        if (id != null) {
+            int index = id.lastIndexOf(':');
+
+            if (index > 0 && (index + 1) < id.length()) {
+                String numStr = id.substring(index + 1, id.length());
+                result = Long.parseLong(numStr);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Does a proper compare on the ids
+     *
+     * @param id1
+     * @param id2
+     * @return 0 if equal else a positive if id1 is > id2 ...
+     */
+
+    public static int compare(String id1, String id2) {
+        int result = -1;
+        String seed1 = IdGenerator.getSeedFromId(id1);
+        String seed2 = IdGenerator.getSeedFromId(id2);
+        if (seed1 != null && seed2 != null) {
+            result = seed1.compareTo(seed2);
+            if (result == 0) {
+                long count1 = IdGenerator.getSequenceFromId(id1);
+                long count2 = IdGenerator.getSequenceFromId(id2);
+                result = (int) (count1 - count2);
+            }
+        }
+        return result;
+
+    }
 
 }



Mime
View raw message