activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r810662 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent: BlockingQueue.h SynchronousQueue.h
Date Wed, 02 Sep 2009 18:41:54 GMT
Author: tabish
Date: Wed Sep  2 18:41:52 2009
New Revision: 810662

URL: http://svn.apache.org/viewvc?rev=810662&view=rev
Log:
Add some basic implementation and clean up the interfaces some more.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/BlockingQueue.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/SynchronousQueue.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/BlockingQueue.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/BlockingQueue.h?rev=810662&r1=810661&r2=810662&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/BlockingQueue.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/BlockingQueue.h Wed Sep  2 18:41:52 2009
@@ -19,7 +19,10 @@
 #define _DECAF_UTIL_CONCURRENT_BLOCKINGQUEUE_H_
 
 #include <decaf/util/Config.h>
-#include <decaf/util/Queue.h>
+#include <decaf/util/AbstractQueue.h>
+
+#include <decaf/util/concurrent/TimeUnit.h>
+#include <decaf/lang/exceptions/InterruptedException.h>
 
 namespace decaf {
 namespace util {
@@ -159,50 +162,13 @@
      * @since 1.0
      */
     template< typename E >
-    class BlockingQueue : public Queue<E> {
+    class BlockingQueue : public AbstractQueue<E> {
     public:
 
         virtual ~BlockingQueue() {
         }
 
         /**
-         * Inserts the specified element into this queue if it is possible to do so immediately
-         * without violating capacity restrictions, returning <tt>true</tt> upon success and
-         * throwing an <tt>IllegalStateException</tt> if no space is currently available.
-         * When using a capacity-restricted queue, it is generally preferable to use
-         * {@link #offer(Object) offer}.
-         *
-         * @param e the element to add
-         * @return <tt>true</tt> (as specified by {@link Collection#add})
-         * @throws IllegalStateException if the element cannot be added at this
-         *         time due to capacity restrictions
-         * @throws IllegalArgumentException if some property of the specified
-         *         element prevents it from being added to this queue
-         */
-        virtual bool add( const E& e )
-            throw( lang::exceptions::IllegalStateException,
-                   lang::exceptions::IllegalArgumentException );
-
-        /**
-         * Inserts the specified element into this queue if it is possible to do
-         * so immediately without violating capacity restrictions, returning
-         * <tt>true</tt> upon success and <tt>false</tt> if no space is currently
-         * available.  When using a capacity-restricted queue, this method is
-         * generally preferable to {@link #add}, which can fail to insert an
-         * element only by throwing an exception.
-         *
-         * @param e the element to add
-         * @return <tt>true</tt> if the element was added to this queue, else
-         *         <tt>false</tt>
-         * @throws NullPointerException if the specified element is null
-         * @throws IllegalArgumentException if some property of the specified
-         *         element prevents it from being added to this queue
-         */
-        virtual bool offer( const E& value )
-            throw( decaf::lang::exceptions::NullPointerException,
-                   decaf::lang::exceptions::IllegalArgumentException ) = 0;
-
-        /**
          * Inserts the specified element into this queue, waiting if necessary for space
          * to become available.
          *
@@ -276,35 +242,7 @@
          *
          * @return the remaining capacity
          */
-        virtual int remainingCapacity() = 0;
-
-        /**
-         * Removes a single instance of the specified element from this queue,
-         * if it is present.  More formally, removes an element <tt>e</tt> such
-         * that <tt>o.equals(e)</tt>, if this queue contains one or more such
-         * elements.
-         * Returns <tt>true</tt> if this queue contained the specified element
-         * (or equivalently, if this queue changed as a result of the call).
-         *
-         * @param value element to be removed from this queue, if present
-         * @return <tt>true</tt> if this queue changed as a result of the call
-         *
-         * @throws NullPointerException if the specified element is null (optional)
-         */
-        virtual bool remove( const E& value )
-            throw( decaf::lang::exceptions::NullPointerException ) = 0;
-
-        /**
-         * Returns <tt>true</tt> if this queue contains the specified element.
-         * More formally, returns <tt>true</tt> if and only if this queue contains
-         * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
-         *
-         * @param o object to be checked for containment in this queue
-         * @return <tt>true</tt> if this queue contains the specified element
-         * @throws NullPointerException if the specified element is null (optional)
-         */
-        virtual bool contains( const E& value )
-            throw( decaf::lang::exceptions::NullPointerException ) = 0;
+        virtual int remainingCapacity() const = 0;
 
         /**
          * Removes all available elements from this queue and adds them to the given
@@ -324,7 +262,7 @@
          *         queue, or some property of an element of this queue prevents
          *         it from being added to the specified collection
          */
-        virtual int drainTo( Collection<E>& c )
+        virtual std::size_t drainTo( Collection<E>& c )
             throw( decaf::lang::exceptions::UnsupportedOperationException,
                    decaf::lang::exceptions::IllegalArgumentException ) = 0;
 
@@ -348,7 +286,7 @@
          *         queue, or some property of an element of this queue prevents
          *         it from being added to the specified collection
          */
-        virtual int drainTo( Collection<E>& c, int maxElements )
+        virtual std::size_t drainTo( Collection<E>& c, std::size_t maxElements )
             throw( decaf::lang::exceptions::UnsupportedOperationException,
                    decaf::lang::exceptions::IllegalArgumentException ) = 0;
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/SynchronousQueue.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/SynchronousQueue.h?rev=810662&r1=810661&r2=810662&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/SynchronousQueue.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/SynchronousQueue.h Wed Sep  2 18:41:52 2009
@@ -22,6 +22,8 @@
 
 #include <decaf/util/concurrent/BlockingQueue.h>
 
+#include <vector>
+
 namespace decaf {
 namespace util {
 namespace concurrent {
@@ -62,6 +64,31 @@
     class SynchronousQueue : public BlockingQueue<E> {
     private:
 
+        class EmptyIterator : public Iterator<E> {
+        public:
+
+            virtual E next() throw( lang::exceptions::NoSuchElementException ) {
+
+                throw lang::exceptions::NoSuchElementException(
+                    __FILE__, __LINE__,
+                    "Cannot traverse a Synchronous Queue." );
+            }
+
+            virtual bool hasNext() const {
+                return false;
+            }
+
+            virtual void remove() throw ( lang::exceptions::IllegalStateException,
+                                          lang::exceptions::UnsupportedOperationException ) {
+
+                throw lang::exceptions::IllegalStateException(
+                    __FILE__, __LINE__,
+                    "No Elements to remove from a Synchronous Queue." );
+            }
+        };
+
+    private:
+
         SynchronousQueue( const SynchronousQueue& );
         SynchronousQueue& operator= ( const SynchronousQueue& );
 
@@ -71,6 +98,244 @@
 
         virtual ~SynchronousQueue() {}
 
+        /**
+         * Adds the specified element to this queue, waiting if necessary for
+         * another thread to receive it.
+         *
+         * @param value the element to add to the Queue.
+         *
+         * @throws InterruptedException {@inheritDoc}
+         * @throws NullPointerException {@inheritDoc}
+         * @throws IllegalArgumentException {@inheritDoc}
+         */
+        virtual void put( const E& value )
+            throw( decaf::lang::exceptions::InterruptedException,
+                   decaf::lang::exceptions::NullPointerException,
+                   decaf::lang::exceptions::IllegalArgumentException ) {
+
+            //if (o == null) throw new NullPointerException();
+            //if (transferer.transfer(o, false, 0) == null) {
+            //    Thread.interrupted();
+            //    throw new InterruptedException();
+            //}
+        }
+
+        /**
+         * Inserts the specified element into this queue, waiting if necessary
+         * up to the specified wait time for another thread to receive it.
+         *
+         * @return <tt>true</tt> if successful, or <tt>false</tt> if the
+         *         specified waiting time elapses before a consumer appears.
+         *
+         * @throws InterruptedException {@inheritDoc}
+         * @throws NullPointerException {@inheritDoc}
+         * @throws IllegalArgumentException {@inheritDoc}
+         */
+        virtual bool offer( const E& e, long timeout, const TimeUnit& unit )
+            throw( decaf::lang::exceptions::InterruptedException,
+                   decaf::lang::exceptions::NullPointerException,
+                   decaf::lang::exceptions::IllegalArgumentException ) {
+
+            //if (o == null) throw new NullPointerException();
+            //if (transferer.transfer(o, true, unit.toNanos(timeout)) != null)
+            //    return true;
+            //if (!Thread.interrupted())
+            //    return false;
+            //throw new InterruptedException();
+
+            throw false;
+        }
+
+        /**
+         * Inserts the specified element into this queue, if another thread is
+         * waiting to receive it.
+         *
+         * @param value the element to add to the Queue
+         *
+         * @return <tt>true</tt> if the element was added to this queue, else
+         *         <tt>false</tt>
+         *
+         * @throws NullPointerException if the Queue implementation does not allow Null values to
+         *         be inserted into the Queue.
+         * @throws IllegalArgumentException if some property of the specified
+         *         element prevents it from being added to this queue
+         */
+        virtual bool offer( const E& value )
+            throw( decaf::lang::exceptions::NullPointerException,
+                   decaf::lang::exceptions::IllegalArgumentException ) {
+
+            //if (e == null) throw new NullPointerException();
+            //return transferer.transfer(e, true, 0) != null;
+
+            return false;
+        }
+
+        /**
+         * Retrieves and removes the head of this queue, waiting if necessary
+         * for another thread to insert it.
+         *
+         * @return the head of this queue
+         * @throws InterruptedException {@inheritDoc}
+         */
+        virtual E take() throw( decaf::lang::exceptions::InterruptedException ) {
+            //Object e = transferer.transfer(null, false, 0);
+            //if (e != null)
+            //    return (E)e;
+            //Thread.interrupted();
+            //throw new InterruptedException();
+
+            return E();
+        }
+
+        /**
+         * Retrieves and removes the head of this queue, waiting
+         * if necessary up to the specified wait time, for another thread
+         * to insert it.
+         *
+         * @param result a reference to the value where the head of the Queue
+         *               should be copied to.
+         * @param timeout the time that the method should block if there is no
+         *                element available to return.
+         * @param unit the Time Units that the timeout value represents.
+         *
+         * @return true if the head of the Queue was copied to the result param
+         *         or false if no value could be returned.
+         */
+        virtual bool poll( E& result, long long timeout, const TimeUnit& unit )
+            throw( decaf::lang::exceptions::InterruptedException ) {
+
+            //Object e = transferer.transfer(null, true, unit.toNanos(timeout));
+            //if (e != null || !Thread.interrupted())
+            //    return (E)e;
+            //throw new InterruptedException();
+
+            return false;
+        }
+
+        /**
+         * Retrieves and removes the head of this queue, if another thread
+         * is currently making an element available.
+         *
+         * @param result a reference to the value where the head of the Queue
+         *               should be copied to.
+         *
+         * @return true if the head of the Queue was copied to the result param
+         *         or false if no value could be returned.
+         */
+        virtual bool poll( E& result ) {
+            return false; // (E)transferer.transfer(null, true, 0);
+        }
+
+        virtual bool equals( const Collection<E>& value ) const {
+            if( (void*)&value == this ) {
+                return true;
+            }
+
+            return false;
+        }
+
+        virtual decaf::util::Iterator<E>* iterator() {
+            return new EmptyIterator();
+        }
+
+        virtual decaf::util::Iterator<E>* iterator() const {
+            return new EmptyIterator();
+        }
+
+        virtual bool isEmpty() const {
+            return true;
+        }
+
+        virtual std::size_t size() const {
+            return 0;
+        }
+
+        virtual int remainingCapacity() const {
+            return 0;
+        }
+
+        virtual void clear()
+            throw ( lang::exceptions::UnsupportedOperationException ) {}
+
+        virtual bool contains( const E& value DECAF_UNUSED ) const throw ( lang::Exception ) {
+            return false;
+        }
+
+        virtual bool containsAll( const Collection<E>& collection ) const
+            throw ( lang::Exception ) {
+
+            return collection.isEmpty();
+        }
+
+        virtual bool remove( const E& value DECAF_UNUSED )
+            throw ( lang::exceptions::UnsupportedOperationException,
+                    lang::exceptions::IllegalArgumentException ) {
+
+            return false;
+        }
+
+        virtual bool removeAll( const Collection<E>& collection DECAF_UNUSED )
+            throw ( lang::exceptions::UnsupportedOperationException,
+                    lang::exceptions::IllegalArgumentException ) {
+
+            return false;
+        }
+
+        virtual bool retainAll( const Collection<E>& collection DECAF_UNUSED )
+            throw ( lang::exceptions::UnsupportedOperationException,
+                    lang::exceptions::IllegalArgumentException ) {
+
+            return false;
+        }
+
+        virtual bool peek( E& result DECAF_UNUSED ) const {
+            return false;
+        }
+
+        virtual std::vector<E> toArray() const { return std::vector<E>(); }
+
+        virtual std::size_t drainTo( Collection<E>& c )
+            throw( decaf::lang::exceptions::UnsupportedOperationException,
+                   decaf::lang::exceptions::IllegalArgumentException ) {
+
+            if( (void*)&c == this ) {
+                throw decaf::lang::exceptions::IllegalArgumentException(
+                    __FILE__, __LINE__,
+                    "Cannot drain a Collection to Itself." );
+            }
+
+            std::size_t count = 0;
+            E element;
+
+            while( ( poll( element ) ) != false ) {
+                c.add( element );
+                ++count;
+            }
+
+            return count;
+        }
+
+        virtual std::size_t drainTo( Collection<E>& c, std::size_t maxElements )
+            throw( decaf::lang::exceptions::UnsupportedOperationException,
+                   decaf::lang::exceptions::IllegalArgumentException ) {
+
+            if( (void*)&c == this ) {
+                throw decaf::lang::exceptions::IllegalArgumentException(
+                    __FILE__, __LINE__,
+                    "Cannot drain a Collection to Itself." );
+            }
+
+            std::size_t count = 0;
+            E element;
+
+            while( count < maxElements && ( poll( element ) != false ) ) {
+                c.add( element );
+                ++count;
+            }
+
+            return count;
+        }
+
     };
 
 }}}



Mime
View raw message