geronimo-scm mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gdam...@apache.org
Subject cvs commit: incubator-geronimo/modules/network/src/test/org/apache/geronimo/network/protocol PacketStreamTest.java
Date Mon, 19 Apr 2004 16:29:31 GMT
gdamour     2004/04/19 09:29:31

  Modified:    modules/network/src/java/org/apache/geronimo/network/protocol
                        PacketInputStream.java
               modules/network/src/test/org/apache/geronimo/network/protocol
                        PacketStreamTest.java
  Log:
  o Provide a call-back mechanism when a UpPacket is pushed from
  the adapted Protocol; This way, a client does not have to poll the
  InputStream.
  o Improve available and read in order to take into account the
  internal buffer of UpPackets.
  
  Revision  Changes    Path
  1.3       +53 -7     incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/PacketInputStream.java
  
  Index: PacketInputStream.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/PacketInputStream.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- PacketInputStream.java	20 Mar 2004 20:39:11 -0000	1.2
  +++ PacketInputStream.java	19 Apr 2004 16:29:31 -0000	1.3
  @@ -28,8 +28,17 @@
    */
   public class PacketInputStream extends InputStream {
   
  -    ProtocolBuffer buffer;
  +    /**
  +     * Null AvailableCallBack.
  +     */
  +    private static final AvailableCallBack NULL_CALLBACK =
  +        new AvailableCallBack() {
  +            public void execute() {}
  +        };
  +    
  +    private final ProtocolBuffer buffer;
       private final Protocol up;
  +    private final AvailableCallBack callBack;
       private ByteBuffer currentBuffer;
       private boolean closed;
   
  @@ -39,14 +48,31 @@
       }
   
       public PacketInputStream(Protocol up, short queueSize) {
  +        this(up, queueSize, null);
  +    }
  +    
  +    /**
  +     * Creates an InputStream on top of the provided protocol.
  +     * 
  +     * @param up Protocol.
  +     * @param queueSize Size of the queue used to buffer UpPackets coming from
  +     * up.
  +     * @param aCallBack Callback when an UpPacket is received from up.
  +     */
  +    public PacketInputStream(Protocol up, short queueSize,
  +        AvailableCallBack aCallBack) {
           this.buffer = new ProtocolBuffer(queueSize);
  +        if ( null == aCallBack ) {
  +            this.callBack = NULL_CALLBACK;
  +        } else {
  +            this.callBack = aCallBack;
  +        }
           this.up = up;
           this.currentBuffer = ByteBuffer.allocate(0);
           this.closed = false;
   
           this.up.setUpProtocol(buffer);
           buffer.setDownProtocol(this.up);
  -
       }
   
       public int read() throws IOException {
  @@ -70,7 +96,7 @@
           }
   
           int length = len;
  -        while (length > 0) {
  +        while (length > 0 && 0 < available() ) {
               check();
               int remaining = currentBuffer.remaining();
               int segment = Math.min(remaining, length);
  @@ -78,7 +104,7 @@
               off += segment;
               length -= segment;
           }
  -        return len;
  +        return len - length;
       }
   
       public long skip(long n) throws IOException {
  @@ -100,7 +126,7 @@
       }
   
       public int available() throws IOException {
  -        return currentBuffer.remaining();
  +        return currentBuffer.remaining() + buffer.available();
       }
   
       public void close() throws IOException {
  @@ -132,13 +158,20 @@
   
           BoundedLinkedQueue queue;
           Protocol down;
  +        volatile int available;
   
           ProtocolBuffer(short size) {
               queue = new BoundedLinkedQueue(size);
           }
   
  +        int available() {
  +            return available;
  +        }
  +        
           UpPacket getPacket() throws InterruptedException {
  -            return (UpPacket) queue.take();
  +            UpPacket packet = (UpPacket) queue.take();
  +            available -= packet.getBuffer().remaining();
  +            return packet;
           }
   
           public Protocol getUpProtocol() {
  @@ -176,10 +209,12 @@
   
           public void sendUp(UpPacket packet) throws ProtocolException {
               try {
  +                available += packet.getBuffer().remaining();
                   queue.put(packet);
               } catch (InterruptedException e) {
                   throw new ProtocolException(e);
               }
  +            callBack.execute();
           }
   
           public void sendDown(DownPacket packet) throws ProtocolException {
  @@ -187,4 +222,15 @@
           }
   
       }
  +
  +    /**
  +     * When an UpPacket has been received by the protocol from which this
  +     * instance is reading, the execute method is called.
  +     * <BR>
  +     * It allows reading from the InputStream without having to poll it.
  +     */
  +    public interface AvailableCallBack {
  +        public void execute() throws ProtocolException;
  +    }
  +    
   }
  
  
  
  1.4       +44 -2     incubator-geronimo/modules/network/src/test/org/apache/geronimo/network/protocol/PacketStreamTest.java
  
  Index: PacketStreamTest.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/modules/network/src/test/org/apache/geronimo/network/protocol/PacketStreamTest.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- PacketStreamTest.java	21 Mar 2004 14:27:09 -0000	1.3
  +++ PacketStreamTest.java	19 Apr 2004 16:29:31 -0000	1.4
  @@ -16,10 +16,15 @@
    */
   package org.apache.geronimo.network.protocol;
   
  +import java.io.ByteArrayInputStream;
  +import java.io.ByteArrayOutputStream;
   import java.io.IOException;
  +import java.io.InputStream;
   import java.io.ObjectInputStream;
   import java.io.ObjectOutputStream;
   
  +import org.apache.geronimo.network.protocol.PacketInputStream.AvailableCallBack;
  +
   import EDU.oswego.cs.dl.util.concurrent.Latch;
   import junit.framework.TestCase;
   
  @@ -63,6 +68,25 @@
           assertFalse("Writer thread failed", failed);
       }
   
  +    public void testCallBack() throws Exception {
  +        Thread thread = new Thread(new WriterThread((short) 2), "Test Writer");
  +
  +        startLatch.release();
  +        
  +        DummyCallBack callBack = new DummyCallBack();
  +        PacketInputStream in = new PacketInputStream(eup, (short) 50, callBack);
  +        callBack.setInputStream(in);
  +        thread.start();
  +        thread.join();
  +
  +        InputStream memIn = new ByteArrayInputStream(callBack.memOut.toByteArray());
  +        ObjectInputStream objIn = new ObjectInputStream(memIn);
  +        String msg = (String) objIn.readObject();
  +
  +        assertEquals(msg, "Hello World!");
  +        assertFalse("Writer thread failed", failed);
  +    }
  +
       class WriterThread implements Runnable {
   
           short packetSize;
  @@ -87,10 +111,28 @@
           }
       }
   
  +    private class DummyCallBack implements AvailableCallBack {
  +        private InputStream in;
  +        private ByteArrayOutputStream memOut = new ByteArrayOutputStream();
  +        private void setInputStream(InputStream anIn) {
  +            in = anIn;
  +        }
  +        public void execute() {
  +            try {
  +                int size = in.available();
  +                byte[] buffer = new byte[size];
  +                in.read(buffer);
  +                memOut.write(buffer);
  +            } catch (IOException e) {
  +                ;
  +            }
  +        }
  +    }
  +
       public void setUp() throws Exception {
           eup = new EchoUpProtocol();
           startLatch = new Latch();
           failed = false;
       }
  -
  +    
   }
  
  
  

Mime
View raw message