activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nmitt...@apache.org
Subject svn commit: r474558 - in /incubator/activemq/activemq-cpp/trunk/activemq-cpp: ./ src/main/activemq/connector/stomp/ src/main/activemq/io/ src/main/activemq/network/ src/main/activemq/transport/ src/test/activemq/io/ src/test/activemq/network/
Date Mon, 13 Nov 2006 23:08:52 GMT
Author: nmittler
Date: Mon Nov 13 15:08:51 2006
New Revision: 474558

URL: http://svn.apache.org/viewvc?view=rev&rev=474558
Log:
Refactored the socket reading code to use blocking sockets

Modified:
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/configure.ac
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandReader.cpp
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandReader.h
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/BufferedInputStream.cpp
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/BufferedInputStream.h
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/BufferedOutputStream.cpp
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/BufferedOutputStream.h
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/ByteArrayInputStream.cpp
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/ByteArrayInputStream.h
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/InputStream.h
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.cpp
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.cpp
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.h
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/TcpSocket.cpp
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/io/BufferedInputStreamTest.h
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/io/BufferedOutputStreamTest.h
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/io/ByteArrayInputStreamTest.h
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/network/SocketFactoryTest.h
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/network/SocketTest.h

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/configure.ac
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/configure.ac?view=diff&rev=474558&r1=474557&r2=474558
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/configure.ac (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/configure.ac Mon Nov 13 15:08:51 2006
@@ -67,6 +67,9 @@
 AC_CHECK_HEADERS([uuid/uuid.h])
 AC_CHECK_HEADERS([objbase.h])
 AC_CHECK_HEADERS([repcdce.h])
+AC_CHECK_HEADERS([sys/filio.h])
+AC_CHECK_HEADERS([sys/ioctl.h])
+AC_CHECK_HEADERS([sys/select.h])
 
 AM_PATH_CPPUNIT(1.10.2, cppunit=yes, cppunit=no; AC_MSG_RESULT([no. Unit and Integration tests disabled])) 
 AM_CONDITIONAL(BUILD_CPPUNIT_TESTS, test x$cppunit = xyes)
@@ -93,6 +96,7 @@
     
   *) ## Unix configuration
 
+    AC_CHECK_FUNCS([ioctl select])
     AC_CHECK_LIB(pthread, pthread_create,[have_pthread="yes"],
                 AC_MSG_ERROR([libpthread not found!]))
 

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandReader.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandReader.cpp?view=diff&rev=474558&r1=474557&r2=474558
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandReader.cpp (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandReader.cpp Mon Nov 13 15:08:51 2006
@@ -18,6 +18,7 @@
 
 #include <activemq/connector/stomp/commands/CommandConstants.h>
 #include <activemq/concurrent/Thread.h>
+#include <activemq/util/Character.h>
 
 using namespace std;
 using namespace activemq;
@@ -27,6 +28,7 @@
 using namespace activemq::transport;
 using namespace activemq::io;
 using namespace activemq::exceptions;
+using namespace activemq::util;
 
 ////////////////////////////////////////////////////////////////////////////////
 StompCommandReader::StompCommandReader(void)
@@ -49,8 +51,8 @@
         // Create a new Frame for reading to.
         StompFrame* frame = new StompFrame();
        
-        // Read the command into the frame.
-        readStompCommand( *frame );
+        // Read the command header.
+        readStompCommandHeader( *frame );
        
         // Read the headers.
         readStompHeaders( *frame );
@@ -67,39 +69,24 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void StompCommandReader::readStompCommand( StompFrame& frame ) 
+void StompCommandReader::readStompCommandHeader( StompFrame& frame ) 
    throw ( StompConnectorException )
 {  
     while( true ) 
     {
-        // Clean up the mess.
-        buffer.clear();
-
-        // Read the command;
+        // The command header is formatted
+        // just like any other stomp header.
         readStompHeaderLine();
 
         // Ignore all white space before the command.
-        int offset=-1;
+        int offset = -1;
         for( size_t ix = 0; ix < buffer.size()-1; ++ix )
         {
-            // Find the first non space character
-            char b = buffer[ix];
-            switch ( b ) 
-            {
-                case '\n':
-                case '\t':
-                case '\r':
-                    break;
-                  
-                default:
-                    offset = ix;
-                    break; 
-            } 
-            
-            if( offset != -1 )
-            {
+            // Find the first non whitespace character
+            if( !Character::isWhitespace(buffer[ix]) ){
+                offset = ix;
                 break;
-            }            
+            }
         }
     
         if( offset >= 0 )
@@ -110,8 +97,6 @@
         }
     
     }
-    // Clean up the mess.
-    buffer.clear();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -122,10 +107,7 @@
     bool endOfHeaders = false;
 
     while( !endOfHeaders )
-    {
-        // Clean up the mess.
-        buffer.clear();
-
+    {        
         // Read in the next header line.
         int numChars = readStompHeaderLine();
 
@@ -166,15 +148,15 @@
             }
         }
     }
-
-    // Clean up the mess.
-    buffer.clear();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 int StompCommandReader::readStompHeaderLine(void) 
     throw ( StompConnectorException )
 {
+    // Clear any data from the buffer.
+    buffer.clear();
+        
     int count = 0;
   
     while( true )
@@ -207,6 +189,9 @@
 void StompCommandReader::readStompBody( StompFrame& frame ) 
    throw ( StompConnectorException )
 {
+    // Clear any data from the buffer.
+    buffer.clear();
+    
     unsigned long content_length = 0;
    
     if(frame.getProperties().hasProperty(
@@ -287,10 +272,7 @@
 
         // Set the body contents in the frame - copy the memory
         frame.setBody( cpyBody, content_length );
-    }
-
-    // Clean up the mess.
-    buffer.clear();
+    }    
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -304,24 +286,8 @@
             "StompCommandReader::read(char*,int) - input stream is NULL" );
     }
    
-    int head = 0;
-   
-    // We call the read(buffer, size) version asking for one
-    // byte, if this returns zero, then there wasn't anything 
-    // on the stream to read, so we try again after a short 
-    // pause in hopes that some more data will show up.
-    while( true )
-    {
-        head += inputStream->read( &buffer[head], count - head );
-      
-        if( head == count )
-        {
-            return count;
-        }
-      
-        // Got here, so we wait a bit and try again.
-        Thread::sleep( 10 );
-    }
+    // Just delegate to the input stream.
+    return inputStream->read( buffer, count );
 }
  
 ////////////////////////////////////////////////////////////////////////////////
@@ -334,7 +300,5 @@
             "StompCommandReader::read(char*,int) - input stream is NULL" );
     }
    
-    unsigned char c = 0;
-    inputStream->read( &c, 1 );
-    return c;
+    return inputStream->read();
 }

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandReader.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandReader.h?view=diff&rev=474558&r1=474557&r2=474558
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandReader.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandReader.h Mon Nov 13 15:08:51 2006
@@ -111,7 +111,7 @@
          * @param reference to a Stomp Frame
          * @throws StompConnectorException
          */
-        void readStompCommand( StompFrame& frame ) 
+        void readStompCommandHeader( StompFrame& frame ) 
             throw ( StompConnectorException );
 
         /** 
@@ -134,7 +134,7 @@
          * @param Stomp Frame to place data in
          */
         void readStompBody( StompFrame& frame ) 
-            throw ( StompConnectorException );
+            throw ( StompConnectorException );                
     
     };
 

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp?view=diff&rev=474558&r1=474557&r2=474558
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp Mon Nov 13 15:08:51 2006
@@ -59,9 +59,9 @@
 StompSessionManager::~StompSessionManager(void)
 {
     // NOTE - I am not cleaning out the ConsumerInfo objects in the
-    // map becaise it is really the job of the consumer ot remove itself
+    // map because it is really the job of the consumer to remove itself
     // when it is destructed.  If it doesn't then we would have problems,
-    // but if it does, but it's deleted after this object then we would
+    // but it's deleted after this object then we would
     // still have problems.  
 }
 

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/BufferedInputStream.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/BufferedInputStream.cpp?view=diff&rev=474558&r1=474557&r2=474558
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/BufferedInputStream.cpp (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/BufferedInputStream.cpp Mon Nov 13 15:08:51 2006
@@ -22,17 +22,20 @@
 using namespace std;
 
 ////////////////////////////////////////////////////////////////////////////////
-BufferedInputStream::BufferedInputStream( InputStream* stream )
+BufferedInputStream::BufferedInputStream( InputStream* stream, bool own )
+: FilterInputStream( stream, own )
 {
     // Default to a 1k buffer.
-    init( stream, 1024 );
+    init( 1024 );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 BufferedInputStream::BufferedInputStream( InputStream* stream, 
-    const int bufferSize )
+    unsigned int bufferSize,
+    bool own  )
+: FilterInputStream( stream, own )
 {
-    init( stream, bufferSize );
+    init( bufferSize );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -46,9 +49,8 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void BufferedInputStream::init( InputStream* stream, const int bufferSize ){
+void BufferedInputStream::init( unsigned int bufferSize ){
     
-    this->stream = stream;
     this->bufferSize = bufferSize;
     
     // Create the buffer and initialize the head and tail positions.
@@ -58,67 +60,61 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void BufferedInputStream::close() throw( cms::CMSException ){
-    
-    // Close the delegate stream.
-    stream->close();
-}
-
-////////////////////////////////////////////////////////////////////////////////
 unsigned char BufferedInputStream::read() throw ( IOException ){
-    
-    // If we don't have any data buffered yet - read as much as we can. 
-    if( tail == head ){
+
+    // If there's no data left, reset the pointers to the beginning of the
+    // buffer.
+    normalizeBuffer();                
+
+    // If we don't have any data buffered yet - read as much as 
+    // we can. 
+    if( isEmpty() ){
         bufferData();
     }
     
     // Get the next character.
     char returnValue = buffer[head++];
     
-    // If the buffer is now empty - reset it to the beginning of the buffer.
-    if( tail == head ){
-        tail = head = 0;
-    }
-    
     return returnValue;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-int BufferedInputStream::read( unsigned char* buffer, 
-    const int bufferSize ) throw ( IOException ){
+int BufferedInputStream::read( unsigned char* targetBuffer, 
+    const int targetBufferSize ) throw ( IOException ){
+    
+    // If there's no data left, reset the pointers to the beginning of the
+    // buffer.
+    normalizeBuffer();
     
     // If we still haven't filled the output buffer AND there is data
     // on the input stream to be read, read a buffer's
     // worth from the stream.
     int totalRead = 0;
-    while( totalRead < bufferSize ){        
+    while( totalRead < targetBufferSize ){        
         
         // Get the remaining bytes to copy.
-        int bytesToCopy = min( tail-head, (bufferSize-totalRead) );
+        int bytesToCopy = min( tail-head, (targetBufferSize-totalRead) );
         
         // Copy the data to the output buffer.  
-        memcpy( buffer+totalRead, this->buffer+head, bytesToCopy );
+        memcpy( targetBuffer+totalRead, this->buffer+head, bytesToCopy );
         
         // Increment the total bytes read.
         totalRead += bytesToCopy;
         
-        // Increment the head position.  If the buffer is now empty,
-        // reset the positions and buffer more data.
+        // Increment the head position.
         head += bytesToCopy;
-        if( head == tail ){
-            
-            // Reset the buffer indicies.
-            head = tail = 0;
-            
-            // If there is no more data currently available on the 
-            // input stream, stop the loop.
-            if( stream->available() == 0 ){
-                break;
-            }
+        
+        // If the buffer is now empty, reset the positions to the
+        // head of the buffer.
+        normalizeBuffer();
+        
+        // If we still haven't satisfied the request, 
+        // read more data.
+        if( totalRead < targetBufferSize ){                  
             
             // Buffer as much data as we can.
             bufferData();
-        }               
+        }              
     }
     
     // Return the total number of bytes read.
@@ -128,18 +124,26 @@
 ////////////////////////////////////////////////////////////////////////////////
 void BufferedInputStream::bufferData() throw ( IOException ){
     
-    if( tail == bufferSize ){
+    if( getUnusedBytes() == 0 ){
         throw IOException( __FILE__, __LINE__, 
             "BufferedInputStream::bufferData - buffer full" );
     }
     
-    // Read in as many bytes as we can.
-    int bytesRead = stream->read( buffer+tail, bufferSize-tail );
+    // Get the number of bytes currently available on the input stream
+    // that could be read without blocking.
+    int available = inputStream->available();
+    
+    // Calculate the number of bytes that we can read.  Always >= 1 byte!
+    int bytesToRead = max( 1, min( available, getUnusedBytes() ) );
+    
+    // Read the bytes from the input stream.
+    int bytesRead = inputStream->read( getTail(), bytesToRead );
     if( bytesRead == 0 ){
         throw IOException( __FILE__, __LINE__, 
             "BufferedInputStream::read() - failed reading bytes from stream");
     }
     
     // Increment the tail to the new end position.
-    tail += bytesRead;
+    tail += bytesRead;   
 }
+

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/BufferedInputStream.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/BufferedInputStream.h?view=diff&rev=474558&r1=474557&r2=474558
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/BufferedInputStream.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/BufferedInputStream.h Mon Nov 13 15:08:51 2006
@@ -18,7 +18,7 @@
 #ifndef ACTIVEMQ_IO_BUFFEREDINPUTSTREAM_H_
 #define ACTIVEMQ_IO_BUFFEREDINPUTSTREAM_H_
  
-#include <activemq/io/InputStream.h>
+#include <activemq/io/FilterInputStream.h>
 #include <assert.h>
 
 namespace activemq{
@@ -30,14 +30,9 @@
      * in order to reduce the number of io operations on the
      * input stream.
      */
-    class BufferedInputStream : public InputStream
+    class BufferedInputStream : public FilterInputStream
     {
     private:
-   
-        /**
-         * The target input stream.
-         */
-        InputStream* stream;
       
         /**
          * The internal buffer.
@@ -47,7 +42,7 @@
         /**
          * The buffer size.
          */
-        int bufferSize;
+        unsigned int bufferSize;
       
         /**
          * The current head of the buffer.
@@ -64,83 +59,21 @@
         /**
          * Constructor
          * @param stream The target input stream.
+         * @param own indicates if we own the stream object, defaults to false
          */
-        BufferedInputStream( InputStream* stream );
+        BufferedInputStream( InputStream* stream, bool own = false );
       
         /**
          * Constructor
          * @param stream the target input stream
          * @param bufferSize the size for the internal buffer.
+         * @param own indicates if we own the stream object, defaults to false.
          */
-        BufferedInputStream( InputStream* stream, const int bufferSize );
+        BufferedInputStream( InputStream* stream, 
+                             unsigned int bufferSize, 
+                             bool own = false);
       
         virtual ~BufferedInputStream();
-      
-        /**
-         * Locks the object.
-         * throws ActiveMQException
-         */
-        virtual void lock() throw( exceptions::ActiveMQException ){
-            assert( stream != NULL );
-            stream->lock();
-        }
-   
-        /**
-         * Unlocks the object.
-         * throws ActiveMQException
-         */
-        virtual void unlock() throw( exceptions::ActiveMQException ){   
-           assert( stream != NULL );
-           stream->unlock();
-        }
-       
-        /**
-         * Waits on a signal from this object, which is generated
-         * by a call to Notify.  Must have this object locked before
-         * calling.
-         * throws ActiveMQException
-         */
-        virtual void wait() throw( exceptions::ActiveMQException ){
-            assert( stream != NULL );
-            stream->wait();
-        }
-    
-        /**
-         * Waits on a signal from this object, which is generated
-         * by a call to Notify.  Must have this object locked before
-         * calling.  This wait will timeout after the specified time
-         * interval.
-         * @param millisecs time in millisecsonds to wait, or WAIT_INIFINITE
-         * @throws ActiveMQException
-         */
-        virtual void wait( unsigned long millisecs ) 
-            throw( exceptions::ActiveMQException ) {
-         
-            assert( stream != NULL );
-            stream->wait(millisecs);
-        }
-
-        /**
-         * Signals a waiter on this object that it can now wake
-         * up and continue.  Must have this object locked before
-         * calling.
-         * throws ActiveMQException
-         */
-        virtual void notify() throw( exceptions::ActiveMQException ){
-            assert( stream != NULL );
-            stream->notify();
-        }
-        
-        /**
-         * Signals the waiters on this object that it can now wake
-         * up and continue.  Must have this object locked before
-         * calling.
-         * throws ActiveMQException
-         */
-        virtual void notifyAll() throw( exceptions::ActiveMQException ){
-            assert( stream != NULL );
-            stream->notifyAll();
-        }
     
         /**
          * Indcates the number of bytes avaialable.
@@ -148,19 +81,21 @@
          * in the buffer and the data available on the target
          * input stream.
          */
-        virtual int available() const{   
-            return ( tail - head ) + stream->available();
+        virtual int available() const throw ( IOException ) {   
+            return ( tail - head ) + inputStream->available();
         }
             
         /**
-         * Reads a single byte from the buffer.
+         * Reads a single byte from the buffer.  Blocks until
+         * the data is available.
          * @return The next byte.
          * @throws IOException thrown if an error occurs.
          */
         virtual unsigned char read() throw ( IOException );
       
         /**
-         * Reads an array of bytes from the buffer.
+         * Reads an array of bytes from the buffer.  Blocks
+         * until the requested number of bytes are available.
          * @param buffer (out) the target buffer.
          * @param bufferSize the size of the output buffer.
          * @return The number of bytes read.
@@ -169,20 +104,13 @@
         virtual int read( unsigned char* buffer, const int bufferSize ) 
             throw ( IOException );
       
-        /**
-         * Closes the target input stream.
-         * @throws CMSException
-         */
-        virtual void close(void) throw( cms::CMSException );
-      
     private:
    
         /**
          * Initializes the internal structures.
-         * @param stream to read from
          * @param size of buffer to allocate
          */
-        void init( InputStream* stream, const int bufferSize );
+        void init( unsigned int bufferSize );
       
         /**
          * Populates the buffer with as much data as possible
@@ -190,6 +118,45 @@
          * @throws CMSException
          */
         void bufferData(void) throw ( IOException );
+        
+        /**
+         * Returns the number of bytes that are currently unused
+         * in the buffer.
+         */
+        int getUnusedBytes() const{
+            return bufferSize - tail;
+        }
+        
+        /**
+         * Returns the current tail position of the buffer.
+         */
+        unsigned char* getTail(){
+            return buffer + tail;
+        }
+        
+        /**
+         * Initializes the head and tail indices to the beginning
+         * of the buffer.
+         */
+        void clear(){
+            head = tail = 0;
+        }
+        
+        /**
+         * Indicates whether or not the buffer is empty.
+         */
+        bool isEmpty() const{
+            return head == tail;
+        }
+        
+        /**
+         * Clears the buffer if there is no data remaining.
+         */
+        void normalizeBuffer(){
+            if( isEmpty() ){
+                clear();
+            }
+        }
 
     };
    

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/BufferedOutputStream.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/BufferedOutputStream.cpp?view=diff&rev=474558&r1=474557&r2=474558
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/BufferedOutputStream.cpp (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/BufferedOutputStream.cpp Mon Nov 13 15:08:51 2006
@@ -22,17 +22,20 @@
 using namespace std;
 
 ////////////////////////////////////////////////////////////////////////////////
-BufferedOutputStream::BufferedOutputStream( OutputStream* stream )
+BufferedOutputStream::BufferedOutputStream( OutputStream* stream, bool own )
+: FilterOutputStream( stream, own )
 {
     // Default to 1k buffer.
-    init( stream, 1024 );
+    init( 1024 );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 BufferedOutputStream::BufferedOutputStream( OutputStream* stream, 
-    const int bufSize )
+    unsigned int bufSize,
+    bool own )
+: FilterOutputStream( stream, own )
 {
-    init( stream, bufSize );
+    init( bufSize );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -46,9 +49,8 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void BufferedOutputStream::init( OutputStream* stream, const int bufSize ){
+void BufferedOutputStream::init( unsigned int bufSize ){
     
-    this->stream = stream;
     this->bufferSize = bufSize;
     
     buffer = new unsigned char[bufSize];
@@ -62,14 +64,14 @@
     flush();    
     
     // Close the delegate stream.
-    stream->close();
+    outputStream->close();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void BufferedOutputStream::emptyBuffer() throw ( IOException ){
     
     if( head != tail ){
-        stream->write( buffer+head, tail-head );
+        outputStream->write( buffer+head, tail-head );
     }
     head = tail = 0;
 }
@@ -81,13 +83,13 @@
     emptyBuffer();
     
     // Flush the output stream.
-    stream->flush();
+    outputStream->flush();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void BufferedOutputStream::write( const unsigned char c ) throw ( IOException ){
     
-    if( tail >= bufferSize ){
+    if( tail >= (int)bufferSize ){
         emptyBuffer();
     }
     
@@ -101,12 +103,12 @@
     // Iterate until all the data is written.
     for( int pos=0; pos < len; ){
         
-        if( tail >= bufferSize ){
+        if( tail >= (int)bufferSize ){
             emptyBuffer();
         }
     
         // Get the number of bytes left to write.
-        int bytesToWrite = min( bufferSize-tail, len-pos );
+        int bytesToWrite = min( (int)bufferSize-tail, len-pos );
         
         // Copy the data.
         memcpy( this->buffer+tail, buffer+pos, bytesToWrite );

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/BufferedOutputStream.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/BufferedOutputStream.h?view=diff&rev=474558&r1=474557&r2=474558
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/BufferedOutputStream.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/BufferedOutputStream.h Mon Nov 13 15:08:51 2006
@@ -18,7 +18,7 @@
 #ifndef ACTIVEMQ_IO_BUFFEREDOUTPUTSTREAM_H_
 #define ACTIVEMQ_IO_BUFFEREDOUTPUTSTREAM_H_
  
-#include <activemq/io/OutputStream.h>
+#include <activemq/io/FilterOutputStream.h>
 #include <assert.h>
 
 namespace activemq{
@@ -28,14 +28,9 @@
      * Wrapper around another output stream that buffers
      * output before writing to the target output stream.
      */
-    class BufferedOutputStream : public OutputStream
+    class BufferedOutputStream : public FilterOutputStream
     {
     private:
-   
-        /**
-         * The target output stream.
-         */
-        OutputStream* stream;
       
         /**
          * The internal buffer.
@@ -45,7 +40,7 @@
         /**
          * The size of the internal buffer.
          */
-        int bufferSize;
+        unsigned int bufferSize;
       
         /**
          * The current head of the buffer.
@@ -63,82 +58,18 @@
          * Constructor.
          * @param stream the target output stream.
          */
-        BufferedOutputStream( OutputStream* stream );
+        BufferedOutputStream( OutputStream* stream, bool own = false );
       
         /**
          * Constructor
          * @param stream the target output stream.
          * @param bufSize the size for the internal buffer.
          */
-        BufferedOutputStream( OutputStream* stream, const int bufSize );
+        BufferedOutputStream( OutputStream* stream, 
+                              unsigned int bufSize, 
+                              bool own = false);
       
         virtual ~BufferedOutputStream();
-      
-        /**
-         * Locks the object.
-         * @throws ActiveMQException
-         */
-        virtual void lock() throw( exceptions::ActiveMQException ){
-            assert( stream != NULL );
-            stream->lock();
-        }
-   
-        /**
-         * Unlocks the object.
-         * @throws ActiveMQException
-         */
-        virtual void unlock() throw( exceptions::ActiveMQException ){   
-            assert( stream != NULL );
-            stream->unlock();
-        }
-       
-        /**
-         * Waits on a signal from this object, which is generated
-         * by a call to Notify.  Must have this object locked before
-         * calling.
-         * @throws ActiveMQException
-         */
-        virtual void wait() throw( exceptions::ActiveMQException ){
-            assert( stream != NULL );
-            stream->wait();
-        }
-    
-        /**
-         * Waits on a signal from this object, which is generated
-         * by a call to Notify.  Must have this object locked before
-         * calling.  This wait will timeout after the specified time
-         * interval.
-         * @param millisecs time in millisecsonds to wait, or WAIT_INIFINITE
-         * @throws ActiveMQException
-         */
-        virtual void wait( unsigned long millisecs ) 
-            throw( exceptions::ActiveMQException ) {
-         
-            assert( stream != NULL );
-            stream->wait( millisecs );
-        }
-
-        /**
-         * Signals a waiter on this object that it can now wake
-         * up and continue.  Must have this object locked before
-         * calling.
-         * @throws ActiveMQException
-         */
-        virtual void notify() throw( exceptions::ActiveMQException ){
-            assert( stream != NULL );
-            stream->notify();
-        }
-        
-        /**
-         * Signals the waiters on this object that it can now wake
-         * up and continue.  Must have this object locked before
-         * calling.
-         * @throws ActiveMQException
-         */
-        virtual void notifyAll() throw( exceptions::ActiveMQException ){
-            assert( stream != NULL );
-            stream->notifyAll();
-        }
        
         /**
          * Writes a single byte to the output stream.
@@ -172,7 +103,7 @@
         /**
          * Initializes the internal structures.
          */
-        void init( OutputStream* stream, const int bufSize );
+        void init( unsigned int bufSize );
       
         /**
          * Writes the contents of the buffer to the output stream.

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/ByteArrayInputStream.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/ByteArrayInputStream.cpp?view=diff&rev=474558&r1=474557&r2=474558
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/ByteArrayInputStream.cpp (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/ByteArrayInputStream.cpp Mon Nov 13 15:08:51 2006
@@ -25,12 +25,14 @@
 ByteArrayInputStream::ByteArrayInputStream()
 {
     pos = buffer.end();
+    closing = false;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 ByteArrayInputStream::ByteArrayInputStream( const unsigned char* buffer,
                                             int bufferSize )
 {
+    closing = false;
     setByteArray( buffer, bufferSize );
 }
 
@@ -40,39 +42,63 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ByteArrayInputStream::setByteArray( const unsigned char* buffer,
-                                         int bufferSize )
+void ByteArrayInputStream::setByteArray( const unsigned char* lbuffer,
+                                         int lbufferSize )
 {
-    // Remove old data
-    this->buffer.clear();
-   
-    // Copy data to internal buffer.
-    for( int ix = 0; ix < bufferSize; ++ix )
-    {
-        this->buffer.push_back(buffer[ix]);
+    synchronized( this ){
+        
+        // Remove old data
+        this->buffer.clear();
+       
+        // Copy data to internal buffer.
+        for( int ix = 0; ix < lbufferSize; ++ix )
+        {
+            this->buffer.push_back(lbuffer[ix]);
+        }
+       
+        // Begin at the Beginning.
+        pos = this->buffer.begin();
+        
+        // Notify any listening threads that there
+        // is now data available.
+        notifyAll();
     }
-   
-    // Begin at the Beginning.
-    pos = this->buffer.begin();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ByteArrayInputStream::close() throw( cms::CMSException ){
     
-    // Close the delegate stream.
-    buffer.clear();
+    synchronized( this ){
+     
+        // Indicate that this stream is closing.
+        closing = true;
+           
+        // Close the delegate stream.
+        buffer.clear();
+        
+        // Notify that this stream is shutting down.
+        notifyAll();
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 unsigned char ByteArrayInputStream::read() throw ( IOException )
 {
-    if(pos != buffer.end())
-    {
-        return *(pos++);
+    synchronized( this ){
+        
+        while( !closing ){
+            
+            if(pos != buffer.end())
+            {
+                return *(pos++);
+            }                        
+            
+            // Wait for data to come in.
+            wait();
+        }
+        
+        throw IOException( __FILE__, __LINE__, "close occurred during a read" );
     }
-   
-    throw IOException( __FILE__, __LINE__, 
-        "ByteArrayInputStream::read: Out of Data");
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -80,17 +106,26 @@
                                 const int bufferSize ) 
                                    throw ( IOException )
 {
-     int ix = 0;
-   
-     for( ; ix < bufferSize; ++ix, ++pos)
-     {
-        if(pos == this->buffer.end())
-        {        
-            break;
+    synchronized( this ){
+        
+        int ix = 0;
+        
+        for( ; ix < bufferSize && !closing; ++ix, ++pos)
+        {
+            if(pos == this->buffer.end())
+            {   
+                // We don't have the requested data yet -
+                // wait for it.     
+                wait();
+            }
+          
+            buffer[ix] = *(pos);
+        }
+        
+        if( closing ){
+            throw IOException( __FILE__, __LINE__, "close occurred during a read" );
         }
-      
-        buffer[ix] = *(pos);
+       
+        return ix;
     }
-   
-    return ix;
 }

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/ByteArrayInputStream.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/ByteArrayInputStream.h?view=diff&rev=474558&r1=474557&r2=474558
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/ByteArrayInputStream.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/ByteArrayInputStream.h Mon Nov 13 15:08:51 2006
@@ -44,6 +44,12 @@
          * Synchronization object.
          */
         concurrent::Mutex mutex;
+        
+        /**
+         * Indicates that this stream is in the process
+         * of shutting down.
+         */
+        bool closing;
       
     public:
    
@@ -138,7 +144,7 @@
          * in the buffer and the data available on the target
          * input stream.
          */
-        virtual int available() const{   
+        virtual int available() const throw (IOException) {   
             return std::distance( pos, buffer.end() );
         }
             

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/InputStream.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/InputStream.h?view=diff&rev=474558&r1=474557&r2=474558
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/InputStream.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/InputStream.h Mon Nov 13 15:08:51 2006
@@ -41,21 +41,24 @@
         /**
          * Indcates the number of bytes avaialable.
          * @return the number of bytes available on this input stream.
+         * @throws IOException if an error occurs.
          */
-        virtual int available() const = 0;
+        virtual int available() const throw ( IOException ) = 0;
         
         /**
-         * Reads a single byte from the buffer.
+         * Reads a single byte from the buffer.  Blocks until
+         * data is available.
          * @return The next byte.
          * @throws IOException thrown if an error occurs.
          */
         virtual unsigned char read() throw ( IOException ) = 0;
         
         /**
-         * Reads an array of bytes from the buffer.
+         * Reads an array of bytes from the buffer.  Blocks until
+         * the requested number of bytes are available.
          * @param buffer (out) the target buffer.
          * @param bufferSize the size of the output buffer.
-         * @return The number of bytes read or -1 if EOS is detected
+         * @return The number of bytes read or -1 if EOF is detected
          * @throws IOException thrown if an error occurs.
          */
         virtual int read( unsigned char* buffer, const int bufferSize ) 

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.cpp?view=diff&rev=474558&r1=474557&r2=474558
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.cpp (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketFactory.cpp Mon Nov 13 15:08:51 2006
@@ -86,11 +86,6 @@
         dummy = properties.getProperty( "soSendBufferSize", "2000000" );  
         sscanf( dummy.c_str(), "%d", &soSendBufferSize );
       
-        // Get the socket send buffer size.
-        int soTimeout = 10000;
-        dummy = properties.getProperty( "soTimeout", "10000" );  
-        sscanf( dummy.c_str(), "%d", &soTimeout );
-      
         // Now that we have all the elements that we wanted - let's do it!
         // Create a TCP Socket and then Wrap it in a buffered socket
         // so that users get the benefit of buffered reads and writes.
@@ -108,7 +103,6 @@
         socket->setKeepAlive( soKeepAlive );
         socket->setReceiveBufferSize( soReceiveBufferSize );
         socket->setSendBufferSize( soSendBufferSize );
-        socket->setSoTimeout( soTimeout );
 
         return socket;
     }

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.cpp?view=diff&rev=474558&r1=474557&r2=474558
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.cpp (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.cpp Mon Nov 13 15:08:51 2006
@@ -18,16 +18,28 @@
 #include <activemq/util/Config.h>
 
 #if !defined(HAVE_WINSOCK2_H) 
-    #include <sys/poll.h>
+    //#include <sys/poll.h>
+    #include <sys/select.h>
     #include <sys/socket.h>
-    #include <errno.h>
+    #include <errno.h>    
     extern int errno;
 #else
     #include <Winsock2.h>
 #endif
 
+#ifdef HAVE_SYS_IOCTL_H
+#define BSD_COMP /* Get FIONREAD on Solaris2. */
+#include <sys/ioctl.h>
+#endif
+
+// Pick up FIONREAD on Solaris 2.5.
+#ifdef HAVE_SYS_FILIO_H
+#include <sys/filio.h>
+#endif
+
 #include <activemq/network/SocketInputStream.h>
 #include <activemq/io/IOException.h>
+#include <activemq/exceptions/UnsupportedOperationException.h>
 #include <stdlib.h>
 #include <string>
 #include <stdio.h>
@@ -35,6 +47,7 @@
 using namespace activemq;
 using namespace activemq::network;
 using namespace activemq::io;
+using namespace activemq::exceptions;
 using namespace std;
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -50,9 +63,59 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-int SocketInputStream::available() const{
-   
-#if !defined(HAVE_WINSOCK2_H) 
+int SocketInputStream::available() const throw (activemq::io::IOException){
+
+// The windows version
+#if defined(HAVE_WINSOCK2_H) 
+
+    unsigned long numBytes = 0;
+
+    if (::ioctlsocket (socket, FIONREAD, &numBytes) == SOCKET_ERROR){
+        throw SocketException( __FILE__, __LINE__, "ioctlsocket failed" );
+    }
+
+    return (int)numBytes;
+
+#else // !defined(HAVE_WINSOCK2_H)
+
+    // If FIONREAD is defined - use ioctl to find out how many bytes
+    // are available.
+	#if defined(FIONREAD)
+
+        int numBytes = 0;
+	    if( ::ioctl (socket, FIONREAD, &numBytes) != -1 ){
+            return numBytes;
+        }
+         
+	#endif
+	
+	// If we didn't get anything we can use select.  This is a little
+    // less functional.  We will poll on the socket - if there is data
+    // available, we'll return 1, otherwise we'll return zero.
+	#if defined(HAVE_SELECT)    
+
+        fd_set rd;
+        FD_ZERO(&rd);
+        FD_SET( socket, &rd );
+        struct timeval tv;
+        tv.tv_sec = 0;
+        tv.tv_usec = 0;
+        int returnCode = ::select(socket+1, &rd, NULL, NULL, &tv);
+        if(returnCode == -1){
+            throw IOException(__FILE__, __LINE__, ::strerror(errno));
+        }
+        return (returnCode == 0)? 0 : 1;
+        
+    #else
+    
+        return 0;
+        
+	#endif /* HAVE_SELECT */
+	
+
+#endif // !defined(HAVE_WINSOCK2_H)
+    
+/*#if !defined(HAVE_WINSOCK2_H) 
     
     // Poll the socket for input.
     pollfd fd;
@@ -81,7 +144,7 @@
    
 #endif
 
-    return 0;
+    return 0;*/
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -100,7 +163,59 @@
 ////////////////////////////////////////////////////////////////////////////////
 int SocketInputStream::read( unsigned char* buffer, const int bufferSize ) throw (IOException){
    
-    int bytesAvailable = available();
+    int len = ::recv(socket, (char*)buffer, bufferSize, 0);
+    
+    // Check for a closed socket.
+    if( len == 0 ){
+        throw IOException( __FILE__, __LINE__, 
+            "activemq::io::SocketInputStream::read - The connection is broken" );
+    }
+    
+    // Check for error.
+    if( len == -1 ){
+        
+        #if !defined(HAVE_WINSOCK2_H) 
+         
+            // Create the error string.
+            char* errorString = ::strerror(errno);
+         
+        #else
+        
+            // Fetch the Winsock error code for the failed recv call.
+            int errorCode = ::WSAGetLastError();
+      
+            // Create the error string.
+            static const int errorStringSize = 512;
+            char errorString[errorStringSize];
+            memset( errorString, 0, errorStringSize );
+            FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,
+               0,
+               errorCode,
+               0,
+               errorString,
+               errorStringSize - 1,
+               NULL);
+              
+        #endif
+        
+        // Any recv error is treated as fatal - throw an exception.
+        throw IOException( __FILE__, __LINE__, 
+                "activemq::io::SocketInputStream::read - %s", errorString );
+    }
+    
+    if( debug ){
+        printf("SocketInputStream:read(), numbytes:%d -", len);
+        for( int ix=0; ix<len; ++ix ){
+            if( buffer[ix] > 20 )
+                printf("%c", buffer[ix] );
+            else
+                printf("[%d]", buffer[ix] );
+        }
+        printf("\n");
+    }
+    
+    return len;
+    /*int bytesAvailable = available();
    
     while( true )
     {
@@ -173,5 +288,5 @@
         
         // Data was read successfully - return the bytes read.
         return len;
-    }
+    }*/
 }

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.h?view=diff&rev=474558&r1=474557&r2=474558
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.h Mon Nov 13 15:08:51 2006
@@ -26,7 +26,8 @@
 namespace network{
     
     /**
-     * Input stream for performing reads on a socket.
+     * Input stream for performing reads on a socket.  This
+     * class will only work properly for blocking sockets.
      */
 	class SocketInputStream : public io::InputStream
 	{
@@ -119,21 +120,24 @@
         }
 	    
 	    /**
-	     * Polls instantaneously to see if data is available on 
-	     * the socket.
-	     * @return 1 if data is currently available on the socket, otherwise 0.
+	     * Returns the number of bytes available on the socket to
+         * be read right now.
+	     * @return The number of bytes currently available to
+         * be read on the socket.
 	     */
-		virtual int available() const;
+		virtual int available() const throw (activemq::io::IOException);
 		
 		/**
-		 * Reads a single byte from the buffer.
+		 * Reads a single byte from the buffer.  If no data
+         * is available, blocks until there is.
 		 * @return The next byte.
 		 * @throws IOException thrown if an error occurs.
 		 */
 		virtual unsigned char read() throw ( io::IOException );
 		
 		/**
-		 * Reads an array of bytes from the buffer.
+		 * Reads an array of bytes from the buffer.  If no data
+         * is available, blocks until there is.
 		 * @param buffer (out) the target buffer.
 		 * @param bufferSize the size of the output buffer.
 		 * @return The number of bytes read.

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/TcpSocket.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/TcpSocket.cpp?view=diff&rev=474558&r1=474557&r2=474558
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/TcpSocket.cpp (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/TcpSocket.cpp Mon Nov 13 15:08:51 2006
@@ -201,7 +201,7 @@
    
     if( isConnected() )
     {
-        ::shutdown( socketHandle, 2 );
+        ::shutdown( socketHandle, SHUT_RDWR );
         
 		#if !defined(HAVE_WINSOCK2_H)
             ::close( socketHandle );

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp?view=diff&rev=474558&r1=474557&r2=474558
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp Mon Nov 13 15:08:51 2006
@@ -114,33 +114,29 @@
         // Mark this transport as closed.
         closed = true;
         
+        // We have to close the input stream before
+        // we stop the thread.  this will force us to
+        // wake up the thread if it's stuck in a read
+        // (which is likely).  Otherwise, the join that
+        // follows will block forever.
+        if( inputStream != NULL ){
+            
+            inputStream->close();
+            inputStream = NULL;
+        }
+        
         // Wait for the thread to die.
         if( thread != NULL ){
             thread->join();
             delete thread;
             thread = NULL;
-        }
-        
-        /**
-         * Close the input stream.
-         */
-        if( inputStream != NULL ){
-            
-            synchronized( inputStream ){
-                inputStream->close();
-                inputStream = NULL;
-            }
-        }
+        }        
         
-        /**
-         * Close the output stream.
-         */
+        // Close the output stream.
         if( outputStream != NULL ){
             
-            synchronized( outputStream ){
-                outputStream->close();
-                outputStream = NULL;
-            }
+            outputStream->close();
+            outputStream = NULL;
         }
     }
     AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
@@ -153,29 +149,12 @@
    try{
         
       while( !closed ){
-        
-         int available = 0;            
-         synchronized( inputStream ){
-            available = inputStream->available();
-         }
-
-         if( available > 0 ){
-                
-             Command* command = NULL;
-
-             synchronized( inputStream ){
-                 // Read the next command from the input stream.
-                 command = reader->readCommand();
-             }
-                                
-             // Notify the listener.
-             fire( command );
-         }
-         else{
-                
-             // Sleep for a short time and try again.
-             Thread::sleep( 1 );
-         }        
+                 
+         // Read the next command from the input stream.
+         Command* command = reader->readCommand();
+                            
+         // Notify the listener.
+         fire( command );      
       }
         
    }catch( exceptions::ActiveMQException& ex ){

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/io/BufferedInputStreamTest.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/io/BufferedInputStreamTest.h?view=diff&rev=474558&r1=474557&r2=474558
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/io/BufferedInputStreamTest.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/io/BufferedInputStreamTest.h Mon Nov 13 15:08:51 2006
@@ -48,7 +48,7 @@
 			}
 			virtual ~MyInputStream(){}
 			
-			virtual int available() const{
+			virtual int available() const throw (IOException){
 				int len = data.length();
 				return len - (int)pos;
 			}
@@ -98,7 +98,7 @@
 			
 			std::string testStr = "TEST12345678910";
 			MyInputStream myStream( testStr );
-			BufferedInputStream bufStream( &myStream, 1 );
+			BufferedInputStream bufStream( &myStream, (unsigned int)1 );
 			
 			int available = bufStream.available();
 			CPPUNIT_ASSERT( available == (int)testStr.length() );
@@ -138,7 +138,7 @@
 			
 			std::string testStr = "TEST12345678910";
 			MyInputStream myStream( testStr );
-			BufferedInputStream bufStream( &myStream, 10 );
+			BufferedInputStream bufStream( &myStream, (unsigned int)10 );
 			
 			int available = bufStream.available();
 			CPPUNIT_ASSERT( available == (int)testStr.length() );

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/io/BufferedOutputStreamTest.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/io/BufferedOutputStreamTest.h?view=diff&rev=474558&r1=474557&r2=474558
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/io/BufferedOutputStreamTest.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/io/BufferedOutputStreamTest.h Mon Nov 13 15:08:51 2006
@@ -97,7 +97,7 @@
 		void testSmallerBuffer(){
 			
 			MyOutputStream myStream;
-			BufferedOutputStream bufStream( &myStream, 1 );
+			BufferedOutputStream bufStream( &myStream, (unsigned int)1 );
 			
 			const char* buffer = myStream.getBuffer();
 			
@@ -120,7 +120,7 @@
 		void testBiggerBuffer(){
 			
 			MyOutputStream myStream;
-			BufferedOutputStream bufStream( &myStream, 10 );
+			BufferedOutputStream bufStream( &myStream, (unsigned int)10 );
 			
 			const char* buffer = myStream.getBuffer();
 			

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/io/ByteArrayInputStreamTest.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/io/ByteArrayInputStreamTest.h?view=diff&rev=474558&r1=474557&r2=474558
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/io/ByteArrayInputStreamTest.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/io/ByteArrayInputStreamTest.h Mon Nov 13 15:08:51 2006
@@ -56,6 +56,7 @@
          char c = stream_a.read();
          char d = stream_a.read();
 
+         printf("a=%c, b=%c, c=%c, d=%c\n", a, b, c, d );
          CPPUNIT_ASSERT( a == 't' && b == 'e' && c == 's' && d == 't' );
          CPPUNIT_ASSERT( stream_a.available() == 0 );
 
@@ -78,8 +79,10 @@
          memset(buffer, 0, 6);
 
          CPPUNIT_ASSERT( stream_a.read(buffer, 3) == 3 );
-         CPPUNIT_ASSERT( stream_a.read(&buffer[3], 5) == 2 );
+         CPPUNIT_ASSERT( stream_a.read(&buffer[3], 2) == 2 );
          CPPUNIT_ASSERT( std::string((const char*)buffer) == std::string("teste") );
+         
+         stream_a.close();
          
          delete buffer;
       }

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/network/SocketFactoryTest.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/network/SocketFactoryTest.h?view=diff&rev=474558&r1=474557&r2=474558
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/network/SocketFactoryTest.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/network/SocketFactoryTest.h Mon Nov 13 15:08:51 2006
@@ -86,7 +86,7 @@
                network::Socket* socket = server.accept();
                server.close();
                
-               socket->setSoTimeout( 10 );
+               //socket->setSoTimeout( 10 );
                socket->setSoLinger( false );
 
                synchronized(&mutex)
@@ -99,25 +99,19 @@
                while( !done && socket != NULL ){
                                     
                   io::InputStream* stream = socket->getInputStream();
-                  if( stream->available() > 0 ){
+                  memset( buf, 0, 1000 );
+                  try{
+                      stream->read( buf, 1000 );
+                    
+                      lastMessage = (char*)buf;
+                    
+                      if( strcmp( (char*)buf, "reply" ) == 0 ){
+                          io::OutputStream* output = socket->getOutputStream();
+                          output->write( (unsigned char*)"hello", strlen("hello" ) );
+                      }
                      
-                     memset( buf, 0, 1000 );
-                     try{
-                        stream->read( buf, 1000 );
-                        
-                        lastMessage = (char*)buf;
-                        
-                        if( strcmp( (char*)buf, "reply" ) == 0 ){
-                           io::OutputStream* output = socket->getOutputStream();
-                           output->write( (unsigned char*)"hello", strlen("hello" ) );
-                        }
-                        
-                     }catch( io::IOException& ex ){
-                        done = true;
-                     }                                      
-                     
-                  }else{
-                     Thread::sleep( 10 );
+                  }catch( io::IOException& ex ){
+                      done = true;
                   }
                }
                
@@ -132,7 +126,7 @@
                }
                
             }catch( io::IOException& ex ){
-               printf("%s\n", ex.getMessage() );
+               printf("%s\n", ex.getMessage().c_str() );
                CPPUNIT_ASSERT( false );
             }catch( ... ){
                CPPUNIT_ASSERT( false );
@@ -163,7 +157,7 @@
             
             properties.setProperty("uri", ostream.str());
             properties.setProperty("soLinger", "false");
-            properties.setProperty("soTimeout", "5");
+            //properties.setProperty("soTimeout", "5");
 
             Socket* client = SocketFactory::createSocket(properties);
 

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/network/SocketTest.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/network/SocketTest.h?view=diff&rev=474558&r1=474557&r2=474558
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/network/SocketTest.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/network/SocketTest.h Mon Nov 13 15:08:51 2006
@@ -88,7 +88,7 @@
 					Socket* socket = server.accept();
 					server.close();
 					
-					socket->setSoTimeout( 10 );
+					//socket->setSoTimeout( 10 );
 					socket->setSoLinger( false );
 					numClients++;
                
@@ -100,30 +100,25 @@
 					while( !done && socket != NULL ){
 												
 						io::InputStream* stream = socket->getInputStream();
-						if( stream->available() > 0 ){
 							
-							memset( buf, 0, 1000 );
-							try{
-								stream->read( buf, 1000 );
-								
-								lastMessage = (char*)buf;
-								
-								if( strcmp( (char*)buf, "reply" ) == 0 ){
-									io::OutputStream* output = socket->getOutputStream();
-									output->write( (unsigned char*)"hello", strlen("hello" ) );
-
-                                      synchronized(&mutex)
-                                      {
-                                         mutex.notifyAll();
-                                      }
-								}
-								
-							}catch( io::IOException& ex ){
-								done = true;
-							}													
+						memset( buf, 0, 1000 );
+						try{
+							stream->read( buf, 1000 );
+							
+							lastMessage = (char*)buf;
+							
+							if( strcmp( (char*)buf, "reply" ) == 0 ){
+								io::OutputStream* output = socket->getOutputStream();
+								output->write( (unsigned char*)"hello", strlen("hello" ) );
+
+                                  synchronized(&mutex)
+                                  {
+                                     mutex.notifyAll();
+                                  }
+							}
 							
-						}else{
-							Thread::sleep( 10 );
+						}catch( io::IOException& ex ){
+							done = true;
 						}
 					}
 					
@@ -164,7 +159,7 @@
 				TcpSocket client;				
 				
 				client.connect("127.0.0.1", port);
-				client.setSoTimeout( 5 );
+				//client.setSoTimeout( 5 );
 				client.setSoLinger( false );
 				
                 synchronized(&serverThread.mutex)
@@ -209,7 +204,7 @@
 				TcpSocket client;				
 				
 				client.connect("127.0.0.1", port);
-				client.setSoTimeout( 5 );
+				//client.setSoTimeout( 5 );
 				client.setSoLinger( false );
 								
                 synchronized(&serverThread.mutex)
@@ -263,7 +258,7 @@
 				TcpSocket client;				
 				
 				client.connect("127.0.0.1", port);
-				client.setSoTimeout( 5 );
+				//client.setSoTimeout( 5 );
 				client.setSoLinger(false);
 				
                 synchronized(&serverThread.mutex)



Mime
View raw message