activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nmitt...@apache.org
Subject svn commit: r396329 - in /incubator/activemq/trunk/cms/activemqcms/src/activemq: io/BufferedInputStream.cpp io/BufferedOutputStream.cpp io/BufferedOutputStream.h io/SocketStream.cpp transport/stomp/StompIO.cpp
Date Sun, 23 Apr 2006 22:08:22 GMT
Author: nmittler
Date: Sun Apr 23 15:08:21 2006
New Revision: 396329

URL: http://svn.apache.org/viewcvs?rev=396329&view=rev
Log:
Fixed handling of bytes message for Jira issue AMQ-685

Modified:
    incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.cpp
    incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp
    incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.h
    incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.cpp
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp

Modified: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.cpp?rev=396329&r1=396328&r2=396329&view=diff
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.cpp (original)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.cpp Sun Apr 23 15:08:21 2006
@@ -86,41 +86,39 @@
 ////////////////////////////////////////////////////////////////////////////////
 int BufferedInputStream::read( char* buffer, 
 	const int bufferSize ) throw (ActiveMQException){
-		
-	int totalRead = 0;
-	
-	// Get the number of bytes that can be copied directly from
-	// the buffer.
-	int bytesToCopy = min( tail-head, bufferSize );
-	
-	// Copy the data to the output buffer.	
-	memcpy( buffer, this->buffer+head, bytesToCopy );
-	
-	// Increment the total bytes read.
-	totalRead += bytesToCopy;
 	
-	// Increment the head position.  If the buffer is now empty,
-	// reset the positions.
-	head += bytesToCopy;
-	if( head == tail ){
-		head = tail = 0;
-	}
-	
-	// If we still haven't filled the output buffer, read a buffer's
+	// If we still haven't filled the output buffer AND there is data
+	// on the input stream to be read, read a buffer's
 	// worth from the stream.
-	if( bytesToCopy < bufferSize ){
-		
-		// Buffer as much data as we can.
-		bufferData();
+	int totalRead = 0;
+	while( totalRead < bufferSize ){		
 		
 		// Get the remaining bytes to copy.
-		bytesToCopy = min( tail-head, (bufferSize-bytesToCopy) );
+		int bytesToCopy = min( tail-head, (bufferSize-totalRead) );
 		
 		// Copy the data to the output buffer.	
 		memcpy( buffer+totalRead, this->buffer+head, bytesToCopy );
 		
 		// Increment the total bytes read.
 		totalRead += bytesToCopy;
+		
+		// Increment the head position.  If the buffer is now empty,
+		// reset the positions and buffer more data.
+		head += bytesToCopy;
+		if( head == tail ){
+			
+			// Reset the buffer indicies.
+			head = tail = 0;
+			
+			// If there is no more data currently available on the 
+			// input stream, stop the loop.
+			if( stream->available() == 0 ){
+				break;
+			}
+			
+			// Buffer as much data as we can.
+			bufferData();
+		}				
 	}
 	
 	// Return the total number of bytes read.

Modified: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp?rev=396329&r1=396328&r2=396329&view=diff
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp (original)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp Sun Apr 23 15:08:21 2006
@@ -66,21 +66,29 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void BufferedOutputStream::flush() throw (ActiveMQException){
+void BufferedOutputStream::emptyBuffer() throw (ActiveMQException){
 	
 	if( head != tail ){
 		stream->write( buffer+head, tail-head );
 	}
 	head = tail = 0;
+}
+		
+////////////////////////////////////////////////////////////////////////////////
+void BufferedOutputStream::flush() throw (ActiveMQException){
 	
+	// Empty the contents of the buffer to the output stream.
+	emptyBuffer();
+	
+	// Flush the output stream.
 	stream->flush();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void BufferedOutputStream::write( const char c ) throw (ActiveMQException){
 	
-	if( tail == bufferSize-1 ){
-		flush();
+	if( tail >= bufferSize ){
+		emptyBuffer();
 	}
 	
 	buffer[tail++] = c;	
@@ -89,13 +97,14 @@
 ////////////////////////////////////////////////////////////////////////////////		
 void BufferedOutputStream::write( const char* buffer, const int len ) 
 	throw (ActiveMQException)
-{
-	
-	int pos = 0;
-	
+{		
 	// Iterate until all the data is written.
-	while( pos < len ){
+	for( int pos=0; pos < len; ){
 		
+		if( tail >= bufferSize ){
+			emptyBuffer();
+		}
+	
 		// Get the number of bytes left to write.
 		int bytesToWrite = min( bufferSize-tail, len-pos );
 		
@@ -106,12 +115,7 @@
 		tail += bytesToWrite;
 		
 		// Decrease the number of bytes to write.
-		pos += bytesToWrite;
-		
-		// If we don't have enough space in the buffer, flush it.
-		if( bytesToWrite < len || tail >= bufferSize ){
-			flush();
-		}		
+		pos += bytesToWrite;	
 	}	
 }
 

Modified: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.h?rev=396329&r1=396328&r2=396329&view=diff
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.h (original)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.h Sun Apr 23 15:08:21 2006
@@ -42,6 +42,11 @@
 	
 		void init( OutputStream* stream, const int bufSize );
 		
+		/**
+       	 * Writes the contents of the buffer to the output stream.
+       	 */
+      	void emptyBuffer() throw (ActiveMQException);
+        
 	private:
 	
 		OutputStream* stream;

Modified: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.cpp?rev=396329&r1=396328&r2=396329&view=diff
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.cpp (original)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.cpp Sun Apr 23 15:08:21 2006
@@ -74,30 +74,84 @@
 
 ////////////////////////////////////////////////////////////////////////////////
 int SocketStream::read( char* buffer, const int bufferSize ) throw (ActiveMQException){
-	
-	int len = recv( socket->getHandle(), buffer, bufferSize, 0 );
-	if( len < 0 ){
-        socket->close();
-		char buf[500];
-		strerror_r( errno, buf, 500 );
-		throw IOException( string("stomp::io::SocketStream::read(char*,int) - ") + buf );
-	}
-	
-    /*printf("SocketStream:read():");
-    for( int ix=0; ix<len; ++ix ){
-        if( buffer[ix] > 20 )
-            printf("%c", buffer[ix] );
-        else
-            printf("[%d]", buffer[ix] );
-    }
-    printf("\n");*/
     
-	return len;
+    int bytesAvailable = available();
+    
+    while( true ){
+        
+        int len = ::recv(socket->getHandle(), (char*)buffer, bufferSize, 0);
+        
+        // Check for typical error conditions.
+        if( len < 0 ){
+                        
+            #if defined(unix) && !defined(__CYGWIN__)
+            
+                // If the socket was temporarily unavailable - just try again.
+                if( errno == EAGAIN ){
+                    continue;
+                }
+                
+                // Create the error string.
+                char* errorString = ::strerror(errno);
+                
+            #else
+            
+                // If the socket was temporarily unavailable - just try again.
+                int errorCode = ::WSAGetLastError();
+                if( errorCode == WSAEWOULDBLOCK ){
+                    continue;
+                }
+                
+                // Create the error string.
+                static const int errorStringSize = 512;
+                char errorString[errorStringSize];
+                memset( errorString, 0, errorStringSize );
+                FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,
+                  0,
+                  errorCode,
+                  0,
+                  errorString,
+                  errorStringSize - 1,
+                  NULL);
+                  
+            #endif
+            
+            // Otherwise, this was a bad error - throw an exception.
+            throw IOException( string("stomp::io::SocketStream::write(char) - ") + errorString );
+        }
+        
+        // No error, but no data - check for a broken socket.
+        if( len == 0 ){
+            
+            // If the poll showed data, but we failed to read any,
+            // the socket is broken.
+            if( bytesAvailable > 0 ){
+                throw IOException( "activemq::io::SocketInputStream::read - The connection is broken" );
+            }
+            
+            // Socket is not broken, just had no data.
+            return 0;
+        }
+        
+        #ifdef SOCKET_IO_DEBUG
+            printf("SocketStream:read(), numbytes:%d -", len);
+            for( int ix=0; ix<len; ++ix ){
+                if( buffer[ix] > 20 )
+                    printf("%c", buffer[ix] );
+                else
+                    printf("[%d]", buffer[ix] );
+            }
+            printf("\n");
+        #endif
+    
+        // Data was read successfully - return the bytes read.
+        return len;
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void SocketStream::write( const char c ) throw (ActiveMQException){
-	
+	    
 	/*if( c > 20 ){
 		printf("%c", c );
 	}
@@ -109,29 +163,41 @@
 		char buf[500];
 		strerror_r( errno, buf, 500 );
 		throw IOException( string("stomp::io::SocketStream::write(char) - ") + buf );
-	}
+	}    
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void SocketStream::write( const char* buffer, const int len ) 
 	throw (ActiveMQException)
 {
-	/*for( int ix=0; ix<len; ++ix ){
-		char c = buffer[ix];
-		if( c > 20 ){
-			printf("%c", c );
-		}
-		else printf("[%d]", c );
-	}*/
+    #ifdef SOCKET_IO_DEBUG
+        printf("SocketStream:write(), numbytes:%d -", len);
+    	for( int ix=0; ix<len; ++ix ){
+    		char c = buffer[ix];
+    		if( c > 20 ){
+    			printf("%c", c );
+    		}
+    		else printf("[%d]", c );
+    	}
+        printf("\n" );
+    #endif
 	
 	int remaining = len;
 	while( remaining > 0 ) {
       	
-      	int length = send( socket->getHandle(), buffer, remaining, MSG_NOSIGNAL );    
 	
+        int flags = 0;
+        #if defined(OSX)
+            flags = SO_NOSIGPIPE;
+        #elif defined( unix )
+            flags = MSG_NOSIGNAL;
+        #endif
+        
+      	int length = send( socket->getHandle(), buffer, remaining, flags );      	
       	if( length < 0 ){
             socket->close();
       		char buf[500];
 			strerror_r( errno, buf, 500 );
+            printf("exception in write\n" );
 			throw IOException( string("stomp::io::SocketStream::write(char*,int) - ") + buf );
       	}
       	

Modified: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp?rev=396329&r1=396328&r2=396329&view=diff
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp (original)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp Sun Apr 23 15:08:21 2006
@@ -63,6 +63,22 @@
 ////////////////////////////////////////////////////////////////////////////////
 int StompIO::readStompBodyLine( char* buf, const int bufLen ) throw (ActiveMQException){
 	
+    int content_length = 0;
+   
+   // Check for the content-length header.  This is optional - if not provided
+   // we stop when we encounter a \0\n.
+   const StompFrame::HeaderInfo* headerInfo = frame.getHeaderInfo(StompFrame::HEADER_CONTENTLENGTH);
+   if( headerInfo != NULL )
+   {
+      const char* lengthProperty = headerInfo->value;
+      char* stopped_string = NULL;
+
+      content_length = strtoul(
+         lengthProperty, 
+         &stopped_string, 
+         10);
+   }
+   
 	int pos = 0;
 	
 	while( pos < bufLen ){
@@ -72,12 +88,12 @@
 		
   		// Increment the position pointer.
   		pos++;
-  		
-  		// If we've reached the end of the body - return.
-  		if( (buf[pos-1]=='\0' && pos==1) ||
-  			(pos >= 2 && buf[pos-2]=='\0' && buf[pos-1] == '\n') ){	 				  	
	
-  			return pos;
-  		}
+        
+        // Are we at the end of the frame?  The end frame pattern is \0\n
+        bool foundFrameEndPattern = (pos >= 2 && buf[pos-2]=='\0' && buf[pos-1] == '\n'); 
+        if( (pos > content_length) && foundFrameEndPattern ){                
            
+            return pos;
+        }
 	}
 	
 	// Reading is not complete.
@@ -220,6 +236,7 @@
     	write( body, frame.getBodyLength() );
    	}
    	write( '\0' );
+   	write( '\n' );
    	
    	// Flush the stream.
    	flush();



Mime
View raw message