directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trus...@apache.org
Subject svn commit: rev 56603 - in incubator/directory/seda/trunk/src/java/org/apache/seda: decoder encoder
Date Thu, 04 Nov 2004 16:26:03 GMT
Author: trustin
Date: Thu Nov  4 08:26:01 2004
New Revision: 56603

Modified:
   incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java
   incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
   incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/EncodeStageHandler.java
Log:
DefaultEncoderManager and DefaultDecoderManager now work without subscribing to ConnectEvent.

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java	Thu Nov  4 08:26:01 2004
@@ -27,8 +27,6 @@
 import org.apache.commons.codec.stateful.DecoderCallback;
 import org.apache.commons.codec.stateful.StatefulDecoder;
 import org.apache.seda.event.AbstractSubscriber;
-import org.apache.seda.event.ConnectEvent;
-import org.apache.seda.event.ConnectSubscriber;
 import org.apache.seda.event.DisconnectEvent;
 import org.apache.seda.event.DisconnectSubscriber;
 import org.apache.seda.event.EventRouter;
@@ -37,7 +35,6 @@
 import org.apache.seda.event.RequestEvent;
 import org.apache.seda.event.filter.EventTypeFilter;
 import org.apache.seda.listener.ClientKey;
-import org.apache.seda.listener.KeyExpiryException;
 import org.apache.seda.listener.UDPClientKey;
 import org.apache.seda.protocol.InetServiceEntry;
 import org.apache.seda.protocol.InetServicesDatabase;
@@ -55,7 +52,7 @@
  * @version $Rev$
  */
 public class DefaultDecoderManager extends DefaultStage
-    implements DecoderManager, InputSubscriber, ConnectSubscriber,
+    implements DecoderManager, InputSubscriber, 
                DisconnectSubscriber
 {
     /** event router or bus this component subscribes and publishes events on */
@@ -91,7 +88,6 @@
         super.setStageMonitor(new LoggingStageMonitor(getClass()));
 
         router.subscribe(new EventTypeFilter(InputEvent.class), this);
-        router.subscribe(new EventTypeFilter(ConnectEvent.class), this);
         router.subscribe(new EventTypeFilter(DisconnectEvent.class), this);
     }
 
@@ -142,93 +138,6 @@
         decoders.remove(event.getClientKey());
     }
 
-    /**
-     * Temporary place holder for functionality that looks up a protocol
-     * specific StatefulDecoder.
-     *
-     * @param key the client key used to determine associated protocol
-     * @return the new stateful nonblocking protocol specific decoder
-     */
-    private StatefulDecoder createDecoder(ClientKey key)
-                                   throws KeyExpiryException
-    {
-        TransportTypeEnum transportType;
-        if (key instanceof UDPClientKey) {
-        	transportType = TransportTypeEnum.UDP;
-        } else {
-        	transportType = TransportTypeEnum.TCP;
-        }
-        
-        Iterator it = inetdb.getByPort(key.getLocalAddress().getPort());
-        ProtocolProvider provider = null;
-        while (it.hasNext()) {
-        	InetServiceEntry entry = (InetServiceEntry) it.next();
-        	if (entry.getTransport() == transportType) {
-        		provider = entry.getProtocolProvider();
-        	}
-        }
-
-        // TODO replace RuntimeException with ProtocolProviderNotFoundException
-        if (provider == null)
-        	throw new RuntimeException("No protocol provider available");
-
-        return provider.getDecoderFactory().createDecoder();
-    }
-
-    /**
-     * We basically create a new client decoder and put it into a map for
-     * use later when we are processing input events from the client.
-     *
-     * @see org.apache.seda.event.ConnectSubscriber#inform(
-     * org.apache.seda.event.ConnectEvent)
-     */
-    public void inform(ConnectEvent event)
-    {
-        StatefulDecoder decoder = null;
-        ClientKey key = event.getClientKey();
-
-        try
-        {
-            decoder = new ClientDecoder(key, createDecoder(key));
-        }
-        catch (KeyExpiryException e)
-        {
-            monitor.failedOnInform(this, event, e);
-            return;
-        }
-
-        /*
-         * Here the decoder informs us that a unit of data is decoded.  In the
-         * case of the snickers decoder we're decoding an LDAP message envelope
-         * for a request.  We use this request to create a RequestEvent and
-         * publish the event on the queue.
-         */
-        decoder.setCallback(new DecoderCallback()
-            {
-                public void decodeOccurred(
-                                           StatefulDecoder decoder,
-                                           Object decoded)
-                {
-                    ClientKey key = ((ClientDecoder) decoder).getClientKey();
-                    RequestEvent event = new RequestEvent(this, key, decoded);
-                    router.publish(event);
-                }
-            });
-
-        /*
-         * For potential race conditions between ConnectEvent processing and
-         * the processing of the first InputEvent we synchronize on the decoders
-         * and notify all when we have altered it.  The thread that is waiting
-         * for a client decoder in the decoders map will wait on the map until
-         * awoken.
-         */
-        synchronized (decoders)
-        {
-            decoders.put(key, decoder);
-            decoders.notifyAll();
-        }
-    }
-
     // ------------------------------------------------------------------------
     // Service Interface Methods
     // ------------------------------------------------------------------------
@@ -252,16 +161,7 @@
                           throws DecoderException
     {
         // replace this decoder with a real one later
-        StatefulDecoder decoder = null;
-
-        try
-        {
-            decoder = createDecoder(key);
-        }
-        catch (KeyExpiryException e)
-        {
-            throw new DecoderException("client key has expired");
-        }
+        StatefulDecoder decoder = decoder = createDecoder(key);
 
         // used array to set a value on final variable and get by compiler
         final Object[] decoded = new Object[1];
@@ -318,33 +218,75 @@
      */
     StatefulDecoder getDecoder(ClientKey key)
     {
+        StatefulDecoder decoder = (StatefulDecoder) decoders.get(key);
+        if (decoder == null) {
+        	synchronized (decoders) {
+                decoder = (StatefulDecoder) decoders.get(key);
+        		if (decoder == null) {
+        			decoder = createClientDecoder(key);
+        			decoders.put(key, decoder);
+        		}
+        	}
+        }
+
+        return decoder;
+    }
+
+    /**
+     * Temporary place holder for functionality that looks up a protocol
+     * specific StatefulDecoder.
+     *
+     * @param key the client key used to determine associated protocol
+     * @return the new stateful nonblocking protocol specific decoder
+     */
+    private StatefulDecoder createDecoder(ClientKey key)
+    {
+        TransportTypeEnum transportType;
+        if (key instanceof UDPClientKey) {
+        	transportType = TransportTypeEnum.UDP;
+        } else {
+        	transportType = TransportTypeEnum.TCP;
+        }
+        
+        Iterator it = inetdb.getByPort(key.getLocalAddress().getPort());
+        ProtocolProvider provider = null;
+        while (it.hasNext()) {
+        	InetServiceEntry entry = (InetServiceEntry) it.next();
+        	if (entry.getTransport() == transportType) {
+        		provider = entry.getProtocolProvider();
+        	}
+        }
+
+        // TODO replace RuntimeException with ProtocolProviderNotFoundException
+        if (provider == null)
+        	throw new RuntimeException("No protocol provider available");
+
+        return provider.getDecoderFactory().createDecoder();
+    }
+
+    private StatefulDecoder createClientDecoder(ClientKey key)
+    {
         StatefulDecoder decoder = null;
 
+        decoder = new ClientDecoder(key, createDecoder(key));
+
         /*
-         * We synchronize on the decoders map so we can wait for notification
-         * on it if it does not contain the client decoder.  This is in case
-         * the processing of the connect event is slow and the client decoder
-         * has not been created yet.  When processing of the ConnectEvent is
-         * complete we are awoken via a notifyAll in the inform() method.
+         * Here the decoder informs us that a unit of data is decoded.  In the
+         * case of the snickers decoder we're decoding an LDAP message envelope
+         * for a request.  We use this request to create a RequestEvent and
+         * publish the event on the queue.
          */
-        synchronized (decoders)
-        {
-            decoder = (StatefulDecoder) decoders.get(key);
-
-            while (decoder == null)
+        decoder.setCallback(new DecoderCallback()
             {
-                try
-                {
-                    decoders.wait();
-                }
-                catch (InterruptedException e)
+                public void decodeOccurred(
+                                           StatefulDecoder decoder,
+                                           Object decoded)
                 {
-                    e.printStackTrace();
+                    ClientKey key = ((ClientDecoder) decoder).getClientKey();
+                    RequestEvent event = new RequestEvent(this, key, decoded);
+                    router.publish(event);
                 }
-
-                decoder = (StatefulDecoder) decoders.get(key);
-            }
-        }
+            });
 
         return decoder;
     }

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java	Thu Nov  4 08:26:01 2004
@@ -27,8 +27,6 @@
 import org.apache.commons.codec.stateful.EncoderFactory;
 import org.apache.commons.codec.stateful.StatefulEncoder;
 import org.apache.seda.event.AbstractSubscriber;
-import org.apache.seda.event.ConnectEvent;
-import org.apache.seda.event.ConnectSubscriber;
 import org.apache.seda.event.DisconnectEvent;
 import org.apache.seda.event.DisconnectSubscriber;
 import org.apache.seda.event.EventRouter;
@@ -37,7 +35,6 @@
 import org.apache.seda.event.ResponseSubscriber;
 import org.apache.seda.event.filter.EventTypeFilter;
 import org.apache.seda.listener.ClientKey;
-import org.apache.seda.listener.KeyExpiryException;
 import org.apache.seda.listener.UDPClientKey;
 import org.apache.seda.protocol.InetServiceEntry;
 import org.apache.seda.protocol.InetServicesDatabase;
@@ -57,7 +54,7 @@
  * @version $Rev$
  */
 public class DefaultEncoderManager extends DefaultStage
-    implements EncoderManager, ConnectSubscriber, ResponseSubscriber,
+    implements EncoderManager, ResponseSubscriber,
                DisconnectSubscriber
 {
     /** the event router used to publish and subscribe to events on */
@@ -88,7 +85,6 @@
         monitor = new EncoderManagerMonitorAdapter();
         this.inetdb = inetdb;
         this.router = router;
-        this.router.subscribe(new EventTypeFilter(ConnectEvent.class), this);
         this.router.subscribe(new EventTypeFilter(ResponseEvent.class), this);
     }
 
@@ -127,6 +123,66 @@
         super.enqueue(event);
     }
 
+    public ByteBuffer encodeBlocking(ClientKey key, Object response)
+                              throws EncoderException
+    {
+        int port = key.getLocalAddress().getPort();
+
+        EncoderFactory factory =
+            (EncoderFactory) factories.get(inetdb.getProtoByPort(port));
+        StatefulEncoder encoder =
+            new ClientEncoder(key, factory.createEncoder());
+
+        // used array to set a value on final variable and get by compiler
+        final Object[] encoded = new Object[1];
+        encoder.setCallback(new EncoderCallback()
+            {
+                public void encodeOccurred(StatefulEncoder encoder, Object obj)
+                {
+                    encoded[0] = obj;
+                }
+            });
+
+        encoder.encode(response);
+
+        // the encoded value should be set
+        if (encoded[0] == null)
+        {
+            throw new EncoderException("Expected a complete encoded object" +
+                                       " but encoder did not produce one");
+        }
+
+        return (ByteBuffer) encoded[0];
+    }
+
+    public void encodeNonBlocking(ClientKey key, Object response)
+                           throws EncoderException
+    {
+        StatefulEncoder encoder = (StatefulEncoder) encoders.get(key);
+        encoder.encode(response);
+    }
+
+    EncoderManagerMonitor getMonitor()
+    {
+        return monitor;
+    }
+
+    StatefulEncoder getEncoder(ClientKey key)
+    {
+    	StatefulEncoder encoder = (StatefulEncoder) encoders.get(key);
+    	if (encoder == null) {
+    		synchronized (encoders) {
+    			encoder = (StatefulEncoder) encoders.get(key);
+    			if (encoder == null) {
+    				encoder = createClientEncoder(key);
+    				encoders.put(key, encoder);
+    			}
+    		}
+    	}
+    	
+        return encoder;
+    }
+
     /**
      * Temporary place holder for functionality that looks up a protocol
      * specific StatefulEncoder.
@@ -135,7 +191,6 @@
      * @return the new stateful nonblocking protocol specific encoder
      */
     private StatefulEncoder createEncoder(ClientKey key)
-                                   throws KeyExpiryException
     {
         TransportTypeEnum transportType;
         if (key instanceof UDPClientKey) {
@@ -167,20 +222,9 @@
      * @see org.apache.seda.event.ConnectSubscriber#inform(
      * org.apache.seda.event.ConnectEvent)
      */
-    public void inform(ConnectEvent event)
+    private StatefulEncoder createClientEncoder(ClientKey key)
     {
-        StatefulEncoder encoder = null;
-        ClientKey key = event.getClientKey();
-
-        try
-        {
-            encoder = new ClientEncoder(key, createEncoder(key));
-        }
-        catch (KeyExpiryException e)
-        {
-            monitor.failedOnInform(this, event, e);
-            return;
-        }
+        StatefulEncoder encoder = new ClientEncoder(key, createEncoder(key));
 
         /*
          * Here the encoder informs us that a response encoded.
@@ -196,55 +240,7 @@
                     router.publish(event);
                 }
             });
-        encoders.put(key, encoder);
-    }
-
-    public ByteBuffer encodeBlocking(ClientKey key, Object response)
-                              throws EncoderException
-    {
-        int port = key.getLocalAddress().getPort();
-
-        EncoderFactory factory =
-            (EncoderFactory) factories.get(inetdb.getProtoByPort(port));
-        StatefulEncoder encoder =
-            new ClientEncoder(key, factory.createEncoder());
-
-        // used array to set a value on final variable and get by compiler
-        final Object[] encoded = new Object[1];
-        encoder.setCallback(new EncoderCallback()
-            {
-                public void encodeOccurred(StatefulEncoder encoder, Object obj)
-                {
-                    encoded[0] = obj;
-                }
-            });
-
-        encoder.encode(response);
-
-        // the encoded value should be set
-        if (encoded[0] == null)
-        {
-            throw new EncoderException("Expected a complete encoded object" +
-                                       " but encoder did not produce one");
-        }
-
-        return (ByteBuffer) encoded[0];
-    }
-
-    public void encodeNonBlocking(ClientKey key, Object response)
-                           throws EncoderException
-    {
-        StatefulEncoder encoder = (StatefulEncoder) encoders.get(key);
-        encoder.encode(response);
-    }
-
-    EncoderManagerMonitor getMonitor()
-    {
-        return monitor;
-    }
-
-    StatefulEncoder getEncoder(ClientKey key)
-    {
-        return (StatefulEncoder) encoders.get(key);
+        
+        return encoder;
     }
 }

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/EncodeStageHandler.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/EncodeStageHandler.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/EncodeStageHandler.java	Thu Nov  4 08:26:01 2004
@@ -50,11 +50,7 @@
     {
         ResponseEvent re = (ResponseEvent) event;
         ClientKey key = re.getClientKey();
-        StatefulEncoder encoder;
-        
-        // FIXME Event synchronization issues
-        while ((encoder = encMan.getEncoder(key)) == null)
-        	continue;
+        StatefulEncoder encoder = encMan.getEncoder(key);
 
         try
         {

Mime
View raw message