cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dk...@apache.org
Subject svn commit: r1383455 - /cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java
Date Tue, 11 Sep 2012 15:44:11 GMT
Author: dkulp
Date: Tue Sep 11 15:44:11 2012
New Revision: 1383455

URL: http://svn.apache.org/viewvc?rev=1383455&view=rev
Log:
Some optimizations in the SharedInputBuffer to reduce the number of times the main CXF
worker threads need to wait for content, as well as to remove buffer copies if they do need to
wait.

Modified:
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java

Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java?rev=1383455&r1=1383454&r2=1383455&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java
(original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java
Tue Sep 11 15:44:11 2012
@@ -21,6 +21,7 @@ package org.apache.cxf.transport.http.as
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -48,16 +49,20 @@ public class SharedInputBuffer extends E
 
     private final ReentrantLock lock;
     private final Condition condition;
+    private final int requestInputSize;
 
     private volatile IOControl ioctrl;
     private volatile boolean shutdown;
     private volatile boolean endOfStream;
-
+    
+    private volatile ByteBuffer waitingBuffer;
+    
     public SharedInputBuffer(int buffersize, 
                              final ByteBufferAllocator allocator) {
         super(buffersize, allocator);
         this.lock = new ReentrantLock();
         this.condition = this.lock.newCondition();
+        this.requestInputSize = buffersize * 4 / 3;  
     }
 
     public void reset() {
@@ -87,6 +92,11 @@ public class SharedInputBuffer extends E
             setInputMode();
             int totalRead = 0;
             int bytesRead;
+            if (waitingBuffer != null && this.buffer.position() == 0) {
+                while ((bytesRead = decoder.read(this.waitingBuffer)) > 0) {
+                    totalRead += bytesRead;
+                }
+            }
             while ((bytesRead = decoder.read(this.buffer)) > 0) {
                 totalRead += bytesRead;
             }
@@ -156,7 +166,8 @@ public class SharedInputBuffer extends E
         this.lock.lock();
         try {
             try {
-                while (!super.hasData() && !this.endOfStream) {
+                while ((this.waitingBuffer != null && this.waitingBuffer.position()
== 0) 
+                    && !super.hasData() && !this.endOfStream) {
                     if (this.shutdown) {
                         throw new InterruptedIOException("Input operation aborted");
                     }
@@ -235,7 +246,13 @@ public class SharedInputBuffer extends E
         this.lock.lock();
         try {
             if (!hasData()) {
+                this.waitingBuffer = ByteBuffer.wrap(b, off, len);
                 waitForData();
+                int i = waitingBuffer.position();
+                waitingBuffer = null;
+                if (i > 0) {
+                    return i;
+                }
             }
             if (isEndOfStream()) {
                 return -1;
@@ -246,6 +263,12 @@ public class SharedInputBuffer extends E
                 chunk = this.buffer.remaining();
             }
             this.buffer.get(b, off, chunk);
+            if (this.buffer.remaining() < this.requestInputSize && !this.endOfStream
&& this.ioctrl != null) {
+                //we have a significant amount of space empty in the buffer, we'll turn on

+                //the input so maybe we'll get another chunk by the time the next read happens
+                //and we can then avoid waiting for input
+                this.ioctrl.requestInput();
+            }
             return chunk;
         } finally {
             this.lock.unlock();



Mime
View raw message