activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r808563 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/activemq/transport/inactivity/ main/decaf/util/ test/ test/decaf/util/
Date Thu, 27 Aug 2009 18:35:54 GMT
Author: tabish
Date: Thu Aug 27 18:35:53 2009
New Revision: 808563

URL: http://svn.apache.org/viewvc?rev=808563&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQCPP-250

Commit some of the initial work on the Inactivity Monitor.  Most functionality is implemented; the lack of a real thread pool holds back completion of the asynchronous KeepAliveInfo sends.

Added:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/ReadChecker.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/ReadChecker.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/WriteChecker.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/WriteChecker.h   (with props)
Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/AbstractQueue.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/PriorityQueue.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/Queue.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/PriorityQueueTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/PriorityQueueTest.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?rev=808563&r1=808562&r2=808563&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Thu Aug 27 18:35:53 2009
@@ -121,6 +121,8 @@
     activemq/transport/failover/FailoverTransportListener.cpp \
     activemq/transport/failover/URIPool.cpp \
     activemq/transport/inactivity/InactivityMonitor.cpp \
+    activemq/transport/inactivity/ReadChecker.cpp \
+    activemq/transport/inactivity/WriteChecker.cpp \
     activemq/transport/logging/LoggingTransport.cpp \
     activemq/transport/mock/InternalCommandListener.cpp \
     activemq/transport/mock/MockTransport.cpp \
@@ -686,6 +688,8 @@
     activemq/transport/failover/FailoverTransportListener.h \
     activemq/transport/failover/URIPool.h \
     activemq/transport/inactivity/InactivityMonitor.h \
+    activemq/transport/inactivity/ReadChecker.h \
+    activemq/transport/inactivity/WriteChecker.h \
     activemq/transport/logging/LoggingTransport.h \
     activemq/transport/mock/InternalCommandListener.h \
     activemq/transport/mock/MockTransport.h \

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp?rev=808563&r1=808562&r2=808563&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp Thu Aug 27 18:35:53 2009
@@ -17,11 +17,20 @@
 
 #include "InactivityMonitor.h"
 
+#include "ReadChecker.h"
+#include "WriteChecker.h"
+
+#include <activemq/commands/WireFormatInfo.h>
+#include <activemq/commands/KeepAliveInfo.h>
+#include <decaf/lang/Math.h>
+
 using namespace std;
 using namespace activemq;
+using namespace activemq::commands;
 using namespace activemq::transport;
 using namespace activemq::transport::inactivity;
 using namespace activemq::exceptions;
+using namespace activemq::wireformat;
 using namespace decaf;
 using namespace decaf::io;
 using namespace decaf::util;
@@ -31,28 +40,282 @@
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
-InactivityMonitor::InactivityMonitor( const Pointer<Transport>& next )
-:  TransportFilter( next ) {
+namespace activemq{
+namespace transport{
+namespace inactivity{
+
+    class AsyncException : decaf::lang::Runnable {
+    private:
+
+        InactivityMonitor* parent;
+
+    public:
+
+        AsyncException( InactivityMonitor* parent ) : parent( parent ) {
+        }
+
+        virtual void run() {
+            IOException ex (
+                __FILE__, __LINE__,
+                ( std::string( "Channel was inactive for too long: " ) + parent->next->getRemoteAddress() ).c_str() );
+            parent->onException( ex );
+        }
+
+    };
+
+}}}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace activemq{
+namespace transport{
+namespace inactivity{
+
+    class AsyncWriteKeepAlive : decaf::lang::Runnable {
+    private:
+
+        InactivityMonitor* parent;
+
+    public:
+
+        AsyncWriteKeepAlive( InactivityMonitor* parent ) : parent( parent ) {
+        }
+
+        virtual void run() {
+            if( parent->monitorStarted.get() ) {
+                try {
+                    Pointer<KeepAliveInfo> info( new KeepAliveInfo() );
+                    info->setResponseRequired( parent->keepAliveResponseRequired );
+                    parent->oneway( info );
+                } catch( IOException e ) {
+                    parent->onException( e );
+                }
+            }
+        }
+
+    };
+
+}}}
+
+////////////////////////////////////////////////////////////////////////////////
+InactivityMonitor::InactivityMonitor( const Pointer<Transport>& next, const Pointer<WireFormat>& wireFormat )
+:   TransportFilter( next ),
+    wireFormat( wireFormat ),
+    monitorStarted( false ),
+    commandSent( false ),
+    commandReceived( true),
+    failed( false ),
+    inRead( false ),
+    inWrite( false ),
+    readCheckTime( 0 ),
+    writeCheckTime( 0 ),
+    initialDelayTime( 0 ),
+    keepAliveResponseRequired( false ) {
 
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 InactivityMonitor::~InactivityMonitor() {
+    try{
+        this->stopMonitorThreads();
+    }
+    AMQ_CATCHALL_NOTHROW()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void InactivityMonitor::close() throw( cms::CMSException ) {
+    try{
+        stopMonitorThreads();
+        TransportFilter::close();
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void InactivityMonitor::onException( const decaf::lang::Exception& ex ) {
-    TransportFilter::onException( ex );
+
+    if( failed.compareAndSet( false, true ) ) {
+        stopMonitorThreads();
+        TransportFilter::onException( ex );
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void InactivityMonitor::onCommand( const Pointer<Command>& command ) {
+
     TransportFilter::onCommand( command );
+
+    commandReceived.set( true );
+    inRead.set( true );
+
+    try {
+
+        if( command->isWireFormatInfo() ) {
+            synchronized( &monitor ) {
+
+                remoteWireFormatInfo = command.dynamicCast<WireFormatInfo>();
+                try {
+                    startMonitorThreads();
+                } catch( IOException& e ) {
+                    onException( e );
+                }
+            }
+        }
+
+        TransportFilter::onCommand( command );
+
+        inRead.set( false );
+
+    } catch( Exception& ex ) {
+        inRead.set( false );
+        ex.setMark( __FILE__, __LINE__ );
+        throw ex;
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void InactivityMonitor::oneway( const Pointer<Command>& command )
     throw( decaf::io::IOException, decaf::lang::exceptions::UnsupportedOperationException ) {
 
-    TransportFilter::oneway( command );
+    try{
+        // Disable inactivity monitoring while processing a command.  Synchronize this
+        // method - it's not synchronized
+        // further down the transport stack and gets called by more
+        // than one thread by this class.
+        synchronized( &inWriteMutes ) {
+            this->inWrite.set( true );
+            try {
+
+                if( failed.get() ) {
+                    throw IOException(
+                        __FILE__, __LINE__,
+                        ( std::string( "Channel was inactive for too long: " ) + next->getRemoteAddress() ).c_str() );
+                }
+
+                if( command->isWireFormatInfo() ) {
+                    synchronized( &monitor ) {
+                        localWireFormatInfo = command.dynamicCast<WireFormatInfo>();
+                        startMonitorThreads();
+                    }
+                }
+
+                this->next->oneway( command );
+
+                this->commandSent.set( true );
+                this->inWrite.set( false );
+
+            } catch( Exception& ex ) {
+                this->commandSent.set( true );
+                this->inWrite.set( false );
+
+                ex.setMark( __FILE__, __LINE__ );
+                throw ex;
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_RETHROW( UnsupportedOperationException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool InactivityMonitor::allowReadCheck( long long elapsed ) {
+    return elapsed > (readCheckTime * 9 / 10);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void InactivityMonitor::readCheck() {
+
+    if( inRead.get() || wireFormat->inReceive() ) {
+        return;
+    }
+
+    if( !commandReceived.get() ) {
+//        ASYNC_TASKS.execute( new Runnable() {
+//            public void run() {
+//                onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()));
+//            };
+//
+//        });
+    }
+
+    commandReceived.set(false);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void InactivityMonitor::writeCheck() {
+
+    if( inWrite.get() ) {
+        return;
+    }
+
+    if (!commandSent.get()) {
+
+//        ASYNC_TASKS.execute( new Runnable() {
+//            public void run() {
+//                if (monitorStarted.get()) {
+//                    try {
+//
+//                        KeepAliveInfo info = new KeepAliveInfo();
+//                        info.setResponseRequired(keepAliveResponseRequired);
+//                        oneway(info);
+//                    } catch (IOException e) {
+//                        onException(e);
+//                    }
+//                }
+//            };
+//        });
+    }
+
+    commandSent.set(false);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void InactivityMonitor::startMonitorThreads() {
+
+    synchronized( &monitor ) {
+
+        if( monitorStarted.get() ) {
+            return;
+        }
+        if( localWireFormatInfo == NULL ) {
+            return;
+        }
+        if( remoteWireFormatInfo == NULL ) {
+            return;
+        }
+
+        readCheckTime = Math::min( localWireFormatInfo->getMaxInactivityDuration(),
+                                   remoteWireFormatInfo->getMaxInactivityDuration() );
+        initialDelayTime = Math::min( localWireFormatInfo->getMaxInactivityDurationInitalDelay(),
+                                      remoteWireFormatInfo->getMaxInactivityDurationInitalDelay() );
+        if( readCheckTime > 0 ) {
+
+            monitorStarted.set( true );
+            writeCheckerTask.reset( new WriteChecker( this ) );
+            readCheckerTask.reset( new ReadChecker( this ) );
+            writeCheckTime = readCheckTime > 3 ? readCheckTime / 3 : readCheckTime;
+
+            this->writeCheckTimer.scheduleAtFixedRate( writeCheckerTask, initialDelayTime, writeCheckTime );
+            this->readCheckTimer.scheduleAtFixedRate( readCheckerTask, initialDelayTime, readCheckTime );
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void InactivityMonitor::stopMonitorThreads() {
+
+    synchronized( &monitor ) {
+
+        if( monitorStarted.compareAndSet( true, false ) ) {
+
+            readCheckerTask->cancel();
+            writeCheckerTask->cancel();
+
+            readCheckTimer.purge();
+            readCheckTimer.cancel();
+            writeCheckTimer.purge();
+            writeCheckTimer.cancel();
+        }
+    }
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.h?rev=808563&r1=808562&r2=808563&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.h Thu Aug 27 18:35:53 2009
@@ -24,6 +24,7 @@
 #include <activemq/commands/Command.h>
 #include <activemq/commands/Response.h>
 #include <activemq/commands/WireFormatInfo.h>
+#include <activemq/wireformat/WireFormat.h>
 
 #include <decaf/lang/Pointer.h>
 #include <decaf/util/Timer.h>
@@ -37,16 +38,30 @@
 
     class ReadChecker;
     class WriteChecker;
+    class AsyncException;
+    class AsyncWriteKeepAlive;
 
     class AMQCPP_API InactivityMonitor : public TransportFilter {
     private:
 
+        // The configured WireFormat for the Transport Chain.
+        Pointer<wireformat::WireFormat> wireFormat;
+
+        // Local and Remote WireFormat information.
         Pointer<commands::WireFormatInfo> localWireFormatInfo;
         Pointer<commands::WireFormatInfo> remoteWireFormatInfo;
 
+        Pointer<ReadChecker> readCheckerTask;
+        Pointer<WriteChecker> writeCheckerTask;
+
+        // TODO - We could optimize so that all instances of an Inactivity monitor share a single
+        //        static instance of the read and write timers.  Have to track the number of tasks
+        //        that are scheduled then so we know when to cancel and cleanup the timers.
         decaf::util::Timer readCheckTimer;
         decaf::util::Timer writeCheckTimer;
 
+        decaf::util::concurrent::atomic::AtomicBoolean monitorStarted;
+
         decaf::util::concurrent::atomic::AtomicBoolean commandSent;
         decaf::util::concurrent::atomic::AtomicBoolean commandReceived;
 
@@ -54,12 +69,19 @@
         decaf::util::concurrent::atomic::AtomicBoolean inRead;
         decaf::util::concurrent::atomic::AtomicBoolean inWrite;
 
+        decaf::util::concurrent::Mutex inWriteMutes;
+        decaf::util::concurrent::Mutex monitor;
+
         long long readCheckTime;
         long long writeCheckTime;
         long long initialDelayTime;
 
         friend class ReadChecker;
         friend class WriteChecker;
+        friend class AsyncException;
+        friend class AsyncWriteKeepAlive;
+
+        bool keepAliveResponseRequired;
 
     public:
 
@@ -69,10 +91,12 @@
          * @param next
          *      The Transport instance that this TransportFilter wraps.
          */
-        InactivityMonitor( const Pointer<Transport>& next );
+        InactivityMonitor( const Pointer<Transport>& next, const Pointer<wireformat::WireFormat>& wireFormat );
 
         virtual ~InactivityMonitor();
 
+        virtual void close() throw( cms::CMSException );
+
         virtual void onException( const decaf::lang::Exception& ex );
 
         virtual void onCommand( const Pointer<Command>& command );
@@ -80,6 +104,31 @@
         virtual void oneway( const Pointer<Command>& command )
             throw( decaf::io::IOException, decaf::lang::exceptions::UnsupportedOperationException );
 
+        bool isKeepAliveResponseRequired() const {
+            return this->keepAliveResponseRequired;
+        }
+
+        void setKeepAliveResponseRequired( bool value ) {
+            this->keepAliveResponseRequired = value;
+        }
+
+    private:
+
+        // Throttles read checking
+        bool allowReadCheck( long long elapsed );
+
+        // Performs a Read Check on the current connection, called from a separate Thread.
+        void readCheck();
+
+        // Perform a Write Check on the current connection, called from a separate Thread.
+        void writeCheck();
+
+        // Stops all the monitoring Threads, cannot restart once called.
+        void stopMonitorThreads();
+
+        // Starts the monitoring Threads.
+        void startMonitorThreads();
+
     };
 
 }}}

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/ReadChecker.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/ReadChecker.cpp?rev=808563&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/ReadChecker.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/ReadChecker.cpp Thu Aug 27 18:35:53 2009
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReadChecker.h"
+
+#include <activemq/transport/inactivity/InactivityMonitor.h>
+
+#include <decaf/lang/System.h>
+#include <decaf/lang/exceptions/NullPointerException.h>
+
+using namespace activemq;
+using namespace activemq::transport;
+using namespace activemq::transport::inactivity;
+using namespace decaf;
+using namespace decaf::util;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+ReadChecker::ReadChecker( InactivityMonitor* parent ) : TimerTask(), parent( parent ), lastRunTime( 0 ) {
+
+    if( this->parent == NULL ) {
+        throw NullPointerException(
+            __FILE__, __LINE__, "ReadChecker created with NULL parent." );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ReadChecker::~ReadChecker() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ReadChecker::run() {
+
+    long long now = System::currentTimeMillis();
+    long long elapsed = ( now - lastRunTime );
+
+    // Perhaps the timer executed a read check late, and then executes
+    // the next read check on time, which causes the time elapsed between
+    // read checks to be small.
+
+    // If less than 90% of the read check time has elapsed then abort this read check.
+    if( !this->parent->allowReadCheck( elapsed ) ) {
+        return;
+    }
+
+    lastRunTime = now;
+
+    // Invoke the parent check routine.
+    this->parent->readCheck();
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/ReadChecker.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/ReadChecker.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/ReadChecker.h?rev=808563&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/ReadChecker.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/ReadChecker.h Thu Aug 27 18:35:53 2009
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TRANSPORT_INACTIVITY_READCHECKER_H_
+#define _ACTIVEMQ_TRANSPORT_INACTIVITY_READCHECKER_H_
+
+#include <activemq/util/Config.h>
+
+#include <decaf/util/TimerTask.h>
+
+namespace activemq {
+namespace transport {
+namespace inactivity {
+
+    class InactivityMonitor;
+
+    /**
+     * Runnable class that is used by the {@see InactivityMonitor} class to check for
+     * timeouts related to transport reads.
+     *
+     * @since 3.1
+     */
+    class AMQCPP_API ReadChecker : public decaf::util::TimerTask {
+    private:
+
+        // The Inactivity Monitor that created this Read Checker.
+        InactivityMonitor* parent;
+
+        // State value of last time this object was run
+        long long lastRunTime;
+
+    public:
+
+        ReadChecker( InactivityMonitor* parent );
+        virtual ~ReadChecker();
+
+        virtual void run();
+
+    };
+
+}}}
+
+#endif /* _ACTIVEMQ_TRANSPORT_INACTIVITY_READCHECKER_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/ReadChecker.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/WriteChecker.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/WriteChecker.cpp?rev=808563&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/WriteChecker.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/WriteChecker.cpp Thu Aug 27 18:35:53 2009
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "WriteChecker.h"
+
+#include <activemq/transport/inactivity/InactivityMonitor.h>
+
+#include <decaf/lang/System.h>
+#include <decaf/lang/exceptions/NullPointerException.h>
+
+using namespace activemq;
+using namespace activemq::transport;
+using namespace activemq::transport::inactivity;
+using namespace decaf;
+using namespace decaf::util;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+WriteChecker::WriteChecker( InactivityMonitor* parent ) : TimerTask(), parent( parent ), lastRunTime( 0 ) {
+
+    if( this->parent == NULL ) {
+        throw NullPointerException(
+            __FILE__, __LINE__, "WriteChecker created with NULL parent." );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+WriteChecker::~WriteChecker() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void WriteChecker::run() {
+    this->lastRunTime = System::currentTimeMillis();
+    this->parent->writeCheck();
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/WriteChecker.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/WriteChecker.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/WriteChecker.h?rev=808563&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/WriteChecker.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/WriteChecker.h Thu Aug 27 18:35:53 2009
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TRANSPORT_INACTIVITY_WRITECHECKER_H_
+#define _ACTIVEMQ_TRANSPORT_INACTIVITY_WRITECHECKER_H_
+
+#include <activemq/util/Config.h>
+
+#include <decaf/util/TimerTask.h>
+
+namespace activemq {
+namespace transport {
+namespace inactivity {
+
+    class InactivityMonitor;
+
+    /**
+     * Runnable class used by the {@see InactivityMonitor} to make periodic writes to the underlying
+     * transport if no other write activity is going on in order to more quickly detect failures
+     * of the connection to the broker.
+     *
+     * @since 3.1.0
+     */
+    class AMQCPP_API WriteChecker : public decaf::util::TimerTask {
+    private:
+
+        // The InactivityMonitor instance that created this object.
+        InactivityMonitor* parent;
+
+        // State variable tracking the last execution time of this Runnable.
+        long long lastRunTime;
+
+    public:
+
+        WriteChecker( InactivityMonitor* parent );
+        virtual ~WriteChecker();
+
+        virtual void run();
+
+    };
+
+}}}
+
+#endif /* _ACTIVEMQ_TRANSPORT_INACTIVITY_WRITECHECKER_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/WriteChecker.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/AbstractQueue.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/AbstractQueue.h?rev=808563&r1=808562&r2=808563&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/AbstractQueue.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/AbstractQueue.h Thu Aug 27 18:35:53 2009
@@ -33,8 +33,7 @@
 
     /**
      * This class provides skeletal implementations of some Queue  operations.
-     * Methods add, remove, and element are based on offer, poll, and peek, respectively
-     * but throw exceptions instead of indicating failure via false or null returns.
+     * Methods add, remove, and element are based on offer, poll, and peek, respectively.
      *
      * A Queue implementation that extends this class must minimally define a method Queue.
      * offer(E) which does not permit insertion of null elements, along with methods Queue.
@@ -46,13 +45,9 @@
      */
     template< typename E >
     class AbstractQueue : public decaf::util::Queue<E> {
-    private:
-
-        E emptyMarker;
-
     public:
 
-        AbstractQueue() : emptyMarker() {}
+        AbstractQueue() : Queue<E>() {}
 
         virtual ~AbstractQueue() {}
 
@@ -72,7 +67,8 @@
          */
         virtual bool add( const E& value )
             throw ( decaf::lang::exceptions::UnsupportedOperationException,
-                    decaf::lang::exceptions::IllegalArgumentException ) {
+                    decaf::lang::exceptions::IllegalArgumentException,
+                    decaf::lang::exceptions::IllegalStateException ) {
 
             if( offer( value ) ) {
                 return true;
@@ -100,7 +96,8 @@
          */
         virtual bool addAll( const Collection<E>& collection )
             throw ( lang::exceptions::UnsupportedOperationException,
-                    lang::exceptions::IllegalArgumentException ) {
+                    lang::exceptions::IllegalArgumentException,
+                    lang::exceptions::IllegalStateException ) {
 
             if( this == &collection ) {
                 throw decaf::lang::exceptions::IllegalArgumentException(
@@ -121,14 +118,7 @@
          * @throws NoSuchElementException if the queue is empty.
          */
         virtual E remove() throw ( decaf::lang::exceptions::NoSuchElementException ) {
-
-            E result = this->poll();
-            if( result == this->getEmptyMarker() ) {
-                throw decaf::lang::exceptions::NoSuchElementException(
-                    __FILE__, __LINE__, "Queue is empty." );
-            }
-
-            return result;
+            return this->poll();
         }
 
         /**
@@ -143,13 +133,7 @@
         virtual const E& element() const
             throw( decaf::lang::exceptions::NoSuchElementException ) {
 
-            const E& result = this->peek();
-            if( result == this->getEmptyMarker() ) {
-                throw decaf::lang::exceptions::NoSuchElementException(
-                    __FILE__, __LINE__, "Queue is empty." );
-            }
-
-            return result;
+            return this->peek();
         }
 
         /**
@@ -159,10 +143,13 @@
          */
         virtual void clear() throw ( lang::exceptions::UnsupportedOperationException ) {
 
-            E result;
+            if( this->isEmpty() ) {
+                return;
+            }
+
             do {
-                result = this->poll();
-            } while( result != this->getEmptyMarker() );
+                this->poll();
+            } while( !this->isEmpty() );
         }
 
     };

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/PriorityQueue.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/PriorityQueue.h?rev=808563&r1=808562&r2=808563&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/PriorityQueue.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/PriorityQueue.h Thu Aug 27 18:35:53 2009
@@ -19,15 +19,19 @@
 #define _DECAF_UTIL_PRIORITYQUEUE_H_
 
 #include <decaf/util/Config.h>
+#include <decaf/util/Collection.h>
 #include <decaf/util/AbstractQueue.h>
 #include <decaf/util/Iterator.h>
 #include <decaf/util/Comparator.h>
 #include <decaf/util/comparators/Less.h>
 
+#include <decaf/lang/Math.h>
 #include <decaf/lang/Pointer.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
 #include <decaf/lang/exceptions/UnsupportedOperationException.h>
 
+#include <memory>
+
 namespace decaf {
 namespace util {
 
@@ -65,11 +69,13 @@
     class PriorityQueue : public AbstractQueue<E> {
     private:
 
-        static const int DEFAULT_CAPACITY = 256;
+        static const int DEFAULT_CAPACITY = 11;
+        static const int DEFAULT_CAPACITY_RATIO = 2;
 
         std::size_t _size;
+        std::size_t capacity;
         E* elements;
-        decaf::lang::Pointer< Comparator<E> > comparator;
+        decaf::lang::Pointer< Comparator<E> > _comparator;
 
     private:
 
@@ -77,23 +83,40 @@
         private:
 
             std::size_t position;
+            bool allowRemove;
             PriorityQueue* queue;
 
         public:
 
-            PriorityQueueIterator( PriorityQueue* queue ) : position( 0 ), queue( queue ) {}
+            PriorityQueueIterator( PriorityQueue* queue ) : position( 0 ), allowRemove( false ), queue( queue ) {}
 
             virtual E next() throw( lang::exceptions::NoSuchElementException ) {
-                return E();
+
+                if( !hasNext() ) {
+                    throw lang::exceptions::NoSuchElementException(
+                        __FILE__, __LINE__,
+                        "No more elements to Iterate over." );
+                }
+
+                allowRemove = true;
+                return queue->elements[position++];
             }
 
             virtual bool hasNext() const {
-                return false;
+                return position < queue->_size;
             }
 
             virtual void remove() throw ( lang::exceptions::IllegalStateException,
                                           lang::exceptions::UnsupportedOperationException ) {
 
+                if( !allowRemove ) {
+                    throw lang::exceptions::IllegalStateException(
+                        __FILE__, __LINE__,
+                        "No more elements to Iterate over." );
+                }
+
+                allowRemove = false;
+                queue->removeAt( position-- );
             }
         };
 
@@ -112,13 +135,15 @@
             }
         };
 
+        friend class PriorityQueueIterator;
+
     public:
 
         /**
          * Creates a Priority Queue with the default initial capacity.
          */
-        PriorityQueue() : _size( 0 ){
-            this->initQueue( DEFAULT_CAPACITY, NULL );
+        PriorityQueue() : AbstractQueue<E>(), _size( 0 ), capacity( 0 ), elements( NULL ) {
+            this->initQueue( DEFAULT_CAPACITY, new comparators::Less<E>() );
         }
 
         /**
@@ -127,7 +152,9 @@
          * @param initialCapacity
          *      The initial number of elements allocated to this PriorityQueue.
          */
-        PriorityQueue( std::size_t initialCapacity ) : _size( 0 ) {
+        PriorityQueue( std::size_t initialCapacity ) :
+            AbstractQueue<E>(), _size( 0 ), capacity( 0 ), elements( NULL ) {
+
             this->initQueue( initialCapacity, new comparators::Less<E>() );
         }
 
@@ -143,7 +170,8 @@
          *
          * @throws NullPointerException if the passed Comparator is NULL.
          */
-        PriorityQueue( std::size_t initialCapacity, Comparator<E>* comparator ) : _size( 0 ){
+        PriorityQueue( std::size_t initialCapacity, Comparator<E>* comparator ) :
+            AbstractQueue<E>(), _size( 0 ), capacity( 0 ), elements( NULL ) {
 
             if( comparator == NULL ) {
                 throw decaf::lang::exceptions::NullPointerException(
@@ -159,8 +187,10 @@
          * @param source
          *      the Collection whose elements are to be placed into this priority queue
          */
-        PriorityQueue( const Collection<E>& source ) : _size( 0 ) {
-            // TODO
+        PriorityQueue( const Collection<E>& source ) :
+            AbstractQueue<E>(), _size( 0 ), capacity( 0 ), elements( NULL ) {
+
+            this->getFromCollection( source );
         }
 
         /**
@@ -170,12 +200,34 @@
          * @param source
          *      the priority queue whose elements are to be placed into this priority queue
          */
-        PriorityQueue( const PriorityQueue<E>& source ) : _size( 0 ) {
-            // TODO
+        PriorityQueue( const PriorityQueue<E>& source ) :
+            AbstractQueue<E>(), _size( 0 ), capacity( 0 ), elements( NULL ) {
+
+            this->getFromPriorityQueue( source );
         }
 
         virtual ~PriorityQueue() {}
 
+        /**
+         * Assignment operator, assign another Collection to this one.
+         *
+         * @param source
+         *        The Collection to copy to this one.
+         */
+        PriorityQueue<E>& operator= ( const Collection<E>& source ) {
+            this->getFromCollection( source );
+        }
+
+        /**
+         * Assignment operator, assign another PriorityQueue to this one.
+         *
+         * @param source
+         *        The PriorityQueue to copy to this one.
+         */
+        PriorityQueue<E>& operator= ( const PriorityQueue<E>& source ) {
+            this->getFromPriorityQueue( source );
+        }
+
         virtual decaf::util::Iterator<E>* iterator() {
             return new PriorityQueueIterator( this );
         }
@@ -188,28 +240,202 @@
             return this->_size;
         }
 
-        const E& getEmptyMarker() const {
-            static const E marker = E();
-            return marker;
+        virtual void clear()
+            throw ( lang::exceptions::UnsupportedOperationException ) {
+
+            // TODO - Provide a more efficient way to clear the array without reallocating it
+            //        we should keep the size it grew to since if reused it could get that big
+            //        again and reallocating all that memory could be too slow.
+            this->elements = new E[DEFAULT_CAPACITY];
+            this->capacity = DEFAULT_CAPACITY;
+            this->_size = 0;
         }
 
-        bool offer( const E& value ) {
-            return false;
+        virtual bool offer( const E& value ) throw( decaf::lang::exceptions::NullPointerException ) {
+
+            // TODO - Check for Null and throw exception
+
+            increaseCapacity( _size + 1 );
+            elements[_size++] = value;
+            upHeap();
+            return true;
         }
 
-        E poll() {
-            return this->getEmptyMarker();
+        virtual E poll() throw( decaf::lang::exceptions::NoSuchElementException ) {
+
+            if( Queue<E>::isEmpty() ) {
+                throw lang::exceptions::NoSuchElementException(
+                    __FILE__, __LINE__, "Queue is empty" );
+            }
+
+            E result = elements[0];
+            removeAt( 0 );
+            return result;
         }
 
-        const E& peek() const {
-            return this->getEmptyMarker();
+        virtual const E& peek() const throw( decaf::lang::exceptions::NoSuchElementException ) {
+
+            if( Queue<E>::isEmpty() ) {
+                throw lang::exceptions::NoSuchElementException(
+                    __FILE__, __LINE__, "Queue is empty" );
+            }
+
+            return elements[0];
+        }
+
+        virtual bool remove( const E& value )
+            throw ( lang::exceptions::UnsupportedOperationException,
+                    lang::exceptions::IllegalArgumentException ) {
+
+            std::size_t targetIndex = 0;
+            for( targetIndex = 0; targetIndex < _size; targetIndex++ ) {
+                if( 0 == _comparator->compare( value, elements[targetIndex] ) ) {
+                    break;
+                }
+            }
+
+            if( _size == 0 || _size == targetIndex ) {
+                return false;
+            }
+
+            removeAt( targetIndex );
+            return true;
+        }
+
+        virtual bool add( const E& value )
+            throw ( lang::exceptions::UnsupportedOperationException,
+                    lang::exceptions::IllegalArgumentException,
+                    lang::exceptions::IllegalStateException ) {
+
+            try {
+                return offer( value );
+            }
+            DECAF_CATCH_RETHROW( lang::exceptions::UnsupportedOperationException )
+            DECAF_CATCH_RETHROW( lang::exceptions::IllegalArgumentException )
+            DECAF_CATCH_RETHROW( lang::exceptions::IllegalStateException )
+            DECAF_CATCH_EXCEPTION_CONVERT( lang::exceptions::NullPointerException, lang::exceptions::UnsupportedOperationException )
+            DECAF_CATCHALL_THROW( lang::exceptions::UnsupportedOperationException )
+        }
+
+        /**
+         * Obtains a copy of the Pointer instance that this PriorityQueue is using to compare the
+         * elements in the queue with.  The returned value is a copy, so the caller cannot change
+         * the value of the internal Pointer.
+         *
+         * @return a copy of the Comparator Pointer being used by this Queue.
+         */
+        decaf::lang::Pointer< Comparator<E> > comparator() const {
+            return this->_comparator;
         }
 
     private:
 
         void initQueue( std::size_t initialSize, Comparator<E>* comparator ) {
             this->elements = new E[initialSize];
-            this->comparator.reset( comparator );
+            this->capacity = initialSize;
+            this->_size = 0;
+            this->_comparator.reset( comparator );
+        }
+
+        void upHeap() {
+
+            std::size_t current = _size - 1;
+            std::size_t parent = ( current - 1 ) / 2;
+
+            while( current != 0 && _comparator->compare( elements[current], elements[parent] ) < 0 ) {
+
+                // swap the two
+                E tmp = elements[current];
+                elements[current] = elements[parent];
+                elements[parent] = tmp;
+
+                // update parent and current positions.
+                current = parent;
+                parent = ( current - 1 ) / 2;
+            }
+        }
+
+        void downHeap( std::size_t pos ) {
+
+            std::size_t current = pos;
+            std::size_t child = 2 * current + 1;
+
+            while( child < _size && !Queue<E>::isEmpty() ) {
+
+                // compare the children if they exist
+                if( child + 1 < _size && _comparator->compare( elements[child + 1], elements[child] ) < 0 ) {
+                    child++;
+                }
+
+                // compare selected child with parent
+                if( _comparator->compare( elements[current], elements[child] ) < 0 ) {
+                    break;
+                }
+
+                // swap the two
+                E tmp = elements[current];
+                elements[current] = elements[child];
+                elements[child] = tmp;
+
+                // update child and current positions
+                current = child;
+                child = 2 * current + 1;
+            }
+        }
+
+        void getFromPriorityQueue( const PriorityQueue<E>& c ) {
+            initCapacity( c );
+            _comparator = c.comparator();
+            for( std::size_t ix = 0; ix < c.size(); ++ix ) {
+                this->elements[ix] = c.elements[ix];
+            }
+            _size = c.size();
+        }
+
+        void getFromCollection( const Collection<E>& c ) {
+            initCapacity( c );
+            _comparator.reset( new comparators::Less<E>() );
+            std::auto_ptr< Iterator<E> > iter( c.iterator() );
+            while( iter->hasNext() ) {
+                this->offer( iter->next() );
+            }
+        }
+
+        void removeAt( std::size_t index ) {
+            _size--;
+            elements[index] = elements[_size];
+            downHeap(index);
+            elements[_size] = E();
+        }
+
+        void initCapacity( const Collection<E>& c ) {
+
+            delete [] elements;
+            _size = 0;
+
+            if( c.isEmpty() ) {
+                capacity = 1;
+                elements = new E[capacity];
+            } else {
+                capacity = (std::size_t) lang::Math::ceil( c.size() * 1.1 );
+                elements = new E[capacity];
+            }
+        }
+
+        void increaseCapacity( std::size_t size ) {
+
+            if( size > capacity ) {
+                E* newElements = new E[ size * DEFAULT_CAPACITY_RATIO ];
+
+                for( std::size_t ix = 0; ix < capacity; ix++ ) {
+                    newElements[ix] = elements[ix];
+                }
+
+                delete [] elements;
+
+                elements = newElements;
+                capacity = size * DEFAULT_CAPACITY_RATIO;
+            }
         }
 
     };

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/Queue.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/Queue.h?rev=808563&r1=808562&r2=808563&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/Queue.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/Queue.h Thu Aug 27 18:35:53 2009
@@ -40,9 +40,11 @@
      * operation of the method is allowed. BlockingQueue interface defines such
      * methods.
      *
-     * Certain methods in the Queue interface return a special value instead of throwing
-     * an exception if there is no element in the Queue to return, this special value
-     * can be obtained by calling the Queue method <code>getEmptyMarker</code>.
+     * Unlike the Java Queue interface the methods of this class cannot return null
+     * to indicate that a Queue is empty since null has no meaning for elements that
+     * are class or struct types and a comparison between null and a primitive type
+     * is not a meaningful check for an empty queue.  Methods that would have returned
+     * null in Java throw the NoSuchElementException instead.
      *
      * @since 1.0
      */
@@ -53,17 +55,6 @@
         virtual ~Queue() {}
 
         /**
-         * Returns a reference to the Marker value that is returned from methods that
-         * do not throw an exception when there is no element in the Queue to return.
-         * The empty marker is usually an instance of the contained element initialized
-         * using the default constructor (if its a Class) or the default value for the
-         * primitive type.
-         *
-         * @return a value that indicates that the Queue is empty.
-         */
-        virtual const E& getEmptyMarker() const = 0;
-
-        /**
          * Inserts the specified element into the queue provided that the condition
          * allows such an operation. The method is generally preferable to the
          * collection.add(E), since the latter might throw an exception if the
@@ -73,23 +64,29 @@
          *        the specified element to insert into the queue.
          *
          * @return true if the operation succeeds and false if it fails.
+         *
+         * @throw NullPointerException if the Queue implementation does not allow Null values to
+         *        be inserted into the Queue.
          */
-        virtual bool offer( const E& value ) = 0;
+        virtual bool offer( const E& value ) throw( decaf::lang::exceptions::NullPointerException ) = 0;
 
         /**
          * Gets and removes the element in the head of the queue, or returns null if
          * there is no element in the queue.
          *
-         * @return the element in the head of the queue or null if there is no
-         *         element in the queue.
+         * @return the element in the head of the queue.
+         *
+         * @throws NoSuchElementException
+         *         if there is no element in the queue.
          */
-        virtual E poll() = 0;
+        virtual E poll() throw ( decaf::lang::exceptions::NoSuchElementException ) = 0;
 
         /**
          * Gets and removes the element in the head of the queue. Throws a
          * NoSuchElementException if there is no element in the queue.
          *
          * @return the element in the head of the queue.
+         *
          * @throws NoSuchElementException
          *         if there is no element in the queue.
          */
@@ -98,16 +95,20 @@
         /**
          * Gets but not removes the element in the head of the queue.
          *
-         * @return the element in the head of the queue or null if there is no
-         *         element in the queue.
+         * @return the element in the head of the queue.
+         *
+         * @throws NoSuchElementException
+         *         if there is no element in the queue.
          */
-        virtual const E& peek() const = 0;
+        virtual const E& peek() const
+            throw ( decaf::lang::exceptions::NoSuchElementException ) = 0;
 
         /**
          * Gets but not removes the element in the head of the queue. Throws a
          * NoSuchElementException if there is no element in the queue.
          *
          * @return the element in the head of the queue.
+         *
          * @throws NoSuchElementException
          *         if there is no element in the queue.
          */

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/PriorityQueueTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/PriorityQueueTest.cpp?rev=808563&r1=808562&r2=808563&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/PriorityQueueTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/PriorityQueueTest.cpp Thu Aug 27 18:35:53 2009
@@ -18,14 +18,438 @@
 #include "PriorityQueueTest.h"
 
 #include <decaf/util/PriorityQueue.h>
+#include <decaf/util/Comparator.h>
+#include <decaf/util/StlList.h>
+
+#include <algorithm>
+#include <memory>
 
 using namespace std;
 using namespace decaf;
 using namespace decaf::util;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace decaf {
+namespace util {
+
+    class MockComparatorStringByLength : public decaf::util::Comparator<std::string> {
+
+        virtual bool operator() ( const std::string& left, const std::string& right ) const {
+            return left.size() == right.size();
+        }
+
+        virtual int compare( const std::string& o1, const std::string& o2 ) const {
+            return o1.size() < o2.size() ? -1 : o1.size() > o2.size() ? 1 : 0;
+        }
+
+    };
+
+}}
 
 ////////////////////////////////////////////////////////////////////////////////
 void PriorityQueueTest::testConstructor_1() {
 
-    //PriorityQueue<int> pqueue;
+    PriorityQueue<int> pqueue;
+
+    CPPUNIT_ASSERT( pqueue.isEmpty() );
+    CPPUNIT_ASSERT( pqueue.size() == 0 );
+    CPPUNIT_ASSERT( pqueue.comparator() != NULL );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PriorityQueueTest::testConstructor_2() {
+
+    PriorityQueue<int> pqueue( 1024 );
+
+    CPPUNIT_ASSERT( pqueue.isEmpty() );
+    CPPUNIT_ASSERT( pqueue.size() == 0 );
+    CPPUNIT_ASSERT( pqueue.comparator() != NULL );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PriorityQueueTest::testConstructor_3() {
+
+    PriorityQueue<int> intQueue;
+    StlList<int> collection;
+
+    int array[] = { 12, 2, 456, -11, 99, 111, 456 };
+
+    for( std::size_t ix = 0; ix < sizeof(array)/sizeof(int); ++ix ) {
+        intQueue.offer( array[ix] );
+        collection.add( array[ix] );
+    }
+
+    PriorityQueue<int> copy( collection );
+
+    CPPUNIT_ASSERT( copy.size() == intQueue.size() );
+
+    std::auto_ptr< Iterator<int> > q_iter( intQueue.iterator() );
+    std::auto_ptr< Iterator<int> > c_iter( copy.iterator() );
+
+    while( q_iter->hasNext() && c_iter->hasNext() ) {
+        CPPUNIT_ASSERT( q_iter->next() == c_iter->next() );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PriorityQueueTest::testAssignment() {
+
+    PriorityQueue<int> intQueue;
+    StlList<int> collection;
+
+    int array[] = { 12, 2, 456, -11, 99, 111, 456 };
+
+    for( std::size_t ix = 0; ix < sizeof(array)/sizeof(int); ++ix ) {
+        intQueue.offer( array[ix] );
+        collection.add( array[ix] );
+    }
+
+    PriorityQueue<int> copy = collection;
+
+    CPPUNIT_ASSERT( copy.size() == intQueue.size() );
+
+    std::auto_ptr< Iterator<int> > q_iter( intQueue.iterator() );
+    std::auto_ptr< Iterator<int> > c_iter( copy.iterator() );
+
+    while( q_iter->hasNext() && c_iter->hasNext() ) {
+        CPPUNIT_ASSERT( q_iter->next() == c_iter->next() );
+    }
+
+    PriorityQueue<int> assigned = copy;
+
+    std::auto_ptr< Iterator<int> > a1_iter( copy.iterator() );
+    std::auto_ptr< Iterator<int> > a2_iter( assigned.iterator() );
+
+    while( a1_iter->hasNext() && a2_iter->hasNext() ) {
+        CPPUNIT_ASSERT( a1_iter->next() == a2_iter->next() );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PriorityQueueTest::testSize() {
+
+    PriorityQueue<int> intQueue;
+
+    CPPUNIT_ASSERT( 0 == intQueue.size() );
+    int array[] = { 2, 45, 7, -12, 9 };
+    for( int i = 0; i < 5; i++ ) {
+        intQueue.offer( array[i] );
+    }
+
+    CPPUNIT_ASSERT( sizeof(array)/sizeof(int) == intQueue.size() );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PriorityQueueTest::testOfferString() {
+
+    PriorityQueue<std::string> queue( 10, new MockComparatorStringByLength() );
+
+    std::string array[] = { "AAAAA", "AA", "AAAA", "AAAAAAAA" };
+    for( int i = 0; i < 4; i++ ) {
+        queue.offer( array[i] );
+    }
+
+    std::string sortedArray[] = { "AA", "AAAA", "AAAAA", "AAAAAAAA" };
+    for( int i = 0; i < 4; i++ ) {
+        CPPUNIT_ASSERT( sortedArray[i] == queue.poll() );
+    }
+
+    CPPUNIT_ASSERT( 0 == queue.size() );
+    CPPUNIT_ASSERT_THROW_MESSAGE(
+        "Should Throw a NoSuchElementException",
+        queue.poll(),
+        decaf::lang::exceptions::NoSuchElementException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PriorityQueueTest::testPoll() {
+
+    PriorityQueue<int> intQueue;
+    int array[] = { 52, 12, 42, 7, 111 };
+    int sorted[] = { 7, 12, 42, 52, 111 };
+
+    for( int i = 0; i < 5; i++ ) {
+        intQueue.offer( array[i] );
+    }
+
+    for( int i = 0; i < 5; i++ ) {
+        CPPUNIT_ASSERT( sorted[i] == intQueue.poll() );
+    }
+
+    CPPUNIT_ASSERT( 0 == intQueue.size() );
+    CPPUNIT_ASSERT_THROW_MESSAGE(
+        "Should Throw a NoSuchElementException",
+        intQueue.poll(),
+        decaf::lang::exceptions::NoSuchElementException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PriorityQueueTest::testPollEmpty() {
+    PriorityQueue<double> queue;
+    CPPUNIT_ASSERT( 0 == queue.size() );
+    CPPUNIT_ASSERT_THROW_MESSAGE(
+        "Should Throw a NoSuchElementException",
+        queue.poll(),
+        decaf::lang::exceptions::NoSuchElementException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PriorityQueueTest::testPeek() {
+    PriorityQueue<int> integerQueue;
+
+    int array[] = { 2, 45, 7, -12, 9 };
+    int sorted[] = { -12, 2, 7, 9, 45 };
+
+    for( int i = 0; i < 5; i++ ) {
+        integerQueue.add( array[i] );
+    }
+
+    CPPUNIT_ASSERT( sorted[0] == integerQueue.peek() );
+    CPPUNIT_ASSERT( sorted[0] == integerQueue.peek() );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PriorityQueueTest::testPeekEmpty() {
+    PriorityQueue<float> queue;
+    CPPUNIT_ASSERT( 0 == queue.size() );
+    CPPUNIT_ASSERT_THROW_MESSAGE(
+        "Should Throw a NoSuchElementException",
+        queue.poll(),
+        decaf::lang::exceptions::NoSuchElementException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PriorityQueueTest::testClear() {
+    PriorityQueue<int> integerQueue;
+
+    int array[] = {2, 45, 7, -12, 9};
+
+    for( int i = 0; i < 5; i++ ) {
+        integerQueue.offer( array[i] );
+    }
+
+    integerQueue.clear();
+    CPPUNIT_ASSERT( integerQueue.isEmpty() );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PriorityQueueTest::testAdd() {
+    PriorityQueue<int> integerQueue;
+
+    int array[] = {2, 45, 7, -12, 9};
+    int sorted[] = { -12, 2, 7, 9, 45 };
+
+    for( int i = 0; i < 5; i++ ) {
+        integerQueue.add( array[i] );
+    }
+
+    CPPUNIT_ASSERT( 5 == integerQueue.size() );
+
+    for( int i = 0; i < 5; i++ ) {
+        CPPUNIT_ASSERT( sorted[i] == integerQueue.poll() );
+    }
+
+    CPPUNIT_ASSERT( 0 == integerQueue.size() );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PriorityQueueTest::testRemove() {
+
+    int array[] = { 2, 45, 7, -12, 9, 23, 17, 1118, 10, 16, 39 };
+
+    PriorityQueue<int> integerQueue;
+
+    for( int i = 0; i < 11; i++ ) {
+        integerQueue.add( array[i] );
+    }
+
+    CPPUNIT_ASSERT( integerQueue.remove( 16 ) );
+
+    int sorted[] = { -12, 2, 7, 9, 10, 17, 23, 39, 45, 1118 };
+
+    for( int i = 0; i < 10; i++ ) {
+        CPPUNIT_ASSERT( sorted[i] == integerQueue.poll() );
+    }
+
+    CPPUNIT_ASSERT( 0 == integerQueue.size() );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PriorityQueueTest::testRemoveUsingComparator() {
+
+    PriorityQueue<std::string> queue( 10, new MockComparatorStringByLength() );
+    std::string array[] = {"AAAAA", "AA", "AAAA", "AAAAAAAA"};
+
+    for( int i = 0; i < 4; i++ ) {
+        queue.offer( array[i] );
+    }
+
+    // Prove that the comparator overrides the equality tests for remove: the Queue
+    // doesn't contain BB, but it should contain a string of length two.
+    CPPUNIT_ASSERT( !queue.contains( "BB" ) );
+    CPPUNIT_ASSERT( queue.remove( "BB" ) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PriorityQueueTest::testRemoveNotExists() {
+
+    int array[] = {2, 45, 7, -12, 9, 23, 17, 1118, 10, 16, 39};
+
+    PriorityQueue<int> integerQueue;
+
+    for( int i = 0; i < 11; i++ ) {
+        integerQueue.offer( array[i] );
+    }
+
+    CPPUNIT_ASSERT( !integerQueue.remove( 111 ) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PriorityQueueTest::testComparator() {
+
+    PriorityQueue<std::string> queue1;
+    CPPUNIT_ASSERT( queue1.comparator() != NULL );
+
+    MockComparatorStringByLength* comparator = new MockComparatorStringByLength();
+    PriorityQueue<std::string> queue2( 100, comparator );
+    CPPUNIT_ASSERT( comparator == queue2.comparator().get() );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PriorityQueueTest::testIterator() {
+
+    PriorityQueue<int> integerQueue;
+
+    int array[] = {2, 45, 7, -12, 9};
+    int sorted[] = {-12, 2, 7, 9, 45};
+
+    for( int i = 0; i < 5; i++ ) {
+        integerQueue.offer( array[i] );
+    }
+
+    std::auto_ptr< Iterator<int> > iter( integerQueue.iterator() );
+    CPPUNIT_ASSERT( iter.get() != NULL );
+
+    std::vector<int> result;
+
+    while( iter->hasNext() ) {
+        result.push_back( iter->next() );
+    }
+
+    CPPUNIT_ASSERT_THROW_MESSAGE(
+        "Should Throw a NoSuchElementException",
+        iter->next(),
+        NoSuchElementException );
+
+    std::sort( result.begin(), result.end() );
+
+    for( int i = 0; i < 5; i++ ) {
+        CPPUNIT_ASSERT( result[i] == sorted[i] );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PriorityQueueTest::testIteratorEmpty() {
+
+    PriorityQueue<int> intQueue;
+    std::auto_ptr< Iterator<int> > iter( intQueue.iterator() );
+
+    CPPUNIT_ASSERT_THROW_MESSAGE(
+        "Should Throw a NoSuchElementException",
+        iter->next(),
+        NoSuchElementException );
+
+    iter.reset( intQueue.iterator() );
+    CPPUNIT_ASSERT_THROW_MESSAGE(
+        "Should Throw a IllegalStateException",
+        iter->remove(),
+        IllegalStateException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PriorityQueueTest::testIteratorOutOfBounds() {
+
+    PriorityQueue<int> intQueue;
+    intQueue.offer( 0 );
+    std::auto_ptr< Iterator<int> > iter( intQueue.iterator() );
+    iter->next();
+    CPPUNIT_ASSERT_THROW_MESSAGE(
+        "Should Throw a NoSuchElementException",
+        iter->next(),
+        NoSuchElementException );
+
+    iter.reset( intQueue.iterator() );
+    iter->next();
+    iter->remove();
+    CPPUNIT_ASSERT_THROW_MESSAGE(
+        "Should Throw a NoSuchElementException",
+        iter->next(),
+        NoSuchElementException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PriorityQueueTest::testIteratorRemove() {
+
+    PriorityQueue<int> intQueue;
+    int array[] = {2, 45, 7, -12, 9};
+    for( int i = 0; i < 5; i++ ) {
+        intQueue.offer( array[i] );
+    }
+    std::auto_ptr< Iterator<int> > iter( intQueue.iterator() );
+    CPPUNIT_ASSERT( iter.get() != NULL );
+    for( int i = 0; i < 5; i++ ) {
+        iter->next();
+        if( 2 == i ) {
+            iter->remove();
+        }
+    }
+    CPPUNIT_ASSERT( 4 == intQueue.size() );
+
+    iter.reset( intQueue.iterator() );
+    std::vector<int> newArray;
+    for( int i = 0; i < 4; i++ ) {
+        newArray.push_back( iter->next() );
+    }
+
+    std::sort( newArray.begin(), newArray.end() );
+    for( std::size_t i = 0; i < intQueue.size(); i++ ) {
+        CPPUNIT_ASSERT( newArray[i] == intQueue.poll() );
+    }
+
+    const PriorityQueue<int> constQueue( intQueue );
+    CPPUNIT_ASSERT( !constQueue.isEmpty() );
+    CPPUNIT_ASSERT( constQueue.size() == intQueue.size() );
+
+    std::auto_ptr< Iterator<int> > const_iter( constQueue.iterator() );
+    const_iter->next();
+    CPPUNIT_ASSERT_THROW_MESSAGE(
+        "Should Throw a UnsupportedOperationException",
+        const_iter->remove(),
+        UnsupportedOperationException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PriorityQueueTest::testIteratorRemoveIllegalState() {
+
+    PriorityQueue<int> intQueue;
+    int array[] = {2, 45, 7, -12, 9};
+    for( int i = 0; i < 5; i++ ) {
+        intQueue.offer( array[i] );
+    }
+    std::auto_ptr< Iterator<int> > iter( intQueue.iterator() );
+    CPPUNIT_ASSERT( iter.get() != NULL );
+    CPPUNIT_ASSERT_THROW_MESSAGE(
+        "Should Throw a IllegalStateException",
+        iter->remove(),
+        IllegalStateException );
+
+    iter->next();
+    iter->remove();
+    CPPUNIT_ASSERT_THROW_MESSAGE(
+        "Should Throw a IllegalStateException",
+        iter->remove(),
+        IllegalStateException );
 
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/PriorityQueueTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/PriorityQueueTest.h?rev=808563&r1=808562&r2=808563&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/PriorityQueueTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/PriorityQueueTest.h Thu Aug 27 18:35:53 2009
@@ -28,14 +28,54 @@
 
         CPPUNIT_TEST_SUITE( PriorityQueueTest );
         CPPUNIT_TEST( testConstructor_1 );
+        CPPUNIT_TEST( testConstructor_2 );
+        CPPUNIT_TEST( testConstructor_3 );
+        CPPUNIT_TEST( testAssignment );
+        CPPUNIT_TEST( testSize );
+        CPPUNIT_TEST( testOfferString );
+        CPPUNIT_TEST( testPoll );
+        CPPUNIT_TEST( testPollEmpty );
+        CPPUNIT_TEST( testPeek );
+        CPPUNIT_TEST( testPeekEmpty );
+        CPPUNIT_TEST( testClear );
+        CPPUNIT_TEST( testAdd );
+        CPPUNIT_TEST( testRemove );
+        CPPUNIT_TEST( testRemoveUsingComparator );
+        CPPUNIT_TEST( testRemoveNotExists );
+        CPPUNIT_TEST( testComparator );
+        CPPUNIT_TEST( testIterator );
+        CPPUNIT_TEST( testIteratorEmpty );
+        CPPUNIT_TEST( testIteratorOutOfBounds );
+        CPPUNIT_TEST( testIteratorRemove );
+        CPPUNIT_TEST( testIteratorRemoveIllegalState );
         CPPUNIT_TEST_SUITE_END();
 
     public:
 
-        PriorityQueueTest();
-        virtual ~PriorityQueueTest();
+        PriorityQueueTest() {}
+        virtual ~PriorityQueueTest() {}
 
         void testConstructor_1();
+        void testConstructor_2();
+        void testConstructor_3();
+        void testAssignment();
+        void testSize();
+        void testOfferString();
+        void testPoll();
+        void testPollEmpty();
+        void testPeek();
+        void testPeekEmpty();
+        void testClear();
+        void testAdd();
+        void testRemove();
+        void testRemoveUsingComparator();
+        void testRemoveNotExists();
+        void testComparator();
+        void testIterator();
+        void testIteratorEmpty();
+        void testIteratorOutOfBounds();
+        void testIteratorRemove();
+        void testIteratorRemoveIllegalState();
 
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp?rev=808563&r1=808562&r2=808563&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp Thu Aug 27 18:35:53 2009
@@ -260,3 +260,5 @@
 CPPUNIT_TEST_SUITE_REGISTRATION( decaf::util::StringTokenizerTest );
 #include <decaf/util/TimerTest.h>
 CPPUNIT_TEST_SUITE_REGISTRATION( decaf::util::TimerTest );
+#include <decaf/util/PriorityQueueTest.h>
+CPPUNIT_TEST_SUITE_REGISTRATION( decaf::util::PriorityQueueTest );



Mime
View raw message