db-derby-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r405037 [1/3] - in /db/derby/code/trunk/java: drda/org/apache/derby/impl/drda/ engine/org/apache/derby/iapi/reference/ testing/org/apache/derbyTesting/functionTests/master/ testing/org/apache/derbyTesting/functionTests/master/DerbyNet/ test...
Date Mon, 08 May 2006 12:36:41 GMT
Author: tmnk
Date: Mon May  8 05:36:39 2006
New Revision: 405037

URL: http://svn.apache.org/viewcvs?rev=405037&view=rev
Log:
- DERBY-326 Improve streaming of large objects for network server and client - Patch by Tomohito
Nakayama (tomonaka@basil.ocn.ne.jp)

Added:
    db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/ReEncodedInputStream.java   (with
props)
    db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/master/DerbyNet/SuicideOfStreaming.out
  (with props)
    db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/master/DerbyNetClient/SuicideOfStreaming.out
  (with props)
    db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/master/OutBufferedStream.out
  (with props)
    db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/tests/derbynet/OutBufferedStream.java
  (with props)
    db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/tests/derbynet/OutBufferedStream_app.properties
  (with props)
    db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/tests/derbynet/SuicideOfStreaming.java
  (with props)
    db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/tests/derbynet/SuicideOfStreaming_app.properties
  (with props)
    db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/tests/derbynet/SwitchablePrintStream.java
  (with props)
Modified:
    db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DDMWriter.java
    db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DRDAConnThread.java
    db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/EXTDTAInputStream.java
    db/derby/code/trunk/java/engine/org/apache/derby/iapi/reference/Property.java
    db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/master/DerbyNet/blobclob4BLOB.out
    db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/master/DerbyNetClient/blobclob4BLOB.out
    db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/suites/derbynetmats.runall
    db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/tests/derbynet/dataSourcePermissions_net.java

Modified: db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DDMWriter.java
URL: http://svn.apache.org/viewcvs/db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DDMWriter.java?rev=405037&r1=405036&r2=405037&view=diff
==============================================================================
--- db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DDMWriter.java (original)
+++ db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DDMWriter.java Mon May  8 05:36:39 2006
@@ -21,12 +21,19 @@
 package org.apache.derby.impl.drda;
 
 import java.io.OutputStream;
+import java.io.InputStream;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import org.apache.derby.iapi.services.sanity.SanityManager;
 import java.sql.SQLException;
 import java.sql.DataTruncation;
 import java.math.BigDecimal;
 import org.apache.derby.iapi.error.ExceptionSeverity;
 import java.util.Arrays;
+import org.apache.derby.iapi.reference.Property;
+import org.apache.derby.iapi.services.property.PropertyUtil;
+
+import java.io.IOException;
 
 /**
 	The DDMWriter is used to write DRDA protocol.   The DRDA Protocol is
@@ -662,67 +669,76 @@
 	}
 
 
-	// TODO: Rewrite writeScalarStream to avoid passing a length.
-	// The length is never written and not required by the DRDA spec.
-	// Also looks like on IOException we just pad out the stream instead
-	// of actually sending an exception.  Similar code is in client, so 
-	// should be fixed in both places.
-	protected int  writeScalarStream (boolean chainedWithSameCorrelator,
+    
+    protected void writeScalarStream (boolean chainedWithSameCorrelator,
 									  int codePoint,
-									  int length,
-									  java.io.InputStream in,
+				      EXTDTAInputStream in,
 									  boolean writeNullByte) 
 		throws DRDAProtocolException
 	{
 
+	    
+
 		// Stream equivalent of "beginDss"...
-		int leftToRead = length;
-		int bytesToRead = prepScalarStream (chainedWithSameCorrelator,
+	    int spareDssLength = prepScalarStream( chainedWithSameCorrelator,
 											codePoint,
-											writeNullByte,
-											leftToRead);
-
-		if (length == 0)
-			return 0;
-
+											writeNullByte);
+	    
 		// write the data
 		int bytesRead = 0;
 		int totalBytesRead = 0;
-		do {
-			do {
+
 				try {
-					bytesRead = in.read (bytes, offset, bytesToRead);
-					totalBytesRead += bytesRead;
-				}
-				catch (java.io.IOException e) {
-					padScalarStreamForError (leftToRead, bytesToRead);
-					return totalBytesRead;
-				}
-				if (bytesRead == -1) {
-					padScalarStreamForError (leftToRead, bytesToRead);
-					return totalBytesRead;
+				    
+		OutputStream out = 
+		    placeLayerBStreamingBuffer( agent.getOutputStream() );
+		
+		boolean isLastSegment = false;
+		
+		while( !isLastSegment ){
+		    
+		    int spareBufferLength = bytes.length - offset;
+		    
+		    if( SanityManager.DEBUG ){
+		
+			if( PropertyUtil.getSystemProperty("derby.debug.suicideOfLayerBStreaming") != null )
+			    throw new IOException();
 				}
-				else {
-					bytesToRead -= bytesRead;
+		    
+		    bytesRead = in.read(bytes,
+					offset,
+					Math.min(spareDssLength,
+						 spareBufferLength));
+		    
+					totalBytesRead += bytesRead;
 					offset += bytesRead;
-					leftToRead -= bytesRead;
-				}
-			} while (bytesToRead > 0);
+		    spareDssLength -= bytesRead;
+		    spareBufferLength -= bytesRead;
+
+		    isLastSegment = peekStream(in) < 0;
+		    
+		    if(isLastSegment || 
+		       spareDssLength == 0){
+			
+			flushScalarStreamSegment (isLastSegment, 
+						  out);
+			
+			if( ! isLastSegment )
+			    spareDssLength = DssConstants.MAX_DSS_LENGTH - 2;
 
-			bytesToRead = flushScalarStreamSegment (leftToRead, bytesToRead);
-		} while (leftToRead > 0);
-		// check to make sure that the specified length wasn't too small
-		try {
-			if (in.read() != -1) {
-				totalBytesRead += 1;
 			}
+		    
 		}
-		catch (java.io.IOException e) {
-			// Encountered error in stream length verification for 
-			// InputStream, parameter #" + parameterIndex + ".  
-			// Don't think we need to error for this condition
+		
+		out.flush();
+		
+	    }catch(IOException e){
+		agent.markCommunicationsFailure ("DDMWriter.writeScalarStream()",
+						 "",
+						 e.getMessage(),
+						 "*");
 		}
-		return totalBytesRead;
+				
 	}
 	
 	/**
@@ -749,24 +765,31 @@
 	}
 
 
-  // prepScalarStream does the following prep for writing stream data:
-  // 1.  Flushes an existing DSS segment, if necessary
-  // 2.  Determines if extended length bytes are needed
-  // 3.  Creates a new DSS/DDM header and a null byte indicator, if applicable
-  protected int prepScalarStream  (boolean chainedWithSameCorrelator,
+    /**
+     * prepScalarStream does the following prep for writing stream data:
+     * 1.  Flushes an existing DSS segment, if necessary
+     * 2.  Determines if extended length bytes are needed
+     * 3.  Creates a new DSS/DDM header and a null byte indicator, if applicable
+     *
+     * If value of length was less than 0, this method processes streaming as Layer B Streaming.
+     * cf. page 315 of specification of DRDA, Version 3, Volume 3 
+     *
+     */
+  private int prepScalarStream( boolean chainedWithSameCorrelator,
                                    int codePoint,
-                                   boolean writeNullByte,
-                                   int leftToRead) throws DRDAProtocolException
+                                   boolean writeNullByte) throws DRDAProtocolException
   {
-    int extendedLengthByteCount;
 
-    int nullIndicatorSize = 0;
-    if (writeNullByte) 
-		nullIndicatorSize = 1;
-	extendedLengthByteCount = calculateExtendedLengthByteCount (leftToRead + 4 + nullIndicatorSize);
+      ensureLength( DEFAULT_BUFFER_SIZE - offset );
+      
+      final int nullIndicatorSize = writeNullByte ? 1:0;
+
+    
+      // flush the existing DSS segment ,
+      // if this stream will not fit in the send buffer or 
+      // length of this stream is unknown.
+      // Here, 10 stands for sum of headers of layer A and B.
 
-    // flush the existing DSS segment if this stream will not fit in the send buffer
-    if (10 + extendedLengthByteCount + nullIndicatorSize + leftToRead + offset > DssConstants.MAX_DSS_LENGTH)
{
       try {
 	    // The existing DSS segment was finalized by endDss; all
 	    // we have to do is send it across the wire.
@@ -777,39 +800,22 @@
                                               "OutputStream.flush()",
                                               e.getMessage(),"*");
       }
-    }
 
     // buildStreamDss should not call ensure length.
 	beginDss(chainedWithSameCorrelator, DssConstants.GDSFMT_OBJDSS);
 
-    if (extendedLengthByteCount > 0) {
-      // method should never ensure length
-      writeLengthCodePoint (0x8004 + extendedLengthByteCount, codePoint);
-
-      if (writeNullByte)
-        writeExtendedLengthBytes (extendedLengthByteCount, leftToRead + 1);
-      else
-        writeExtendedLengthBytes (extendedLengthByteCount, leftToRead);
-    }
-    else {
-      if (writeNullByte)
-        writeLengthCodePoint (leftToRead + 4 + 1, codePoint);
-      else
-        writeLengthCodePoint (leftToRead + 4, codePoint);
-    }
+      writeLengthCodePoint(0x8004,codePoint);
+
 
     // write the null byte, if necessary
     if (writeNullByte)
       writeByte(0x0);
 
-    int bytesToRead;
+      //Here, 6 stands for header of layer A and 
+      //4 stands for header of layer B.
+      return DssConstants.MAX_DSS_LENGTH - 6 - 4 - nullIndicatorSize;
 
-    if (writeNullByte)
-      bytesToRead = Math.min (leftToRead, DssConstants.MAX_DSS_LENGTH - 6 - 4 - 1 - extendedLengthByteCount);
-    else
-      bytesToRead = Math.min (leftToRead, DssConstants.MAX_DSS_LENGTH - 6 - 4 - extendedLengthByteCount);
 
-    return bytesToRead;
   }
 
 
@@ -823,69 +829,44 @@
 
 	// Writes out a scalar stream DSS segment, along with DSS continuation
 	// headers if necessary.
-	protected int flushScalarStreamSegment (int leftToRead,
-											int bytesToRead)
+	private void flushScalarStreamSegment ( boolean lastSegment,
+					        OutputStream out)
 		throws DRDAProtocolException
 	{
-		int newBytesToRead = bytesToRead;
 
 		// either at end of data, end of dss segment, or both.
-		if (leftToRead != 0) {
-		// 32k segment filled and not at end of data.
+	    if (! lastSegment) {
 
-			if ((Math.min (2 + leftToRead, 32767)) > (bytes.length - offset)) {
+		// 32k segment filled and not at end of data.
 				try {
 				// Mark current DSS as continued, set its chaining state,
 				// then send the data across.
 					markDssAsContinued(true); 	// true => for lobs
-					sendBytes (agent.getOutputStream());
-				}
-				catch (java.io.IOException ioe) {
+					sendBytes (out,
+						   false);
+			    
+			}catch (java.io.IOException ioe) {
 					agent.markCommunicationsFailure ("DDMWriter.flushScalarStreamSegment()",
                                                "",
                                                ioe.getMessage(),
                                                "*");
 				}
-			}
-			else {
-			// DSS is full, but we still have space in the buffer.  So
-			// end the DSS, then start the next DSS right after it.
-				endDss(false);		// false => don't finalize length.
-			}
+
 
 			// Prepare a DSS continuation header for next DSS.
 			dssLengthLocation = offset;
 			bytes[offset++] = (byte) (0xff);
 			bytes[offset++] = (byte) (0xff);
-			newBytesToRead = Math.min (leftToRead,32765);
 			isContinuationDss = true;
-  		}
-		else {
+	    }else{
 		// we're done writing the data, so end the DSS.
 			endDss();
-		}
-
-		return newBytesToRead;
 
 	}
 
-  // the offset must not be updated when an error is encountered
-  // note valid data may be overwritten
-  protected void padScalarStreamForError (int leftToRead, int bytesToRead) throws DRDAProtocolException
-  {
-    do {
-      do {
-        bytes[offset++] = (byte)(0x0); // use 0x0 as the padding byte
-        bytesToRead--;
-        leftToRead--;
-      } while (bytesToRead > 0);
-
-      bytesToRead = flushScalarStreamSegment (leftToRead, bytesToRead);
-    } while(leftToRead > 0);
   }
 
 
-
 	private void writeExtendedLengthBytes (int extendedLengthByteCount, long length)
 	{
 	int shiftSize = (extendedLengthByteCount -1) * 8;
@@ -1787,14 +1768,25 @@
 
 	}
 
+    
+    private void sendBytes (java.io.OutputStream socketOutputStream) 
+	throws java.io.IOException{
+	
+	sendBytes(socketOutputStream,
+		  true);
+	
+    }
+    
 
-
-  private void sendBytes (java.io.OutputStream socketOutputStream) throws java.io.IOException
+  private void sendBytes (java.io.OutputStream socketOutputStream,
+			  boolean flashStream ) 
+      throws java.io.IOException
   {
 	resetChainState();
     try {
       socketOutputStream.write (bytes, 0, offset);
-      socketOutputStream.flush();
+      if(flashStream)
+	  socketOutputStream.flush();
     }
     finally {
 		if ((dssTrace != null) && dssTrace.isComBufferTraceOn()) {
@@ -1965,5 +1957,36 @@
 
 	}
 
+	
+    private static int peekStream(InputStream in) throws IOException{
+	    
+	in.mark(1);
+
+	try{
+	    return in.read();
+	    
+	}finally{
+	    in.reset();
+	    
+	}
+    }
+
+    
+    private static int getLayerBStreamingBufferSize(){
+	return PropertyUtil.getSystemInt( Property.DRDA_PROP_STREAMOUTBUFFERSIZE , 0 );
+    }
+    
+    
+    private static OutputStream placeLayerBStreamingBuffer(OutputStream original){
+	
+	int size = getLayerBStreamingBufferSize();
+	
+	if(size < 1)
+	    return original;
+	else
+	    return new BufferedOutputStream( original, size );
+
+    }
+    
 }
 

Modified: db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DRDAConnThread.java
URL: http://svn.apache.org/viewcvs/db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DRDAConnThread.java?rev=405037&r1=405036&r2=405037&view=diff
==============================================================================
--- db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DRDAConnThread.java (original)
+++ db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/DRDAConnThread.java Mon May  8 05:36:39 2006
@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.math.BigDecimal;
 import java.sql.CallableStatement;
@@ -6958,12 +6959,21 @@
 					break;
 				case DRDAConstants.DRDA_TYPE_NLOBBYTES:
 				case DRDAConstants.DRDA_TYPE_NLOBCMIXED:
+				    
 					// do not send EXTDTA for lob of length 0, beetle 5967
-					valLength = ((EXTDTAInputStream) val).length();
-					if (valLength > 0)
+				    if( ! ((EXTDTAInputStream) val).isEmptyStream() ){
 						stmt.addExtDtaObject(val, index);
-					writer.writeExtendedLength (valLength);
+				    
+					//indicate externalized and size is unknown.
+					writer.writeExtendedLength(0x8000);
+					
+				    }else{
+					writer.writeExtendedLength(0);
+					
+				    }
+				    
 					break;
+				    
 				case  DRDAConstants.DRDA_TYPE_NFIXBYTE:
 					writer.writeBytes((byte[]) val);
 					break;
@@ -7539,29 +7549,17 @@
 		Object o  = extdtaValues.get(i);
         if (o instanceof EXTDTAInputStream) {
 			EXTDTAInputStream stream = (EXTDTAInputStream) o;
-			long lobLength = stream.length();
+			try{
 			writer.writeScalarStream (chainedWithSameCorrelator,
 									  CodePoint.EXTDTA,
-									  (int) Math.min(lobLength,
-													 Integer.MAX_VALUE),
 									  stream,
 									  writeNullByte);
 			
-			try {
+			}finally{
 				// close the stream when done
-				if (stream != null)
-					stream.close();
-			} catch (IOException e) {
-				Util.javaException(e);
-			}
+			    closeStream(stream);
         }
-		else if (o instanceof  byte[]) {
-			byte[] b = (byte []) o;
-			writer.writeScalarStream (chainedWithSameCorrelator,
-									  CodePoint.EXTDTA,
-									  (int) b.length,
-									  new ByteArrayInputStream(b),
-									  writeNullByte);
+			
 		}
 	}
 	// reset extdtaValues after sending
@@ -7659,4 +7657,33 @@
 
 	}
 
+    
+    private static int peekStream(EXTDTAInputStream is) throws IOException{
+	
+	is.mark(1);
+
+	try{
+	    return is.read();
+	    
+	}finally{
+	    is.reset();
+	}
+	
+    }
+    
+    
+    private static void closeStream(InputStream stream){
+	
+	try{
+	    if (stream != null)
+		stream.close();
+	    
+	} catch (IOException e) {
+	    Util.javaException(e);
+	    
+	}
+	
+    }
+    
+    
 }

Modified: db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/EXTDTAInputStream.java
URL: http://svn.apache.org/viewcvs/db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/EXTDTAInputStream.java?rev=405037&r1=405036&r2=405037&view=diff
==============================================================================
--- db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/EXTDTAInputStream.java (original)
+++ db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/EXTDTAInputStream.java Mon May  8 05:36:39 2006
@@ -22,9 +22,14 @@
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.BufferedInputStream;
 import java.sql.ResultSet;
+import java.sql.Blob;
+import java.sql.Clob;
 import java.sql.SQLException;
 
+import java.io.UnsupportedEncodingException;
+
 import org.apache.derby.iapi.reference.DRDAConstants;
 import org.apache.derby.iapi.services.sanity.SanityManager;
 import org.apache.derby.impl.jdbc.Util;
@@ -42,28 +47,35 @@
  */
 class EXTDTAInputStream extends InputStream {
 
-	long dataLength = 0; // length of the stream;
-
-	InputStream binaryInputStream = null;
+    private InputStream binaryInputStream = null;
 
-	int columnNumber;
-
-	ResultSet dataResultSet = null;
+    private boolean isEmptyStream;
 	
+    private ResultSet dataResultSet = null;
+    private Blob blob = null;
+    private Clob clob = null;
 	
 	/**
-	 * @param dataLength
 	 * @param binaryInputStream
 	 */
-	private EXTDTAInputStream( int dataLength, InputStream binaryInputStream) {
+	private EXTDTAInputStream(ResultSet rs,
+				  int columnNumber,
+				  int ndrdaType) 
+	    throws SQLException, IOException
+    {
+	
+	    this.dataResultSet = rs;
+	    this.isEmptyStream = ! initInputStream(rs,
+						   columnNumber,
+						   ndrdaType);
 		
-		this.dataLength = dataLength;
-		this.binaryInputStream = binaryInputStream;
 	}
 
+    
+    
 	/**
 	 * Retrieve stream from the ResultSet and column specified.  Create an
-	 * input stream and length for the large object being retrieved. Do not hold
+	 * input stream for the large object being retrieved. Do not hold
 	 * locks until end of transaction. DERBY-255.
 	 * 
 	 * 
@@ -87,52 +99,17 @@
 	 */
 	public static EXTDTAInputStream getEXTDTAStream(ResultSet rs, int column, int drdaType)

 			throws SQLException {
-		
-		EXTDTAInputStream extdtaStream = null;
-		int length = 0;
-		byte[] bytes = null;
-		
+ 	    try{
 		int ndrdaType = drdaType | 1; //nullable drdaType
-		// BLOBS
-		if (ndrdaType == DRDAConstants.DRDA_TYPE_NLOBBYTES) 
-		{
-			//TODO: Change to just use rs.getBinaryStream() by 
-			// eliminating the need for a length parameter in
-			//DDMWriter.writeScalarStream and therefore eliminating the need for dataLength in this class
-			bytes = rs.getBytes(column);
 			
-		}
-		// CLOBS
-		else if (ndrdaType ==  DRDAConstants.DRDA_TYPE_NLOBCMIXED)
-		{	
-			//TODO: Change to use getCharacterStream and change the read method
-			// to stream the data after length is no longer needed in DDMWRiter.writeScalarStream
-			String s  = rs.getString(column);
-			try {
-				if (s != null)
-					bytes = s.getBytes(NetworkServerControlImpl.DEFAULT_ENCODING);
-			}
-			catch (java.io.UnsupportedEncodingException e) {
-				throw new SQLException (e.getMessage());
-			}
-		}
-		else
-		{
-			if (SanityManager.DEBUG)
-			{
-			SanityManager.THROWASSERT("DRDAType: " + drdaType +
-						" not valid EXTDTA object type");
-			}
-		}
+		return new EXTDTAInputStream(rs,
+					     column,
+					     ndrdaType);
 		
-		if (bytes != null)
-		{
-			length = bytes.length;
-			InputStream is = new ByteArrayInputStream(bytes);
-			extdtaStream =  new EXTDTAInputStream(length, is);
+ 	    }catch(IOException e){
+ 		throw new SQLException(e.getMessage());
 		}
 		
-		return extdtaStream;
 	}
 
 	
@@ -170,18 +147,6 @@
 	}
 	
 	
-	
-	/**
-	 * Return the length of the binary stream which was calculated when
-	 * EXTDTAObject was created.
-	 * 
-	 * @return the length of the stream once converted to an InputStream
-	 */
-	public long length() throws SQLException {
-		return dataLength;
-		
-	}
-
 	/**
 	 * 
 	 * 
@@ -206,8 +171,19 @@
 	 * @see java.io.InputStream#close()
 	 */
 	public void close() throws IOException {
+	    
+	    try{
 		if (binaryInputStream != null)
 			binaryInputStream.close();	
+		binaryInputStream = null;
+
+	    }finally{
+		
+		blob = null;
+		clob = null;
+		dataResultSet = null;
+	    }
+	    
 	}
 
 	/**
@@ -280,6 +256,142 @@
 	 */
 	public long skip(long arg0) throws IOException {
 		return binaryInputStream.skip(arg0);
+	}
+
+
+    protected boolean isEmptyStream(){
+	return isEmptyStream;
+    }
+    
+    
+    /**
+     * This method takes information of ResultSet and 
+     * initialize binaryInputStream variable of this object with not empty stream and return true.
+     * If the stream was empty, this method remain binaryInputStream null and return false.
+     *
+     * @param rs        ResultSet object to get stream from.
+     * @param column    index number of column in ResultSet to get stream.
+     * @param ndrdaType describe type column to get stream.
+     *
+     * @return          true if the stream was not empty, false if the stream was empty.
+     *
+     */
+    private boolean initInputStream(ResultSet rs,
+				    int column,
+				    int ndrdaType)
+	throws SQLException,
+	       IOException
+    {
+
+	InputStream is = null;
+	try{
+	    // BLOBS
+	    if (ndrdaType == DRDAConstants.DRDA_TYPE_NLOBBYTES) 
+		{
+		    blob = rs.getBlob(column);
+		    if(blob == null){
+			return false;
+		    }
+		    
+		    is = blob.getBinaryStream();
+		    
+		}
+	    // CLOBS
+	    else if (ndrdaType ==  DRDAConstants.DRDA_TYPE_NLOBCMIXED)
+		{	
+		    try {
+			clob = rs.getClob(column);
+			
+			if(clob == null){
+			    return false;
+			}
+
+			is = new ReEncodedInputStream(clob.getCharacterStream());
+			
+		    }catch (java.io.UnsupportedEncodingException e) {
+			throw new SQLException (e.getMessage());
+			
+		    }catch (IOException e){
+			throw new SQLException (e.getMessage());
+			
+		    }
+		    
+		}
+	    else
+		{
+		    if (SanityManager.DEBUG)
+			{
+			    SanityManager.THROWASSERT("NDRDAType: " + ndrdaType +
+						      " not valid EXTDTA object type");
+			}
+		}
+	    
+	    boolean exist = is.read() > -1;
+	    
+	    is.close();
+	    is = null;
+	    
+	    if(exist){
+		openInputStreamAgain();
+	    }
+
+	    return exist;
+	    
+	}catch(IllegalStateException e){
+	    throw Util.javaException(e);
+
+	}finally{
+	    if(is != null)
+		is.close();
+	    
+	}
+	
+    }
+    
+    
+    /**
+     *
+     * This method is called from initInputStream and 
+     * opens inputstream again to stream actually.
+     *
+     */
+    private void openInputStreamAgain() throws IllegalStateException,SQLException {
+	
+	if(this.binaryInputStream != null){
+	    return;
+	}
+		
+	InputStream is = null;
+	try{
+	    
+	    if(SanityManager.DEBUG){
+		SanityManager.ASSERT( ( blob != null && clob == null ) ||
+				      ( clob != null && blob == null ),
+				      "One of blob or clob must be non-null.");
+	    }
+
+	    if(blob != null){
+		is = blob.getBinaryStream();
+		
+	    }else if(clob != null){
+		is = new ReEncodedInputStream(clob.getCharacterStream());
+	    }
+	    
+	}catch(IOException e){
+	    throw new IllegalStateException(e.getMessage());
+	}
+	
+	if(! is.markSupported() ){
+	    is = new BufferedInputStream(is);
+	}
+
+	this.binaryInputStream = is;
+
+    }
+    
+    
+    protected void finalize() throws Throwable{
+	close();
 	}
 
 

Added: db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/ReEncodedInputStream.java
URL: http://svn.apache.org/viewcvs/db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/ReEncodedInputStream.java?rev=405037&view=auto
==============================================================================
--- db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/ReEncodedInputStream.java (added)
+++ db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/ReEncodedInputStream.java Mon May  8 05:36:39 2006
@@ -0,0 +1,170 @@
+/*
+ 
+Derby - Class org.apache.derby.impl.drda.ReEncodedInputStream
+
+Copyright 2002, 2004 The Apache Software Foundation or its licensors, as applicable.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+*/
+package org.apache.derby.impl.drda;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.io.OutputStreamWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.ByteArrayInputStream;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.derby.iapi.services.sanity.SanityManager;
+
+/**
+ *
+ * ReEncodedInputStream passes
+ * stream from Reader, which is stream of decoded style, 
+ * to user of this subclass of InputStream, which is stream of encoded style.
+ *
+ * The encoding of stream passed to user is limited to UTF8.
+ *
+ * This class will be used to pass stream, which is served as a Reader,
+ * as a InputStream of a arbitrary encoding.
+ *
+ */
+public class ReEncodedInputStream extends InputStream {
+
+    private static final int BUFFERED_CHAR_LEN = 1024;
+
+    private Reader reader_;
+    private char[] decodedBuffer_;
+    
+    private OutputStreamWriter encodedStreamWriter_;
+    private PublicBufferOutputStream encodedOutputStream_;
+    
+    private ByteArrayInputStream encodedInputStream_;
+    
+    public ReEncodedInputStream(Reader reader) 
+	throws IOException {
+	
+	reader_ = reader;
+	decodedBuffer_ = new char[BUFFERED_CHAR_LEN];
+
+	encodedOutputStream_ = new PublicBufferOutputStream( BUFFERED_CHAR_LEN * 3 );
+	encodedStreamWriter_ = new OutputStreamWriter(encodedOutputStream_,"UTF8");
+	
+	encodedInputStream_ = reEncode(reader_);
+	
+    }
+
+
+    private ByteArrayInputStream reEncode(Reader reader) 
+	throws IOException
+    {
+	
+	int count;
+	if(( count = reader.read(decodedBuffer_, 0, BUFFERED_CHAR_LEN )) < 1 ){
+	    return null;
+	}
+	
+	encodedOutputStream_.reset();
+	encodedStreamWriter_.write(decodedBuffer_,0,count);
+	encodedStreamWriter_.flush();
+
+	int encodedLength = encodedOutputStream_.size();
+	
+	return new ByteArrayInputStream(encodedOutputStream_.getBuffer(),
+					0,
+					encodedLength);
+    }
+    
+    
+    public int available() 
+	throws IOException {
+	
+	if(encodedInputStream_ == null){
+	    return 0;
+	}
+
+	return encodedInputStream_.available();
+	
+    }
+    
+
+    public void close() 
+	throws IOException {
+	
+	if(encodedInputStream_ != null ){
+	    encodedInputStream_.close();
+	    encodedInputStream_ = null;
+	}
+
+	if(reader_ != null ){
+	    reader_.close();
+	    reader_ = null;
+	}
+
+	if(encodedStreamWriter_ != null){
+	    encodedStreamWriter_.close();
+	    encodedStreamWriter_ = null;
+	}
+	
+    }
+    
+    
+    public int read() 
+	throws IOException {
+	
+	if(encodedInputStream_ == null){
+	    return -1;
+	}
+	
+	int c = encodedInputStream_.read();
+
+	if(c > -1){
+	    return c;
+	    
+	}else{
+	    encodedInputStream_ = reEncode(reader_);
+	    
+	    if(encodedInputStream_ == null){
+		return -1;
+	    }
+	    
+	    return encodedInputStream_.read();
+
+	}
+	
+    }
+    
+    
+    protected void finalize() throws IOException {
+	close();
+    }
+    
+    
+    static class PublicBufferOutputStream extends ByteArrayOutputStream{
+	
+	PublicBufferOutputStream(int size){
+	    super(size);
+	}
+
+	public byte[] getBuffer(){
+	    return buf;
+	}
+	
+    }
+
+}
+
+

Propchange: db/derby/code/trunk/java/drda/org/apache/derby/impl/drda/ReEncodedInputStream.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: db/derby/code/trunk/java/engine/org/apache/derby/iapi/reference/Property.java
URL: http://svn.apache.org/viewcvs/db/derby/code/trunk/java/engine/org/apache/derby/iapi/reference/Property.java?rev=405037&r1=405036&r2=405037&view=diff
==============================================================================
--- db/derby/code/trunk/java/engine/org/apache/derby/iapi/reference/Property.java (original)
+++ db/derby/code/trunk/java/engine/org/apache/derby/iapi/reference/Property.java Mon May  8 05:36:39 2006
@@ -805,6 +805,14 @@
 	 */
 	public final static String DRDA_PROP_KEEPALIVE = "derby.drda.keepAlive";
 	
+
+    /**
+     * derby.drda.streamOutBufferSize
+     * size of buffer used when stream out for client.
+     *
+     */
+    public final static String DRDA_PROP_STREAMOUTBUFFERSIZE = "derby.drda.streamOutBufferSize";
+
 	/*
 	** Internal properties, mainly used by Monitor.
 	*/

Added: db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/master/DerbyNet/SuicideOfStreaming.out
URL: http://svn.apache.org/viewcvs/db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/master/DerbyNet/SuicideOfStreaming.out?rev=405037&view=auto
==============================================================================
--- db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/master/DerbyNet/SuicideOfStreaming.out (added)
+++ db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/master/DerbyNet/SuicideOfStreaming.out Mon May  8 05:36:39 2006
@@ -0,0 +1 @@
+com.ibm.db2.jcc.c.DisconnectException: A communication error has been detected. Communication protocol being used: Reply.fill(). Communication API being used: InputStream.read(). Location where the error was detected: insufficient data. Communication function detecting the error: *. Protocol specific error codes(s) TCP/IP SOCKETS 

Propchange: db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/master/DerbyNet/SuicideOfStreaming.out
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/master/DerbyNet/blobclob4BLOB.out
URL: http://svn.apache.org/viewcvs/db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/master/DerbyNet/blobclob4BLOB.out?rev=405037&r1=405036&r2=405037&view=diff
==============================================================================
--- db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/master/DerbyNet/blobclob4BLOB.out (original)
+++ db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/master/DerbyNet/blobclob4BLOB.out Mon May  8 05:36:39 2006
@@ -508,7 +508,8 @@
 row 9 is null, skipped
 clobTest91 finished
 START: clobTest92
-clobTest92 finished
+FAIL -- unexpected exception ****************
+SQLSTATE(40XL1): A lock could not be obtained within the time requested
 START: clobTest93
 clobTest92 finished
 START: clobTest94

Added: db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/master/DerbyNetClient/SuicideOfStreaming.out
URL: http://svn.apache.org/viewcvs/db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/master/DerbyNetClient/SuicideOfStreaming.out?rev=405037&view=auto
==============================================================================
--- db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/master/DerbyNetClient/SuicideOfStreaming.out (added)
+++ db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/master/DerbyNetClient/SuicideOfStreaming.out Mon May  8 05:36:39 2006
@@ -0,0 +1,3 @@
+java.sql.SQLException: Insufficient data while reading from the network - expected a minimum of 6 bytes and received only -1 bytes.  The connection has been terminated.
+Caused by: org.apache.derby.client.am.DisconnectException: Insufficient data while reading from the network - expected a minimum of 6 bytes and received only -1 bytes.  The connection has been terminated.
+	... 2 more

Propchange: db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/master/DerbyNetClient/SuicideOfStreaming.out
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/master/DerbyNetClient/blobclob4BLOB.out
URL: http://svn.apache.org/viewcvs/db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/master/DerbyNetClient/blobclob4BLOB.out?rev=405037&r1=405036&r2=405037&view=diff
==============================================================================
--- db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/master/DerbyNetClient/blobclob4BLOB.out (original)
+++ db/derby/code/trunk/java/testing/org/apache/derbyTesting/functionTests/master/DerbyNetClient/blobclob4BLOB.out Mon May  8 05:36:39 2006
@@ -508,7 +508,8 @@
 row 9 is null, skipped
 clobTest91 finished
 START: clobTest92
-clobTest92 finished
+FAIL -- unexpected exception ****************
+SQLSTATE(40XL1): A lock could not be obtained within the time requested
 START: clobTest93
 clobTest92 finished
 START: clobTest94



Mime
View raw message