geronimo-scm mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject cvs commit: incubator-geronimo/modules/network/src/test/org/apache/geronimo/network/protocol SocketProtocolStressTest.java
Date Thu, 18 Mar 2004 04:05:27 GMT
adc         2004/03/17 20:05:27

  Modified:    modules/network/src/java/org/apache/geronimo/network/protocol/util
                        PacketUtil.java
               modules/network/src/test/org/apache/geronimo/network/protocol
                        SocketProtocolStressTest.java
  Added:       modules/network/src/java/org/apache/geronimo/network/protocol
                        EchoDownProtocol.java EchoUpProtocol.java
                        PacketInputStream.java PacketOutputStream.java
  Log:
  Added packet input/output streams.
  
  Revision  Changes    Path
  1.1                  incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/EchoDownProtocol.java
  
  Index: EchoDownProtocol.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  package org.apache.geronimo.network.protocol;
  
  import java.util.Collections;
  
  
  /**
   * @version $Revision: 1.1 $ $Date: 2004/03/18 04:05:27 $
   */
  public class EchoDownProtocol extends AbstractProtocol {
  
      public void setup() throws ProtocolException {
      }
  
      public void drain() throws ProtocolException {
      }
  
      public void teardown() throws ProtocolException {
      }
  
      public void sendUp(UpPacket packet) throws ProtocolException {
          PlainDownPacket dnPacket = new PlainDownPacket();
  
          dnPacket.setBuffers(Collections.singleton(packet.getBuffer()));
  
          getDownProtocol().sendDown(dnPacket);
      }
  
      public void sendDown(DownPacket packet) throws ProtocolException {
          getDownProtocol().sendDown(packet);
      }
  
  }
  
  
  
  1.1                  incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/EchoUpProtocol.java
  
  Index: EchoUpProtocol.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  package org.apache.geronimo.network.protocol;
  
  import org.apache.geronimo.network.protocol.util.PacketUtil;
  
  
  /**
   * @version $Revision: 1.1 $ $Date: 2004/03/18 04:05:27 $
   */
  public class EchoUpProtocol extends AbstractProtocol {
  
      public void setup() throws ProtocolException {
      }
  
      public void drain() throws ProtocolException {
      }
  
      public void teardown() throws ProtocolException {
      }
  
      public void sendUp(UpPacket packet) throws ProtocolException {
          getUpProtocol().sendUp(packet);
      }
  
      public void sendDown(DownPacket packet) throws ProtocolException {
          UpPacket upPacket = new UpPacket();
          upPacket.setBuffer(PacketUtil.consolidate(packet.getBuffers()));
  
          getUpProtocol().sendUp(upPacket);
      }
  
  }
  
  
  
  1.1                  incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/PacketInputStream.java
  
  Index: PacketInputStream.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  package org.apache.geronimo.network.protocol;
  
  import java.io.IOException;
  import java.io.InputStream;
  import java.nio.ByteBuffer;
  
  import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
  
  
  /**
   * @version $Revision: 1.1 $ $Date: 2004/03/18 04:05:27 $
   */
  public class PacketInputStream extends InputStream {
  
      ProtocolBuffer buffer;
      private final Protocol up;
      private ByteBuffer currentBuffer;
      private boolean closed;
  
  
      public PacketInputStream(Protocol up) {
          this(up, (short) 1);
      }
  
      public PacketInputStream(Protocol up, short queueSize) {
          this.buffer = new ProtocolBuffer(queueSize);
          this.up = up;
          this.currentBuffer = ByteBuffer.allocate(0);
          this.closed = false;
  
          this.up.setUpProtocol(buffer);
          buffer.setDownProtocol(up);
  
      }
  
      public int read() throws IOException {
          if (closed) throw new IOException("Packet InputStream closed");
  
          check();
  
          return currentBuffer.get();
      }
  
      public int read(byte b[]) throws IOException {
          return read(b, 0, b.length);
      }
  
      public int read(byte b[], int off, int len) throws IOException {
          if (b == null) {
              throw new NullPointerException();
          } else if ((off < 0) || (off > b.length) || (len < 0) ||
                  ((off + len) > b.length) || ((off + len) < 0)) {
              throw new IndexOutOfBoundsException();
          } else if (len == 0) {
              return 0;
          }
  
          int length = len;
          while (length > 0) {
              check();
              int remaining = currentBuffer.remaining();
              int segment = Math.min(remaining, length);
              currentBuffer.get(b, off, segment);
              off += segment;
              length -= segment;
          }
          return len;
      }
  
      public long skip(long n) throws IOException {
  
          long length = n;
          while (length > 0) {
              int segment;
              if (length <= Integer.MAX_VALUE) {
                  segment = Math.min(currentBuffer.remaining(), (int) length);
              } else {
                  segment = Math.min(currentBuffer.remaining(), Integer.MAX_VALUE);
              }
              currentBuffer.position(currentBuffer.position() + segment);
              length -= segment;
              check();
          }
  
          return n;
      }
  
      public int available() throws IOException {
          return currentBuffer.remaining();
      }
  
      public void close() throws IOException {
          closed = true;
      }
  
      public synchronized void mark(int readlimit) {
      }
  
      public synchronized void reset() throws IOException {
          throw new IOException("mark/reset not supported");
      }
  
      public boolean markSupported() {
          return false;
      }
  
      private void check() throws IOException {
          if (!currentBuffer.hasRemaining()) {
              try {
                  currentBuffer = buffer.getPacket().getBuffer();
              } catch (InterruptedException e) {
                  throw (IOException) new IOException().initCause(e);
              }
          }
      }
  
      private class ProtocolBuffer implements Protocol {
  
          BoundedLinkedQueue queue;
          Protocol down;
  
          ProtocolBuffer(short size) {
              queue = new BoundedLinkedQueue(size);
          }
  
          UpPacket getPacket() throws InterruptedException {
              return (UpPacket) queue.take();
          }
  
          public Protocol getUpProtocol() {
              throw new NoSuchMethodError("Socket protocol is at the bottom");
          }
  
          public void setUpProtocol(Protocol up) {
              throw new NoSuchMethodError("Socket protocol is at the bottom");
          }
  
          public Protocol getDownProtocol() {
              return down;
          }
  
          public void setDownProtocol(Protocol down) {
              this.down = down;
          }
  
          public void clearLinks() {
              down = null;
          }
  
          public Protocol cloneProtocol() throws CloneNotSupportedException {
              return (Protocol) super.clone();
          }
  
          public void setup() throws ProtocolException {
          }
  
          public void drain() throws ProtocolException {
          }
  
          public void teardown() throws ProtocolException {
          }
  
          public void sendUp(UpPacket packet) throws ProtocolException {
              try {
                  queue.put(packet);
              } catch (InterruptedException e) {
                  throw new ProtocolException(e);
              }
          }
  
          public void sendDown(DownPacket packet) throws ProtocolException {
              throw new UnsupportedOperationException("Method not implemented");
          }
  
      }
  }
  
  
  
  1.1                  incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/PacketOutputStream.java
  
  Index: PacketOutputStream.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  package org.apache.geronimo.network.protocol;
  
  import java.io.IOException;
  import java.io.OutputStream;
  import java.nio.ByteBuffer;
  import java.util.Collections;
  
  
  /**
   * @version $Revision: 1.1 $ $Date: 2004/03/18 04:05:27 $
   */
  public class PacketOutputStream extends OutputStream {
  
      private final Protocol down;
      private ByteBuffer currentBuffer;
      private short packetSize;
      private boolean closed;
  
  
      public PacketOutputStream(Protocol down) {
          this(down, (short) 1024);
      }
  
      public PacketOutputStream(Protocol down, short packetSize) {
          this.down = down;
          this.packetSize = packetSize;
          this.currentBuffer = ByteBuffer.allocate(packetSize);
          this.closed = false;
      }
  
      public short getPacketSize() {
          return packetSize;
      }
  
      public void write(int b) throws IOException {
          if (closed) throw new IOException("PacketOutputStream closed");
  
          currentBuffer.put((byte) b);
          if (!currentBuffer.hasRemaining()) flush();
      }
  
      public void write(byte b[]) throws IOException {
          if (closed) throw new IOException("PacketOutputStream closed");
  
          write(b, 0, b.length);
      }
  
      public void write(byte b[], int off, int len) throws IOException {
          if (closed) throw new IOException("PacketOutputStream closed");
  
          if (b == null) {
              throw new NullPointerException();
          } else if ((off < 0) || (off > b.length) || (len < 0) ||
                  ((off + len) > b.length) || ((off + len) < 0)) {
              throw new IndexOutOfBoundsException();
          } else if (len == 0) {
              return;
          }
          if (currentBuffer.remaining() <= len) {
              while (len > 0) {
                  int remaining = currentBuffer.remaining();
                  int segment = Math.min(remaining, len);
                  currentBuffer.put(b, off, segment);
                  off += segment;
                  len -= remaining;
                  if (!currentBuffer.hasRemaining()) flush();
              }
          } else {
              currentBuffer.put(b, off, len);
          }
      }
  
      public void flush() throws IOException {
          if (closed) throw new IOException("PacketOutputStream closed");
  
          currentBuffer.flip();
  
          if (currentBuffer.remaining() > 0) {
              PlainDownPacket packet = new PlainDownPacket();
              packet.setBuffers(Collections.singleton(currentBuffer));
  
              try {
                  down.sendDown(packet);
              } catch (ProtocolException e) {
                  throw (IOException) new IOException().initCause(e);
              }
          }
  
          currentBuffer = ByteBuffer.allocate(packetSize);
      }
  
      public void close() throws IOException {
          flush();
          closed = true;
      }
  }
  
  
  
  1.3       +13 -1     incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/util/PacketUtil.java
  
  Index: PacketUtil.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/util/PacketUtil.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- PacketUtil.java	10 Mar 2004 09:59:15 -0000	1.2
  +++ PacketUtil.java	18 Mar 2004 04:05:27 -0000	1.3
  @@ -44,6 +44,18 @@
           return remaining;
       }
   
  +    public static ByteBuffer consolidate(Collection packets) {
  +        int size = 0;
  +        for (Iterator iter = packets.iterator(); iter.hasNext();) {
  +            size += ((ByteBuffer) iter.next()).remaining();
  +        }
  +        ByteBuffer buffer = ByteBuffer.allocate(size);
  +        for (Iterator iter = packets.iterator(); iter.hasNext();) {
  +            buffer.put((ByteBuffer) iter.next());
  +        }
  +        return (ByteBuffer)buffer.flip();
  +    }
  +
       public final static byte NULL_TYPE = (byte) 0x00;
       public final static byte BOOLEAN_TYPE = (byte) 0x01;
       public final static byte CHARACTER_TYPE = (byte) 0x02;
  
  
  
  1.5       +3 -4      incubator-geronimo/modules/network/src/test/org/apache/geronimo/network/protocol/SocketProtocolStressTest.java
  
  Index: SocketProtocolStressTest.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/modules/network/src/test/org/apache/geronimo/network/protocol/SocketProtocolStressTest.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- SocketProtocolStressTest.java	17 Mar 2004 03:12:00 -0000	1.4
  +++ SocketProtocolStressTest.java	18 Mar 2004 04:05:27 -0000	1.5
  @@ -20,11 +20,10 @@
   import java.net.InetSocketAddress;
   import java.net.URI;
   import java.nio.ByteBuffer;
  -import java.nio.ByteOrder;
   import java.util.ArrayList;
   
  -import EDU.oswego.cs.dl.util.concurrent.CyclicBarrier;
   import EDU.oswego.cs.dl.util.concurrent.CountDown;
  +import EDU.oswego.cs.dl.util.concurrent.CyclicBarrier;
   import junit.framework.TestCase;
   
   import org.apache.geronimo.network.SelectorManager;
  @@ -91,7 +90,7 @@
           finished.acquire();
   
           Thread.sleep(5 * 1000);
  -        
  +
           assertEquals(WORKERS * MESSAGE_COUNT, count);
       }
   
  
  
  

Mime
View raw message