qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ai...@apache.org
Subject svn commit: r886998 [4/14] - in /qpid/trunk: ./ qpid/dotnet/Qpid.Buffer.Tests/ qpid/dotnet/Qpid.Buffer.Tests/Properties/ qpid/dotnet/Qpid.Buffer/ qpid/dotnet/Qpid.Client.Tests/BrokerDetails/ qpid/dotnet/Qpid.Client.Tests/Channel/ qpid/dotnet/Qpid.Clien...
Date Thu, 03 Dec 2009 23:55:56 GMT
Modified: qpid/trunk/qpid/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs?rev=886998&r1=886997&r2=886998&view=diff
==============================================================================
--- qpid/trunk/qpid/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs (original)
+++ qpid/trunk/qpid/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs Thu Dec  3 23:55:48 2009
@@ -1,98 +1,98 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Collections;
-using System.Text;
-using System.Threading;
-using Apache.Qpid.Collections;
-using Apache.Qpid.Common;
-
-namespace Apache.Qpid.Client.Util
-{
-   internal delegate void ThresholdMethod(int currentCount);
-
-   /// <summary>
-   /// Basic bounded queue used to implement prefetching.
-   /// Notice we do the callbacks here asynchronously to
-   /// avoid adding more complexity to the channel impl.
-   /// </summary>
-   internal class FlowControlQueue
-   {
-      private BlockingQueue _queue = new LinkedBlockingQueue();
-      private int _itemCount;
-      private int _lowerBound;
-      private int _upperBound;
-      private ThresholdMethod _underThreshold;
-      private ThresholdMethod _overThreshold;
-
-      public FlowControlQueue(
-         int lowerBound, 
-         int upperBound,
-         ThresholdMethod underThreshold,
-         ThresholdMethod overThreshold
-         )
-      {
-         _lowerBound = lowerBound;
-         _upperBound = upperBound;
-         _underThreshold = underThreshold;
-         _overThreshold = overThreshold;
-      }
-
-      public void Enqueue(object item)
-      {
-         _queue.EnqueueBlocking(item);
-         int count = Interlocked.Increment(ref _itemCount);
-         if ( _overThreshold != null )
-         {
-            if ( count == _upperBound )
-            {
-               _overThreshold.BeginInvoke(
-                  count, new AsyncCallback(OnAsyncCallEnd), 
-                  _overThreshold
-                  );
-            }
-         }
-      }
-
-      public object Dequeue()
-      {
-         object item = _queue.DequeueBlocking();
-         int count = Interlocked.Decrement(ref _itemCount);
-         if ( _underThreshold != null )
-         {
-            if ( count == _lowerBound )
-            {
-               _underThreshold.BeginInvoke(
-                  count, new AsyncCallback(OnAsyncCallEnd),
-                  _underThreshold
-                  );
-            }
-         }
-         return item;
-      }
-
-      private void OnAsyncCallEnd(IAsyncResult res)
-      {
-         ThresholdMethod method = (ThresholdMethod)res.AsyncState;
-         method.EndInvoke(res);
-      }
-   }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Text;
+using System.Threading;
+using Apache.Qpid.Collections;
+using Apache.Qpid.Common;
+
+namespace Apache.Qpid.Client.Util
+{
+   internal delegate void ThresholdMethod(int currentCount);
+
+   /// <summary>
+   /// Basic bounded queue used to implement prefetching.
+   /// Notice we do the callbacks here asynchronously to
+   /// avoid adding more complexity to the channel impl.
+   /// </summary>
+   internal class FlowControlQueue
+   {
+      private BlockingQueue _queue = new LinkedBlockingQueue();
+      private int _itemCount;
+      private int _lowerBound;
+      private int _upperBound;
+      private ThresholdMethod _underThreshold;
+      private ThresholdMethod _overThreshold;
+
+      public FlowControlQueue(
+         int lowerBound, 
+         int upperBound,
+         ThresholdMethod underThreshold,
+         ThresholdMethod overThreshold
+         )
+      {
+         _lowerBound = lowerBound;
+         _upperBound = upperBound;
+         _underThreshold = underThreshold;
+         _overThreshold = overThreshold;
+      }
+
+      public void Enqueue(object item)
+      {
+         _queue.EnqueueBlocking(item);
+         int count = Interlocked.Increment(ref _itemCount);
+         if ( _overThreshold != null )
+         {
+            if ( count == _upperBound )
+            {
+               _overThreshold.BeginInvoke(
+                  count, new AsyncCallback(OnAsyncCallEnd), 
+                  _overThreshold
+                  );
+            }
+         }
+      }
+
+      public object Dequeue()
+      {
+         object item = _queue.DequeueBlocking();
+         int count = Interlocked.Decrement(ref _itemCount);
+         if ( _underThreshold != null )
+         {
+            if ( count == _lowerBound )
+            {
+               _underThreshold.BeginInvoke(
+                  count, new AsyncCallback(OnAsyncCallEnd),
+                  _underThreshold
+                  );
+            }
+         }
+         return item;
+      }
+
+      private void OnAsyncCallEnd(IAsyncResult res)
+      {
+         ThresholdMethod method = (ThresholdMethod)res.AsyncState;
+         method.EndInvoke(res);
+      }
+   }
+}

Propchange: qpid/trunk/qpid/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Collections/TestConsumerProducerQueue.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Collections/TestConsumerProducerQueue.cs?rev=886998&r1=886997&r2=886998&view=diff
==============================================================================
--- qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Collections/TestConsumerProducerQueue.cs (original)
+++ qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Collections/TestConsumerProducerQueue.cs Thu Dec  3 23:55:48 2009
@@ -1,85 +1,85 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Collections;
-using System.Text;
-using System.Threading;
-using NUnit.Framework;
-using Apache.Qpid.Collections;
-
-namespace Apache.Qpid.Collections.Tests
-{
-    [TestFixture]
-    public class TestConsumerProducerQueue
-    {
-       private ConsumerProducerQueue _queue;
-
-       [SetUp]
-       public void SetUp()
-       {
-          _queue = new ConsumerProducerQueue();
-       }
-
-       [Test]
-       public void CanDequeueWithInifiniteWait()
-       {
-          Thread producer = new Thread(new ThreadStart(ProduceFive));
-          producer.Start();
-          for ( int i = 0; i < 5; i++ )
-          {
-             object item = _queue.Dequeue();
-             Assert.IsNotNull(item);
-          }
-       }
-
-       [Test]
-       public void ReturnsNullOnDequeueTimeout()
-       {
-          // queue is empty
-          Assert.IsNull(_queue.Dequeue(500));
-       }
-
-       [Test]
-       public void DequeueTillEmpty()
-       {
-          _queue.Enqueue(1);
-          _queue.Enqueue(2);
-          _queue.Enqueue(3);
-          Assert.AreEqual(1, _queue.Dequeue());
-          Assert.AreEqual(2, _queue.Dequeue());
-          Assert.AreEqual(3, _queue.Dequeue());
-          // no messages in queue, will timeout
-          Assert.IsNull(_queue.Dequeue(500));
-       }
-
-
-       private void ProduceFive()
-       {
-          Thread.Sleep(1000);
-          _queue.Enqueue("test item 1");
-          _queue.Enqueue("test item 2");
-          _queue.Enqueue("test item 3");
-          Thread.Sleep(0);
-          _queue.Enqueue("test item 4");
-          _queue.Enqueue("test item 5");
-       }
-    }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Text;
+using System.Threading;
+using NUnit.Framework;
+using Apache.Qpid.Collections;
+
+namespace Apache.Qpid.Collections.Tests
+{
+    [TestFixture]
+    public class TestConsumerProducerQueue
+    {
+       private ConsumerProducerQueue _queue;
+
+       [SetUp]
+       public void SetUp()
+       {
+          _queue = new ConsumerProducerQueue();
+       }
+
+       [Test]
+       public void CanDequeueWithInifiniteWait()
+       {
+          Thread producer = new Thread(new ThreadStart(ProduceFive));
+          producer.Start();
+          for ( int i = 0; i < 5; i++ )
+          {
+             object item = _queue.Dequeue();
+             Assert.IsNotNull(item);
+          }
+       }
+
+       [Test]
+       public void ReturnsNullOnDequeueTimeout()
+       {
+          // queue is empty
+          Assert.IsNull(_queue.Dequeue(500));
+       }
+
+       [Test]
+       public void DequeueTillEmpty()
+       {
+          _queue.Enqueue(1);
+          _queue.Enqueue(2);
+          _queue.Enqueue(3);
+          Assert.AreEqual(1, _queue.Dequeue());
+          Assert.AreEqual(2, _queue.Dequeue());
+          Assert.AreEqual(3, _queue.Dequeue());
+          // no messages in queue, will timeout
+          Assert.IsNull(_queue.Dequeue(500));
+       }
+
+
+       private void ProduceFive()
+       {
+          Thread.Sleep(1000);
+          _queue.Enqueue("test item 1");
+          _queue.Enqueue("test item 2");
+          _queue.Enqueue("test item 3");
+          Thread.Sleep(0);
+          _queue.Enqueue("test item 4");
+          _queue.Enqueue("test item 5");
+       }
+    }
+}

Propchange: qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Collections/TestConsumerProducerQueue.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Framing/TestAMQType.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Framing/TestAMQType.cs?rev=886998&r1=886997&r2=886998&view=diff
==============================================================================
--- qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Framing/TestAMQType.cs (original)
+++ qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Framing/TestAMQType.cs Thu Dec  3 23:55:48 2009
@@ -1,270 +1,270 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using NUnit.Framework;
-using Apache.Qpid.Buffer;
-using Apache.Qpid.Framing;
-
-namespace Apache.Qpid.Framing.Tests
-{
-    [TestFixture]
-    public class TestAMQType
-    {
-
-       #region LONG_STRING tests
-       [Test]
-       public void LONG_STRING_ReadWrite()
-       {
-          AMQType type = AMQType.LONG_STRING;
-          ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
-          const string VALUE = "simple string 1";
-
-          type.WriteToBuffer(VALUE, buffer);
-          buffer.Flip();
-          buffer.Rewind();
-          AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
-          Assert.AreEqual(VALUE, value.Value);
-       }
-       #endregion // LONG_STRING tests
-
-       #region UINT32 tests
-       [Test]
-       public void UINT32_CanGetEncodingSize()
-       {
-          AMQType type = AMQType.UINT32;
-          Assert.AreEqual(4, type.GetEncodingSize(1234443));
-       }
-
-       [Test]
-       public void UINT32_ToNativeValue()
-       {
-          AMQType type = AMQType.UINT32;
-          Assert.AreEqual(1, type.ToNativeValue(1));
-          Assert.AreEqual(1, type.ToNativeValue((short)1));
-          Assert.AreEqual(1, type.ToNativeValue((byte)1));
-          Assert.AreEqual(1, type.ToNativeValue("1"));
-
-          try
-          {
-             Assert.AreEqual(1, type.ToNativeValue("adasdads"));
-             Assert.Fail("Invalid format allowed");
-          } catch ( FormatException )
-          {
-          }
-       }
-
-       [Test]
-       public void UINT32_ReadWrite()
-       {
-          AMQType type = AMQType.UINT32;
-          ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
-          const uint VALUE = 0xFFEEDDCC;
-
-          type.WriteToBuffer(VALUE, buffer);
-          buffer.Flip();
-          buffer.Rewind();
-          AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
-          Assert.AreEqual(VALUE, value.Value);
-       }
-       #endregion // UINT32 Tests
-
-       #region VOID Tests
-       [Test]
-       public void VOID_CanGetEncodingSize()
-       {
-          AMQType type = AMQType.VOID;
-          Assert.AreEqual(0, type.GetEncodingSize(null));
-       }
-
-       [Test]
-       public void VOID_ToNativeValue()
-       {
-          AMQType type = AMQType.VOID;
-          Assert.IsNull(type.ToNativeValue(null));
-
-          try
-          {
-             type.ToNativeValue("asdasd");
-             Assert.Fail("converted invalid value");
-          } catch (FormatException)
-          {
-          }
-       }
-
-       [Test]
-       public void VOID_ReadWrite()
-       {
-          AMQType type = AMQType.VOID;
-          ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
-
-          type.WriteToBuffer(null, buffer);
-          buffer.Flip();
-          buffer.Rewind();
-          AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
-          Assert.AreEqual(null, value.Value);
-       }
-
-       #endregion // VOID Tests
-
-       #region BOOLEAN Tests
-       [Test]
-       public void BOOLEAN_CanGetEncodingSize()
-       {
-          AMQType type = AMQType.BOOLEAN;
-          Assert.AreEqual(1, type.GetEncodingSize(true));
-       }
-
-       [Test]
-       public void BOOLEAN_ToNativeValue()
-       {
-          AMQType type = AMQType.BOOLEAN;
-          Assert.AreEqual(true, type.ToNativeValue(true));
-          Assert.AreEqual(false, type.ToNativeValue("false"));
-
-          try
-          {
-             type.ToNativeValue("asdasd");
-             Assert.Fail("converted invalid value");
-          } catch ( FormatException )
-          {
-          }
-       }
-
-       [Test]
-       public void BOOLEAN_ReadWrite()
-       {
-          AMQType type = AMQType.BOOLEAN;
-          ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
-
-          type.WriteToBuffer(true, buffer);
-          buffer.Flip();
-          buffer.Rewind();
-          AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
-          Assert.AreEqual(true, value.Value);
-       }
-       #endregion // BOOLEAN Tests
-
-       #region INT16 tests
-       [Test]
-       public void INT16_ReadWrite()
-       {
-          AMQType type = AMQType.INT16;
-          ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
-          const short VALUE = -32765;
-
-          type.WriteToBuffer(VALUE, buffer);
-          buffer.Flip();
-          buffer.Rewind();
-          AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
-          Assert.AreEqual(VALUE, value.Value);
-       }
-       //public void UINT16_ReadWrite()
-       //{
-       //   AMQType type = AMQType.UINT16;
-       //   ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
-       //   const ushort VALUE = 64321;
-
-       //   type.WriteToBuffer(VALUE, buffer);
-       //   buffer.Flip();
-       //   buffer.Rewind();
-       //   AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
-       //   Assert.AreEqual(VALUE, value.Value);
-       //}
-       #endregion // INT16 Tests
-
-       #region INT32 tests
-       [Test]
-       public void INT32_ReadWrite()
-       {
-          AMQType type = AMQType.INT32;
-          ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
-          const int VALUE = -39273563;
-
-          type.WriteToBuffer(VALUE, buffer);
-          buffer.Flip();
-          buffer.Rewind();
-          AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
-          Assert.AreEqual(VALUE, value.Value);
-       }
-       #endregion // INT32 Tests
-
-       #region INT64 tests
-       [Test]
-       public void INT64_ReadWrite()
-       {
-          AMQType type = AMQType.INT64;
-          ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
-          const long VALUE = -(2^43+1233123);
-
-          type.WriteToBuffer(VALUE, buffer);
-          buffer.Flip();
-          buffer.Rewind();
-          AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
-          Assert.AreEqual(VALUE, value.Value);
-       }
-       [Test]
-       public void UINT64_ReadWrite()
-       {
-          AMQType type = AMQType.UINT64;
-          ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
-          const ulong VALUE = (2 ^ 61 + 1233123);
-
-          type.WriteToBuffer(VALUE, buffer);
-          buffer.Flip();
-          buffer.Rewind();
-          AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
-          Assert.AreEqual(VALUE, value.Value);
-       }
-       #endregion // INT64 Tests
-
-       #region FLOAT tests
-       [Test]
-       public void FLOAT_ReadWrite()
-       {
-          AMQType type = AMQType.FLOAT;
-          ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
-          const float VALUE = 1.2345000E-035f;
-
-          type.WriteToBuffer(VALUE, buffer);
-          buffer.Flip();
-          buffer.Rewind();
-          AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
-          Assert.AreEqual(VALUE, value.Value);
-       }
-       #endregion // FLOAT Tests
-
-       #region DOUBLE tests
-       [Test]
-       public void DOUBLE_ReadWrite()
-       {
-          AMQType type = AMQType.DOUBLE;
-          ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
-          const double VALUE = 1.2345000E-045;
-
-          type.WriteToBuffer(VALUE, buffer);
-          buffer.Flip();
-          buffer.Rewind();
-          AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
-          Assert.AreEqual(VALUE, value.Value);
-       }
-       #endregion // FLOAT Tests
-    }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using NUnit.Framework;
+using Apache.Qpid.Buffer;
+using Apache.Qpid.Framing;
+
+namespace Apache.Qpid.Framing.Tests
+{
+    [TestFixture]
+    public class TestAMQType
+    {
+
+       #region LONG_STRING tests
+       [Test]
+       public void LONG_STRING_ReadWrite()
+       {
+          AMQType type = AMQType.LONG_STRING;
+          ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
+          const string VALUE = "simple string 1";
+
+          type.WriteToBuffer(VALUE, buffer);
+          buffer.Flip();
+          buffer.Rewind();
+          AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
+          Assert.AreEqual(VALUE, value.Value);
+       }
+       #endregion // LONG_STRING tests
+
+       #region UINT32 tests
+       [Test]
+       public void UINT32_CanGetEncodingSize()
+       {
+          AMQType type = AMQType.UINT32;
+          Assert.AreEqual(4, type.GetEncodingSize(1234443));
+       }
+
+       [Test]
+       public void UINT32_ToNativeValue()
+       {
+          AMQType type = AMQType.UINT32;
+          Assert.AreEqual(1, type.ToNativeValue(1));
+          Assert.AreEqual(1, type.ToNativeValue((short)1));
+          Assert.AreEqual(1, type.ToNativeValue((byte)1));
+          Assert.AreEqual(1, type.ToNativeValue("1"));
+
+          try
+          {
+             Assert.AreEqual(1, type.ToNativeValue("adasdads"));
+             Assert.Fail("Invalid format allowed");
+          } catch ( FormatException )
+          {
+          }
+       }
+
+       [Test]
+       public void UINT32_ReadWrite()
+       {
+          AMQType type = AMQType.UINT32;
+          ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
+          const uint VALUE = 0xFFEEDDCC;
+
+          type.WriteToBuffer(VALUE, buffer);
+          buffer.Flip();
+          buffer.Rewind();
+          AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
+          Assert.AreEqual(VALUE, value.Value);
+       }
+       #endregion // UINT32 Tests
+
+       #region VOID Tests
+       [Test]
+       public void VOID_CanGetEncodingSize()
+       {
+          AMQType type = AMQType.VOID;
+          Assert.AreEqual(0, type.GetEncodingSize(null));
+       }
+
+       [Test]
+       public void VOID_ToNativeValue()
+       {
+          AMQType type = AMQType.VOID;
+          Assert.IsNull(type.ToNativeValue(null));
+
+          try
+          {
+             type.ToNativeValue("asdasd");
+             Assert.Fail("converted invalid value");
+          } catch (FormatException)
+          {
+          }
+       }
+
+       [Test]
+       public void VOID_ReadWrite()
+       {
+          AMQType type = AMQType.VOID;
+          ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
+
+          type.WriteToBuffer(null, buffer);
+          buffer.Flip();
+          buffer.Rewind();
+          AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
+          Assert.AreEqual(null, value.Value);
+       }
+
+       #endregion // VOID Tests
+
+       #region BOOLEAN Tests
+       [Test]
+       public void BOOLEAN_CanGetEncodingSize()
+       {
+          AMQType type = AMQType.BOOLEAN;
+          Assert.AreEqual(1, type.GetEncodingSize(true));
+       }
+
+       [Test]
+       public void BOOLEAN_ToNativeValue()
+       {
+          AMQType type = AMQType.BOOLEAN;
+          Assert.AreEqual(true, type.ToNativeValue(true));
+          Assert.AreEqual(false, type.ToNativeValue("false"));
+
+          try
+          {
+             type.ToNativeValue("asdasd");
+             Assert.Fail("converted invalid value");
+          } catch ( FormatException )
+          {
+          }
+       }
+
+       [Test]
+       public void BOOLEAN_ReadWrite()
+       {
+          AMQType type = AMQType.BOOLEAN;
+          ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
+
+          type.WriteToBuffer(true, buffer);
+          buffer.Flip();
+          buffer.Rewind();
+          AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
+          Assert.AreEqual(true, value.Value);
+       }
+       #endregion // BOOLEAN Tests
+
+       #region INT16 tests
+       [Test]
+       public void INT16_ReadWrite()
+       {
+          AMQType type = AMQType.INT16;
+          ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
+          const short VALUE = -32765;
+
+          type.WriteToBuffer(VALUE, buffer);
+          buffer.Flip();
+          buffer.Rewind();
+          AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
+          Assert.AreEqual(VALUE, value.Value);
+       }
+       //public void UINT16_ReadWrite()
+       //{
+       //   AMQType type = AMQType.UINT16;
+       //   ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
+       //   const ushort VALUE = 64321;
+
+       //   type.WriteToBuffer(VALUE, buffer);
+       //   buffer.Flip();
+       //   buffer.Rewind();
+       //   AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
+       //   Assert.AreEqual(VALUE, value.Value);
+       //}
+       #endregion // INT16 Tests
+
+       #region INT32 tests
+       [Test]
+       public void INT32_ReadWrite()
+       {
+          AMQType type = AMQType.INT32;
+          ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
+          const int VALUE = -39273563;
+
+          type.WriteToBuffer(VALUE, buffer);
+          buffer.Flip();
+          buffer.Rewind();
+          AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
+          Assert.AreEqual(VALUE, value.Value);
+       }
+       #endregion // INT32 Tests
+
+       #region INT64 tests
+       [Test]
+       public void INT64_ReadWrite()
+       {
+          AMQType type = AMQType.INT64;
+          ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
+          const long VALUE = -(2^43+1233123);
+
+          type.WriteToBuffer(VALUE, buffer);
+          buffer.Flip();
+          buffer.Rewind();
+          AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
+          Assert.AreEqual(VALUE, value.Value);
+       }
+       [Test]
+       public void UINT64_ReadWrite()
+       {
+          AMQType type = AMQType.UINT64;
+          ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
+          const ulong VALUE = (2 ^ 61 + 1233123);
+
+          type.WriteToBuffer(VALUE, buffer);
+          buffer.Flip();
+          buffer.Rewind();
+          AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
+          Assert.AreEqual(VALUE, value.Value);
+       }
+       #endregion // INT64 Tests
+
+       #region FLOAT tests
+       [Test]
+       public void FLOAT_ReadWrite()
+       {
+          AMQType type = AMQType.FLOAT;
+          ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
+          const float VALUE = 1.2345000E-035f;
+
+          type.WriteToBuffer(VALUE, buffer);
+          buffer.Flip();
+          buffer.Rewind();
+          AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
+          Assert.AreEqual(VALUE, value.Value);
+       }
+       #endregion // FLOAT Tests
+
+       #region DOUBLE tests
+       [Test]
+       public void DOUBLE_ReadWrite()
+       {
+          AMQType type = AMQType.DOUBLE;
+          ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
+          const double VALUE = 1.2345000E-045;
+
+          type.WriteToBuffer(VALUE, buffer);
+          buffer.Flip();
+          buffer.Rewind();
+          AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
+          Assert.AreEqual(VALUE, value.Value);
+       }
+       #endregion // FLOAT Tests
+    }
+}

Propchange: qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Framing/TestAMQType.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Framing/TestEncodingUtils.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Framing/TestEncodingUtils.cs?rev=886998&r1=886997&r2=886998&view=diff
==============================================================================
--- qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Framing/TestEncodingUtils.cs (original)
+++ qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Framing/TestEncodingUtils.cs Thu Dec  3 23:55:48 2009
@@ -1,60 +1,60 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using NUnit.Framework;
-using Apache.Qpid.Buffer;
-using Apache.Qpid.Framing;
-
-namespace Apache.Qpid.Framing.Tests
-{
-    [TestFixture]
-    public class TestEncodingUtils
-    {
-       [Test]
-       public void CanReadLongAsShortString()
-       {
-          ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
-          EncodingUtils.WriteShortStringBytes(buffer, "98878122");
-          buffer.Flip();
-          long value = EncodingUtils.ReadLongAsShortString(buffer);
-          Assert.AreEqual(98878122, value);
-       }
-       [Test]
-       public void CanReadLongAsShortStringNegative()
-       {
-          ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
-          EncodingUtils.WriteShortStringBytes(buffer, "-98878122");
-          buffer.Flip();
-          long value = EncodingUtils.ReadLongAsShortString(buffer);
-          Assert.AreEqual(-98878122, value);
-       }
-       [Test]
-       public void CanReadLongAsShortStringEmpty()
-       {
-          ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
-          EncodingUtils.WriteShortStringBytes(buffer, "");
-          buffer.Flip();
-          long value = EncodingUtils.ReadLongAsShortString(buffer);
-          Assert.AreEqual(0, value);
-       }
-
-    }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using NUnit.Framework;
+using Apache.Qpid.Buffer;
+using Apache.Qpid.Framing;
+
+namespace Apache.Qpid.Framing.Tests
+{
+    /// <summary>
+    /// Tests for EncodingUtils.ReadLongAsShortString: a string is written
+    /// with WriteShortStringBytes and then read back as a number.
+    /// </summary>
+    [TestFixture]
+    public class TestEncodingUtils
+    {
+       /// <summary>A positive decimal string reads back as its value.</summary>
+       [Test]
+       public void CanReadLongAsShortString()
+       {
+          Assert.AreEqual(98878122, WriteThenReadLong("98878122"));
+       }
+
+       /// <summary>A string with a leading minus sign reads back negative.</summary>
+       [Test]
+       public void CanReadLongAsShortStringNegative()
+       {
+          Assert.AreEqual(-98878122, WriteThenReadLong("-98878122"));
+       }
+
+       /// <summary>An empty string reads back as zero.</summary>
+       [Test]
+       public void CanReadLongAsShortStringEmpty()
+       {
+          Assert.AreEqual(0, WriteThenReadLong(""));
+       }
+
+       /// <summary>
+       /// Writes <paramref name="text"/> into a fresh buffer as a short
+       /// string, flips the buffer and reads it back as a long.
+       /// </summary>
+       /// <param name="text">The string to write</param>
+       /// <returns>The value returned by ReadLongAsShortString</returns>
+       private static long WriteThenReadLong(string text)
+       {
+          ByteBuffer encoded = ByteBuffer.Allocate(0x1000);
+          EncodingUtils.WriteShortStringBytes(encoded, text);
+          encoded.Flip();
+          return EncodingUtils.ReadLongAsShortString(encoded);
+       }
+    }
+}

Propchange: qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Framing/TestEncodingUtils.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/dotnet/Qpid.Common/AMQInvalidArgumentException.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/dotnet/Qpid.Common/AMQInvalidArgumentException.cs?rev=886998&r1=886997&r2=886998&view=diff
==============================================================================
--- qpid/trunk/qpid/dotnet/Qpid.Common/AMQInvalidArgumentException.cs (original)
+++ qpid/trunk/qpid/dotnet/Qpid.Common/AMQInvalidArgumentException.cs Thu Dec  3 23:55:48 2009
@@ -1,46 +1,46 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-using System;
-using System.Runtime.Serialization;
-
-using Apache.Qpid.Protocol;
-
-namespace Apache.Qpid
-{
-   /// <summary>
-   /// Thrown when an invalid argument was supplied to the broker
-   /// </summary>
-   [Serializable]
-   public class AMQInvalidArgumentException : AMQException
-   {
-      public AMQInvalidArgumentException(string message)
-         : base(AMQConstant.INVALID_ARGUMENT.Code, message, null)
-      {
-      }
-
-      protected AMQInvalidArgumentException(SerializationInfo info, StreamingContext ctxt)
-         : base(info, ctxt)
-      {
-      }
-
-   }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+using System;
+using System.Runtime.Serialization;
+
+using Apache.Qpid.Protocol;
+
+namespace Apache.Qpid
+{
+   /// <summary>
+   /// Exception raised when the broker was given an invalid argument.
+   /// Instances carry the AMQConstant.INVALID_ARGUMENT code.
+   /// </summary>
+   [Serializable]
+   public class AMQInvalidArgumentException : AMQException
+   {
+      /// <summary>
+      /// Creates the exception with the INVALID_ARGUMENT code and the
+      /// supplied message; null is passed as the third base argument.
+      /// </summary>
+      /// <param name="message">Text describing the failure</param>
+      public AMQInvalidArgumentException(string message) : base(AMQConstant.INVALID_ARGUMENT.Code, message, null)
+      {
+      }
+
+      /// <summary>
+      /// Serialization constructor; forwards both arguments to the base class.
+      /// </summary>
+      /// <param name="info">Serialized object data</param>
+      /// <param name="ctxt">Streaming context of the serialization</param>
+      protected AMQInvalidArgumentException(SerializationInfo info, StreamingContext ctxt) : base(info, ctxt)
+      {
+      }
+   }
+}

Propchange: qpid/trunk/qpid/dotnet/Qpid.Common/AMQInvalidArgumentException.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/dotnet/Qpid.Common/AMQInvalidRoutingKeyException.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/dotnet/Qpid.Common/AMQInvalidRoutingKeyException.cs?rev=886998&r1=886997&r2=886998&view=diff
==============================================================================
--- qpid/trunk/qpid/dotnet/Qpid.Common/AMQInvalidRoutingKeyException.cs (original)
+++ qpid/trunk/qpid/dotnet/Qpid.Common/AMQInvalidRoutingKeyException.cs Thu Dec  3 23:55:48 2009
@@ -1,46 +1,46 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-using System;
-using System.Runtime.Serialization;
-
-using Apache.Qpid.Protocol;
-
-namespace Apache.Qpid
-{
-   /// <summary>
-   /// Thrown when an invalid routing key was sent to the broker
-   /// </summary>
-   [Serializable]
-   public class AMQInvalidRoutingKeyException : AMQException
-   {
-      public AMQInvalidRoutingKeyException(string message)
-         : base(AMQConstant.INVALID_ROUTING_KEY.Code, message, null)
-      {
-      }
-
-      protected AMQInvalidRoutingKeyException(SerializationInfo info, StreamingContext ctxt)
-         : base(info, ctxt)
-      {
-      }
-
-   }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+using System;
+using System.Runtime.Serialization;
+
+using Apache.Qpid.Protocol;
+
+namespace Apache.Qpid
+{
+   /// <summary>
+   /// Exception raised when an invalid routing key was sent to the broker.
+   /// Instances carry the AMQConstant.INVALID_ROUTING_KEY code.
+   /// </summary>
+   [Serializable]
+   public class AMQInvalidRoutingKeyException : AMQException
+   {
+      /// <summary>
+      /// Creates the exception with the INVALID_ROUTING_KEY code and the
+      /// supplied message; null is passed as the third base argument.
+      /// </summary>
+      /// <param name="message">Text describing the failure</param>
+      public AMQInvalidRoutingKeyException(string message) : base(AMQConstant.INVALID_ROUTING_KEY.Code, message, null)
+      {
+      }
+
+      /// <summary>
+      /// Serialization constructor; forwards both arguments to the base class.
+      /// </summary>
+      /// <param name="info">Serialized object data</param>
+      /// <param name="ctxt">Streaming context of the serialization</param>
+      protected AMQInvalidRoutingKeyException(SerializationInfo info, StreamingContext ctxt) : base(info, ctxt)
+      {
+      }
+   }
+}

Propchange: qpid/trunk/qpid/dotnet/Qpid.Common/AMQInvalidRoutingKeyException.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs?rev=886998&r1=886997&r2=886998&view=diff
==============================================================================
--- qpid/trunk/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs (original)
+++ qpid/trunk/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs Thu Dec  3 23:55:48 2009
@@ -1,113 +1,113 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Collections;
-using System.Threading;
-
-
-namespace Apache.Qpid.Collections
-{
-   /// <summary>
-   /// Simple FIFO queue to support multi-threaded consumer
-   /// and producers. It supports timeouts in dequeue operations.
-   /// </summary>
-   public sealed class ConsumerProducerQueue 
-   {
-      private Queue _queue = new Queue();
-      private WaitSemaphore _semaphore = new WaitSemaphore();
-
-      /// <summary>
-      /// Put an item into the tail of the queue
-      /// </summary>
-      /// <param name="item"></param>
-      public void Enqueue(object item)
-      {
-         lock ( _queue.SyncRoot )
-         {
-            _queue.Enqueue(item);
-            _semaphore.Increment();
-         }
-      }
-
-      /// <summary>
-      /// Wait indefinitely for an item to be available
-      /// on the queue.
-      /// </summary>
-      /// <returns>The object at the head of the queue</returns>
-      public object Dequeue()
-      {
-         return Dequeue(Timeout.Infinite);
-      }
-
-      /// <summary>
-      /// Wait up to the number of milliseconds specified
-      /// for an item to be available on the queue
-      /// </summary>
-      /// <param name="timeout">Number of milliseconds to wait</param>
-      /// <returns>The object at the head of the queue, or null 
-      /// if the timeout expires</returns>
-      public object Dequeue(long timeout)
-      {
-         if ( _semaphore.Decrement(timeout) )
-         {
-            lock ( _queue.SyncRoot )
-            {
-               return _queue.Dequeue();
-            }
-         }
-         return null;
-      }
-
-      #region Simple Semaphore
-      //
-      // Simple Semaphore
-      //
-
-      class WaitSemaphore
-      {
-         private int _count;
-         private AutoResetEvent _event = new AutoResetEvent(false);
-
-         public void Increment()
-         {
-            Interlocked.Increment(ref _count);
-            _event.Set();
-         }
-
-         public bool Decrement(long timeout)
-         {
-            if ( timeout > int.MaxValue )
-               throw new ArgumentOutOfRangeException("timeout", timeout, "Must be <= Int32.MaxValue");
-
-            int millis = (int) (timeout & 0x7FFFFFFF);
-            if ( Interlocked.Decrement(ref _count) > 0 )
-            {
-               // there are messages in queue, so no need to wait
-               return true;
-            } else
-            {
-               return _event.WaitOne(millis, false);
-            }
-         }
-      }
-      #endregion // Simple Semaphore
-   }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Threading;
+
+
+namespace Apache.Qpid.Collections
+{
+   /// <summary>
+   /// Simple FIFO queue to support multi-threaded consumers
+   /// and producers. It supports timeouts in dequeue operations.
+   /// </summary>
+   public sealed class ConsumerProducerQueue
+   {
+      private readonly Queue _queue = new Queue();
+      private readonly WaitSemaphore _semaphore = new WaitSemaphore();
+
+      /// <summary>
+      /// Put an item into the tail of the queue and make one permit
+      /// available to a waiting (or future) consumer.
+      /// </summary>
+      /// <param name="item">The object to append to the queue</param>
+      public void Enqueue(object item)
+      {
+         lock ( _queue.SyncRoot )
+         {
+            _queue.Enqueue(item);
+            // The permit is released only after the item is stored, so a
+            // consumer that obtains a permit always finds an item to take.
+            _semaphore.Increment();
+         }
+      }
+
+      /// <summary>
+      /// Wait indefinitely for an item to be available
+      /// on the queue.
+      /// </summary>
+      /// <returns>The object at the head of the queue</returns>
+      public object Dequeue()
+      {
+         return Dequeue(Timeout.Infinite);
+      }
+
+      /// <summary>
+      /// Wait up to the number of milliseconds specified
+      /// for an item to be available on the queue
+      /// </summary>
+      /// <param name="timeout">Number of milliseconds to wait; a negative
+      /// value (such as Timeout.Infinite) waits without limit</param>
+      /// <returns>The object at the head of the queue, or null
+      /// if the timeout expires</returns>
+      /// <exception cref="ArgumentOutOfRangeException">
+      /// If timeout is greater than Int32.MaxValue</exception>
+      public object Dequeue(long timeout)
+      {
+         if ( !_semaphore.Decrement(timeout) )
+         {
+            return null;
+         }
+
+         lock ( _queue.SyncRoot )
+         {
+            return _queue.Dequeue();
+         }
+      }
+
+      #region Simple Semaphore
+      //
+      // Simple Semaphore
+      //
+
+      /// <summary>
+      /// Counting semaphore built on Monitor.Wait / Monitor.Pulse.
+      /// The count is only decremented by a caller that actually obtains
+      /// a permit, so a timed-out wait leaves the count unchanged.
+      /// </summary>
+      private sealed class WaitSemaphore
+      {
+         private readonly object _gate = new object();
+         private int _count;
+
+         /// <summary>
+         /// Add one permit and wake one waiting thread, if any.
+         /// </summary>
+         public void Increment()
+         {
+            lock ( _gate )
+            {
+               _count++;
+               Monitor.Pulse(_gate);
+            }
+         }
+
+         /// <summary>
+         /// Take one permit, waiting for it if none is available.
+         /// </summary>
+         /// <param name="timeout">Milliseconds to wait; negative waits
+         /// without limit</param>
+         /// <returns>true if a permit was taken, false if the timeout
+         /// elapsed first</returns>
+         public bool Decrement(long timeout)
+         {
+            if ( timeout > int.MaxValue )
+               throw new ArgumentOutOfRangeException("timeout", timeout, "Must be <= Int32.MaxValue");
+
+            lock ( _gate )
+            {
+               if ( timeout < 0 )
+               {
+                  // Re-check in a loop: another consumer may take the
+                  // permit between the pulse and this thread waking up.
+                  while ( _count == 0 )
+                  {
+                     Monitor.Wait(_gate);
+                  }
+               }
+               else
+               {
+                  int start = Environment.TickCount;
+                  while ( _count == 0 )
+                  {
+                     // unchecked: TickCount may wrap around while waiting
+                     long remaining = timeout - unchecked(Environment.TickCount - start);
+                     if ( remaining <= 0 )
+                     {
+                        return false;
+                     }
+                     Monitor.Wait(_gate, (int) remaining);
+                  }
+               }
+
+               _count--;
+               return true;
+            }
+         }
+      }
+      #endregion // Simple Semaphore
+   }
+}

Propchange: qpid/trunk/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message