activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1468534 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/Util/ActiveMQMessageAudit.cs main/csharp/Util/LRUCache.cs test/csharp/Util/ActiveMQMessageAuditTest.cs
Date Tue, 16 Apr 2013 17:59:26 GMT
Author: tabish
Date: Tue Apr 16 17:59:26 2013
New Revision: 1468534

URL: http://svn.apache.org/r1468534
Log:
Implement a Message Audit and test, along with some updates to LRUCache

Added:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/ActiveMQMessageAudit.cs
  (with props)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Util/ActiveMQMessageAuditTest.cs
  (with props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/LRUCache.cs

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/ActiveMQMessageAudit.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/ActiveMQMessageAudit.cs?rev=1468534&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/ActiveMQMessageAudit.cs
(added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/ActiveMQMessageAudit.cs
Tue Apr 16 17:59:26 2013
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ.Util
+{
+	/// <summary>
+	/// Records, per producer, which message sequence ids have been marked so
+	/// that callers can ask whether a message id is a duplicate or in order.
+	/// One <see cref="BitArrayBin"/> is kept per <see cref="ProducerId"/> inside
+	/// a size-bounded LRUCache.  After construction, every access to that cache
+	/// made by this class happens while holding a private lock.
+	/// </summary>
+	public class ActiveMQMessageAudit
+	{
+		public const int DEFAULT_WINDOW_SIZE = 2048;
+		public const int MAXIMUM_PRODUCER_COUNT = 64;
+
+		// Guards every access to 'map' after construction.
+		private readonly object mutex = new object();
+
+		private int auditDepth;
+		private int maximumNumberOfProducersToTrack;
+		private readonly LRUCache<Object, BitArrayBin> map;
+
+		/// <summary>
+		/// Size argument handed to each <see cref="BitArrayBin"/> this audit
+		/// creates.  Changing it only affects bins created afterwards; bins
+		/// that already exist are left untouched.
+		/// </summary>
+		public int AuditDepth
+		{
+			get { return this.auditDepth; }
+			set { this.auditDepth = value; }
+		}
+
+		/// <summary>
+		/// Maximum number of producers whose bins are retained in the cache.
+		/// </summary>
+		public int MaximumNumberOfProducersToTrack
+		{
+			get { return this.maximumNumberOfProducersToTrack; }
+			set
+			{
+				lock(this.mutex)
+				{
+					if (value < this.maximumNumberOfProducersToTrack)
+					{
+						// Shrinking: funnel the current entries through a
+						// temporary cache of the new size so that the surplus
+						// is dropped by that cache's own eviction, then reload
+						// the survivors.  PutAll walks the entries in the
+						// source's order, so this should not evict the wrong
+						// entries.
+						LRUCache<Object, BitArrayBin> trimmed = new LRUCache<Object, BitArrayBin>(value);
+						trimmed.PutAll(this.map);
+						this.map.Clear();
+						this.map.PutAll(trimmed);
+					}
+
+					this.map.MaxCacheSize = value;
+					this.maximumNumberOfProducersToTrack = value;
+				}
+			}
+		}
+
+		/// <summary>
+		/// Creates an audit using <see cref="DEFAULT_WINDOW_SIZE"/> and
+		/// <see cref="MAXIMUM_PRODUCER_COUNT"/>.
+		/// </summary>
+		public ActiveMQMessageAudit() : this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT)
+		{
+		}
+
+		/// <summary>
+		/// Creates an audit with an explicit depth and producer limit.
+		/// </summary>
+		/// <param name="auditDepth">Size argument for each producer's bin.</param>
+		/// <param name="maximumNumberOfProducersToTrack">Capacity of the producer cache.</param>
+		public ActiveMQMessageAudit(int auditDepth, int maximumNumberOfProducersToTrack)
+		{
+			this.auditDepth = auditDepth;
+			this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack;
+			this.map = new LRUCache<Object, BitArrayBin>(maximumNumberOfProducersToTrack);
+		}
+
+		/// <summary>
+		/// Marks the sequence id of the given message for its producer and
+		/// returns whatever <c>BitArrayBin.SetBit</c> reports for that id
+		/// (presumably "was already set", i.e. the message is a duplicate).
+		/// </summary>
+		/// <param name="id">The message id to check; may be null.</param>
+		/// <returns>
+		/// false when <paramref name="id"/> or its ProducerId is null,
+		/// otherwise the result of marking the sequence id.
+		/// </returns>
+		public bool IsDuplicate(MessageId id)
+		{
+			ProducerId pid = (id != null) ? id.ProducerId : null;
+			if (pid == null)
+			{
+				return false;
+			}
+
+			lock(this.mutex)
+			{
+				return GetOrCreateBin(pid).SetBit(id.ProducerSequenceId, true);
+			}
+		}
+
+		/// <summary>
+		/// Clears the mark for the given message's sequence id, if its
+		/// producer is currently tracked.  Does nothing for a null id, a null
+		/// ProducerId or an untracked producer.
+		/// </summary>
+		/// <param name="id">The message id to un-mark; may be null.</param>
+		public void Rollback(MessageId id)
+		{
+			ProducerId pid = (id != null) ? id.ProducerId : null;
+			if (pid == null)
+			{
+				return;
+			}
+
+			lock(this.mutex)
+			{
+				BitArrayBin bin;
+				if (map.TryGetValue(pid, out bin))
+				{
+					bin.SetBit(id.ProducerSequenceId, false);
+				}
+			}
+		}
+
+		/// <summary>
+		/// Asks the producer's bin whether the message's sequence id is in
+		/// order, creating the bin first when the producer is not yet tracked.
+		/// </summary>
+		/// <param name="id">The message id to check; may be null.</param>
+		/// <returns>
+		/// false when <paramref name="id"/> or its ProducerId is null,
+		/// otherwise the result of <c>BitArrayBin.IsInOrder</c>.
+		/// </returns>
+		public bool IsInOrder(MessageId id)
+		{
+			ProducerId pid = (id != null) ? id.ProducerId : null;
+			if (pid == null)
+			{
+				return false;
+			}
+
+			lock(this.mutex)
+			{
+				return GetOrCreateBin(pid).IsInOrder(id.ProducerSequenceId);
+			}
+		}
+
+		/// <summary>
+		/// Returns the last set index reported by the bin of the given
+		/// producer.
+		/// </summary>
+		/// <param name="id">The producer to look up; may be null.</param>
+		/// <returns>
+		/// -1 when <paramref name="id"/> is null or the producer is not
+		/// tracked, otherwise <c>BitArrayBin.GetLastSetIndex()</c>.
+		/// </returns>
+		public long GetLastSeqId(ProducerId id)
+		{
+			long result = -1;
+
+			// Treat a null producer like an unknown one, matching the null
+			// tolerance of the other public methods, instead of passing a
+			// null key down to the cache lookup.
+			if (id != null)
+			{
+				lock(this.mutex)
+				{
+					BitArrayBin bin;
+					if (map.TryGetValue(id, out bin))
+					{
+						result = bin.GetLastSetIndex();
+					}
+				}
+			}
+
+			return result;
+		}
+
+		/// <summary>
+		/// Removes every tracked producer from the cache.
+		/// </summary>
+		public void Clear()
+		{
+			lock(this.mutex)
+			{
+				map.Clear();
+			}
+		}
+
+		/// <summary>
+		/// Returns the bin tracked for <paramref name="pid"/>, creating and
+		/// caching a new one sized by the current audit depth when none
+		/// exists.  The caller must hold <c>mutex</c>.
+		/// </summary>
+		private BitArrayBin GetOrCreateBin(ProducerId pid)
+		{
+			BitArrayBin bin;
+			if (!map.TryGetValue(pid, out bin))
+			{
+				bin = new BitArrayBin(auditDepth);
+				map[pid] = bin;
+			}
+			return bin;
+		}
+	}
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/ActiveMQMessageAudit.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/LRUCache.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/LRUCache.cs?rev=1468534&r1=1468533&r2=1468534&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/LRUCache.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/LRUCache.cs Tue
Apr 16 17:59:26 2013
@@ -124,6 +124,19 @@ namespace Apache.NMS.ActiveMQ.Util
 		{
 			return this.dictionary.Count > this.maxCacheSize;
 		}
+
+		/// <summary>
+		/// Adds every entry of <paramref name="source"/> to this cache by
+		/// calling <c>Add</c> for each one, in the order the source's entries
+		/// are enumerated.  Passing this cache itself is a no-op.
+		/// </summary>
+		/// <param name="source">The cache whose entries are copied.</param>
+		/// <exception cref="ArgumentNullException">
+		/// Thrown when <paramref name="source"/> is null.
+		/// </exception>
+		public void PutAll(LRUCache<TKey, TValue> source)
+		{
+			if (source == null)
+			{
+				// Fail with a descriptive exception rather than a
+				// NullReferenceException on source.entries below.
+				throw new ArgumentNullException("source");
+			}
+
+			// Identity check: copying a cache into itself would mutate the
+			// collection being enumerated.
+			if (Object.ReferenceEquals(source, this))
+			{
+				return;
+			}
+
+			foreach(KeyValuePair<TKey, TValue> entry in source.entries)
+			{
+				this.Add(entry.Key, entry.Value);
+			}
+		}
 	}
 
 }

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Util/ActiveMQMessageAuditTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Util/ActiveMQMessageAuditTest.cs?rev=1468534&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Util/ActiveMQMessageAuditTest.cs
(added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Util/ActiveMQMessageAuditTest.cs
Tue Apr 16 17:59:26 2013
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using Apache.NMS;
+using Apache.NMS.ActiveMQ;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.Util;
+using NUnit.Framework;
+
+namespace Apache.NMS.ActiveMQ.Test
+{
+    [TestFixture]
+	public class ActiveMQMessageAuditTest
+	{
+		// Number of sequence ids each test walks through.
+		private const int MessageCount = 10000;
+
+		/// <summary>
+		/// Builds the single producer id shared by all messages in a test.
+		/// </summary>
+		private static ProducerId CreateProducerId()
+		{
+			ProducerId producer = new ProducerId();
+			producer.ConnectionId = "test";
+			producer.SessionId = 0;
+			producer.Value = 1;
+			return producer;
+		}
+
+		/// <summary>
+		/// Builds a message id for the given producer and sequence number.
+		/// </summary>
+		private static MessageId CreateMessageId(ProducerId producer, int sequence)
+		{
+			MessageId message = new MessageId();
+			message.ProducerId = producer;
+			message.ProducerSequenceId = sequence;
+			return message;
+		}
+
+		/// <summary>
+		/// Fresh ids are not duplicates; replaying the tail of the sequence
+		/// (the last AuditDepth + 1 ids) is reported as duplicate.
+		/// </summary>
+		[Test]
+		public void TestIsDuplicateMessageId()
+		{
+			ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
+			ProducerId producer = CreateProducerId();
+			List<MessageId> sent = new List<MessageId>();
+
+			for (int sequence = 0; sequence < MessageCount; sequence++)
+			{
+				MessageId fresh = CreateMessageId(producer, sequence);
+				sent.Add(fresh);
+				Assert.IsFalse(audit.IsDuplicate(fresh));
+			}
+
+			int firstReplayed = sent.Count - 1 - audit.AuditDepth;
+			for (int position = firstReplayed; position < sent.Count; position++)
+			{
+				MessageId replayed = sent[position];
+				Assert.IsTrue(audit.IsDuplicate(replayed), "duplicate msg:" + replayed.ToString());
+			}
+		}
+
+		/// <summary>
+		/// After marking sequence 0, that id is in order; the odd ids above 1
+		/// are each reported as out of order and not duplicate.
+		/// </summary>
+		[Test]
+		public void TestIsInOrderMessageId()
+		{
+			ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
+			ProducerId producer = CreateProducerId();
+			List<MessageId> oddIds = new List<MessageId>();
+
+			for (int sequence = 0; sequence < MessageCount; sequence++)
+			{
+				MessageId candidate = CreateMessageId(producer, sequence);
+
+				if (sequence == 0)
+				{
+					Assert.IsFalse(audit.IsDuplicate(candidate));
+					Assert.IsTrue(audit.IsInOrder(candidate));
+				}
+				else if (sequence > 1 && sequence % 2 == 1)
+				{
+					oddIds.Add(candidate);
+				}
+			}
+
+			foreach (MessageId mid in oddIds)
+			{
+				Assert.IsFalse(audit.IsInOrder(mid), "Out of order msg: " + mid.ToString());
+				Assert.IsFalse(audit.IsDuplicate(mid));
+			}
+		}
+
+		/// <summary>
+		/// A replayed id is a duplicate, but after Rollback the same id is
+		/// accepted again as not duplicate.
+		/// </summary>
+		[Test]
+		public void TestRollbackMessageId()
+		{
+			ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
+			ProducerId producer = CreateProducerId();
+			List<MessageId> sent = new List<MessageId>();
+
+			for (int sequence = 0; sequence < MessageCount; sequence++)
+			{
+				MessageId fresh = CreateMessageId(producer, sequence);
+				sent.Add(fresh);
+				Assert.IsFalse(audit.IsDuplicate(fresh));
+			}
+
+			int firstReplayed = sent.Count - 1 - audit.AuditDepth;
+			for (int position = firstReplayed; position < sent.Count; position++)
+			{
+				MessageId replayed = sent[position];
+				Assert.IsTrue(audit.IsDuplicate(replayed), "duplicate msg:" + replayed.ToString());
+				audit.Rollback(replayed);
+				Assert.IsFalse(audit.IsDuplicate(replayed), "erronious duplicate msg:" + replayed.ToString());
+			}
+		}
+
+		/// <summary>
+		/// After each new sequence id is marked, GetLastSeqId for the
+		/// producer equals that sequence id.  One MessageId instance is
+		/// reused and mutated for every iteration.
+		/// </summary>
+		[Test]
+		public void TestGetLastSeqId()
+		{
+			ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
+			ProducerId producer = CreateProducerId();
+			List<MessageId> sent = new List<MessageId>();
+
+			MessageId reused = new MessageId();
+			reused.ProducerId = producer;
+
+			for (int sequence = 0; sequence < MessageCount; sequence++)
+			{
+				reused.ProducerSequenceId = sequence;
+				sent.Add(reused);
+				Assert.IsFalse(audit.IsDuplicate(reused));
+				Assert.AreEqual(sequence, audit.GetLastSeqId(producer));
+			}
+		}
+	}
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Util/ActiveMQMessageAuditTest.cs
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message