geronimo-scm mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject cvs commit: incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol SocketProtocol.java
Date Sat, 24 Apr 2004 06:29:01 GMT
chirino     2004/04/23 23:29:01

  Modified:    modules/network/src/java/org/apache/geronimo/network/protocol/control
                        ControlClientProtocolKitchen.java
                        ControlClientProtocol.java
                        ControlServerProtocol.java
               modules/network/src/java/org/apache/geronimo/network/protocol
                        SocketProtocol.java
  Log:
  Trying to workout some deadlocks.
  
  Revision  Changes    Path
  1.4       +25 -12    incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/control/ControlClientProtocolKitchen.java
  
  Index: ControlClientProtocolKitchen.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/control/ControlClientProtocolKitchen.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- ControlClientProtocolKitchen.java	17 Mar 2004 03:11:59 -0000	1.3
  +++ ControlClientProtocolKitchen.java	24 Apr 2004 06:29:01 -0000	1.4
  @@ -20,8 +20,8 @@
   import java.util.Collection;
   import java.util.Iterator;
   
  -import EDU.oswego.cs.dl.util.concurrent.Mutex;
  -
  +import org.apache.commons.logging.Log;
  +import org.apache.commons.logging.LogFactory;
   import org.apache.geronimo.network.SelectorManager;
   import org.apache.geronimo.network.protocol.AbstractProtocol;
   import org.apache.geronimo.network.protocol.DownPacket;
  @@ -33,24 +33,35 @@
   import org.apache.geronimo.system.ClockPool;
   import org.apache.geronimo.system.ThreadPool;
   
  +import EDU.oswego.cs.dl.util.concurrent.Latch;
  +
   
   /**
    * @version $Revision$ $Date$
    */
   class ControlClientProtocolKitchen extends ProtocolStack implements ControlClientListener
{
   
  +	final private static Log log = LogFactory.getLog(ControlClientProtocolKitchen.class);
  +	
       private ClassLoader classLoader;
       private ThreadPool threadPool;
       private ClockPool clockPool;
       private SelectorManager selectorManager;
  -    private Mutex sendMutex = new Mutex();  //todo: replace with something that uses no
locks
  -
  +    private Latch sendLatch = new Latch();
   
       ControlClientProtocolKitchen() throws InterruptedException {
           push(new Dummy());
  -        sendMutex.acquire();
       }
  -
  +    
  +    /**
  +	 * @see org.apache.geronimo.network.protocol.ProtocolStack#cloneProtocol()
  +	 */
  +	public Protocol cloneProtocol() throws CloneNotSupportedException {
  +		ControlClientProtocolKitchen p = (ControlClientProtocolKitchen) super.cloneProtocol();
  +		p.sendLatch = new Latch();
  +		return p;
  +	}
  +    
       public ClassLoader getClassLoader() {
           return classLoader;
       }
  @@ -84,7 +95,7 @@
       }
   
       public void serveUp(Collection menu) throws ControlException {
  -        System.out.println("serveUp");
  +    	log.trace("serveUp");
   
           ControlContext context = new ControlContext();
           context.setClassLoader(classLoader);
  @@ -105,8 +116,9 @@
           } catch (ProtocolException e) {
               throw new ControlException(e);
           }
  -
  -        sendMutex.release();
  +        
  +    	log.trace("RELEASING send Latch: "+sendLatch);
  +        sendLatch.release();
       }
   
       public void shutdown() {
  @@ -114,9 +126,10 @@
   
       public void sendDown(DownPacket packet) throws ProtocolException {
           try {
  -            if (!sendMutex.attempt(1000 * 1000)) throw new ProtocolException("Send timeout.");
  +        	log.trace("AQUIRING send Latch: "+sendLatch);
  +            if (!sendLatch.attempt(1000 * 1000)) throw new ProtocolException("Send timeout.");
  +        	log.trace("AQUIRED send Latch: "+sendLatch);
               super.sendDown(packet);
  -            sendMutex.release();
           } catch (InterruptedException e) {
               throw new ProtocolException(e);
           }
  
  
  
  1.6       +24 -27    incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/control/ControlClientProtocol.java
  
  Index: ControlClientProtocol.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/control/ControlClientProtocol.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- ControlClientProtocol.java	10 Apr 2004 17:14:01 -0000	1.5
  +++ ControlClientProtocol.java	24 Apr 2004 06:29:01 -0000	1.6
  @@ -17,15 +17,15 @@
   
   package org.apache.geronimo.network.protocol.control;
   
  -import EDU.oswego.cs.dl.util.concurrent.Latch;
  -import EDU.oswego.cs.dl.util.concurrent.Mutex;
   import org.apache.commons.logging.Log;
   import org.apache.commons.logging.LogFactory;
  -
   import org.apache.geronimo.network.protocol.DownPacket;
  +import org.apache.geronimo.network.protocol.Protocol;
   import org.apache.geronimo.network.protocol.ProtocolException;
   import org.apache.geronimo.network.protocol.UpPacket;
   
  +import EDU.oswego.cs.dl.util.concurrent.Latch;
  +
   
   /**
    * @version $Revision$ $Date$
  @@ -36,7 +36,7 @@
   
       private ControlClientListener listener;
       private ClassLoader classLoader;
  -    private Mutex sendMutex = new Mutex();  //todo: replace with something that uses no
locks
  +    private Latch sendLatch = new Latch();  //todo: replace with something that uses no
locks
       private Latch shutdownLatch = new Latch();
       private long timeout;
   
  @@ -44,6 +44,16 @@
       private final int STOPPED = 1;
       private int state = STOPPED;
   
  +    /**
  +	 * @see org.apache.geronimo.network.protocol.AbstractProtocol#cloneProtocol()
  +	 */
  +	public Protocol cloneProtocol() throws CloneNotSupportedException {
  +		ControlClientProtocol p = (ControlClientProtocol)super.cloneProtocol();
  +		p.sendLatch = new Latch();
  +		p.shutdownLatch = new Latch();
  +		return p;
  +	}
  +    
       public ControlClientListener getListener() {
           return listener;
       }
  @@ -69,19 +79,9 @@
       }
   
       public void setup() throws ProtocolException {
  -        try {
  -            log.trace("Starting");
  -
  -            getDownProtocol().sendDown(new BootRequestDownPacket()); //todo: this is probably
dangerous, put in thread pool
  -
  -            log.trace("AQUIRING " + sendMutex);
  -            sendMutex.acquire();
  -            log.trace("AQUIRED " + sendMutex);
  -
  -            state = STARTED;
  -        } catch (InterruptedException e) {
  -            throw new ProtocolException(e);
  -        }
  +        log.trace("Starting");
  +        getDownProtocol().sendDown(new BootRequestDownPacket()); //todo: this is probably
dangerous, put in thread pool
  +        state = STARTED;
       }
   
       public void drain() throws ProtocolException {
  @@ -110,9 +110,9 @@
                   log.trace("BOOT RESPONSE");
                   listener.serveUp(((BootResponseUpPacket) p).getMenu());
                   getDownProtocol().sendDown(new BootSuccessDownPacket());
  -                log.trace("RELEASING " + sendMutex);
  -                sendMutex.release();
  -                log.trace("RELEASED " + sendMutex);
  +                log.trace("RELEASING " + sendLatch);
  +                sendLatch.release();
  +                log.trace("RELEASED " + sendLatch);
               } catch (ControlException e) {
                   throw new ProtocolException(e);
               }
  @@ -135,18 +135,15 @@
   
       public void sendDown(DownPacket packet) throws ProtocolException {
           try {
  -            log.trace("AQUIRING " + sendMutex);
  -            if (!sendMutex.attempt(timeout)) throw new ProtocolException("Send timeout");
  -            log.trace("AQUIRED " + sendMutex);
  +            log.trace("AQUIRING " + sendLatch);
  +            if (!sendLatch.attempt(timeout)) throw new ProtocolException("Send timeout");
  +            log.trace("AQUIRED " + sendLatch);
   
               PassthroughDownPacket passthtough = new PassthroughDownPacket();
               passthtough.setBuffers(packet.getBuffers());
   
               getDownProtocol().sendDown(passthtough);
   
  -            log.trace("RELEASING " + sendMutex);
  -            sendMutex.release();
  -            log.trace("RELEASED " + sendMutex);
           } catch (InterruptedException e) {
               throw new ProtocolException(e);
           }
  
  
  
  1.5       +13 -20    incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/control/ControlServerProtocol.java
  
  Index: ControlServerProtocol.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/control/ControlServerProtocol.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- ControlServerProtocol.java	10 Apr 2004 17:14:01 -0000	1.4
  +++ ControlServerProtocol.java	24 Apr 2004 06:29:01 -0000	1.5
  @@ -19,10 +19,8 @@
   
   import java.util.Collection;
   
  -import EDU.oswego.cs.dl.util.concurrent.Mutex;
   import org.apache.commons.logging.Log;
   import org.apache.commons.logging.LogFactory;
  -
   import org.apache.geronimo.network.SelectorManager;
   import org.apache.geronimo.network.protocol.DownPacket;
   import org.apache.geronimo.network.protocol.ProtocolException;
  @@ -30,6 +28,8 @@
   import org.apache.geronimo.system.ClockPool;
   import org.apache.geronimo.system.ThreadPool;
   
  +import EDU.oswego.cs.dl.util.concurrent.Latch;
  +
   
   /**
    * @version $Revision$ $Date$
  @@ -43,7 +43,7 @@
       private ThreadPool threadPool;
       private ClockPool clockPool;
       private SelectorManager selectorManager;
  -    private Mutex sendMutex = new Mutex();  //todo: replace with something that uses no
locks
  +    private Latch sendLatch;  //todo: replace with something that uses no locks
       private long timeout;
   
       private final int STARTED = 0;
  @@ -99,13 +99,9 @@
       }
   
       public void setup() throws ProtocolException {
  -        try {
  -            log.trace("Starting");
  -            sendMutex.acquire();
  -            state = STARTED;
  -        } catch (InterruptedException e) {
  -            throw new ProtocolException(e);
  -        }
  +        log.trace("Starting");
  +        sendLatch = new Latch();
  +        state = STARTED;
       }
   
       public void drain() throws ProtocolException {
  @@ -129,9 +125,9 @@
               getDownProtocol().sendDown(constructBootPacket());
           } else if (p instanceof BootSuccessUpPacket) {
               log.trace("BOOT SUCCESS");
  -            log.trace("RELEASING " + sendMutex);
  -            sendMutex.release();
  -            log.trace("RELEASED " + sendMutex);
  +            log.trace("RELEASING " + sendLatch);
  +            sendLatch.release();
  +            log.trace("RELEASED " + sendLatch);
           } else if (p instanceof ShutdownRequestUpPacket) {
               log.trace("SHUTDOWN_REQ");
               getDownProtocol().sendDown(new ShutdownAcknowledgeDownPacket());
  @@ -142,18 +138,15 @@
   
       public void sendDown(DownPacket packet) throws ProtocolException {
           try {
  -            log.trace("AQUIRING " + sendMutex);
  -            if (!sendMutex.attempt(timeout)) throw new ProtocolException("Send timeout.");
  -            log.trace("AQUIRED " + sendMutex);
  +            log.trace("AQUIRING " + sendLatch);
  +            if (!sendLatch.attempt(timeout)) throw new ProtocolException("Send timeout.");
  +            log.trace("AQUIRED " + sendLatch);
   
               PassthroughDownPacket passthtough = new PassthroughDownPacket();
               passthtough.setBuffers(packet.getBuffers());
   
               getDownProtocol().sendDown(passthtough);
   
  -            log.trace("RELEASING " + sendMutex);
  -            sendMutex.release();
  -            log.trace("RELEASED " + sendMutex);
           } catch (InterruptedException e) {
               throw new ProtocolException(e);
           }
  
  
  
  1.7       +28 -16    incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/SocketProtocol.java
  
  Index: SocketProtocol.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/SocketProtocol.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- SocketProtocol.java	24 Apr 2004 04:07:13 -0000	1.6
  +++ SocketProtocol.java	24 Apr 2004 06:29:01 -0000	1.7
  @@ -65,6 +65,9 @@
       ByteBuffer headerBuffer;
       ByteBuffer bodyBuffer;
       
  +    Object serviceReadMutex;
  +    Object serviceWriteMutex;
  +    
       static int nextConnectionId=0;
       synchronized static int getNextConnectionId() {
       	return nextConnectionId++;    	
  @@ -156,6 +159,8 @@
       	log = LogFactory.getLog(SocketProtocol.class.getName()+":"+getNextConnectionId());
       	sendMutex = new Mutex();
       	headerBuffer = ByteBuffer.allocate(4);
  +    	serviceReadMutex = new Object();
  +    	serviceWriteMutex = new Object();
       	
           if (address == null && acceptedSocketChannel == null) throw new IllegalStateException("No
address set");
   
  @@ -166,6 +171,7 @@
                   socketChannel.configureBlocking(true);
                   if (socketInterface != null) socketChannel.socket().bind(socketInterface);
                   socketChannel.socket().setReuseAddress(true);
  +                socketChannel.socket().setTcpNoDelay(true);
                   socketChannel.connect(address);
               } catch (SocketException e) {
                   state = STOPPED;
  @@ -244,21 +250,26 @@
           }
       }
   
  -    public synchronized void selectionEvent(SelectionKey selection) {
  -        synchronized (this) {
  -            try {
  -                if (selection.isWritable())
  -                    serviceWrite();
  -                if (selection.isReadable())
  -                    serviceRead();
  -            } catch (CancelledKeyException e) {
  -                // who knows, by the time we get here,
  -                // the key could have been canceled.
  +    public void selectionEvent(SelectionKey selection) {
  +        try {
  +            if (selection.isReadable()) {
  +                synchronized (serviceReadMutex) {
  +                	serviceRead();
  +                }
               }
  +            if (selection.isWritable()) {
  +                synchronized (serviceWriteMutex) {
  +                	serviceWrite();
  +                }
  +            } 
  +        } catch (CancelledKeyException e) {
  +        	log.trace("Key Cancelled:", e);
  +            // who knows, by the time we get here,
  +            // the key could have been canceled.
           }
       }
   
  -    synchronized private void serviceWrite() {
  +    private void serviceWrite() {
           log.trace("serviceWrite() triggered.");
           try {
   
  @@ -279,13 +290,14 @@
               // release old buffers
               sendBuffer = null;
   
  +            log.trace("RELEASING " + sendMutex);
  +            sendMutex.release();
  +            log.trace("RELEASED " + sendMutex);
  +
               // We are done writing.
               log.trace("OP_READ " + selectionKey);
               selectorManager.setInterestOps(selectionKey, SelectionKey.OP_READ, 0);
   
  -            log.trace("RELEASING " + sendMutex);
  -            sendMutex.release();
  -            log.trace("RELEASED " + sendMutex);
           } catch (IOException e) {
               log.debug("Communications error, closing connection: ", e);
               close();
  @@ -294,7 +306,7 @@
           }
       }
   
  -    synchronized public void serviceRead() {
  +    public void serviceRead() {
           boolean tracing = log.isTraceEnabled();
           if (tracing) log.trace("serviceRead() triggered.");
           lastUsed = System.currentTimeMillis();
  
  
  

Mime
View raw message