directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trus...@apache.org
Subject svn commit: rev 57525 - in incubator/directory/seda/trunk/src/java/org/apache/seda: encoder event output
Date Fri, 12 Nov 2004 14:36:59 GMT
Author: trustin
Date: Fri Nov 12 06:36:57 2004
New Revision: 57525

Added:
   incubator/directory/seda/trunk/src/java/org/apache/seda/event/GatheringOutputEvent.java
  (contents, props changed)
Modified:
   incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
   incubator/directory/seda/trunk/src/java/org/apache/seda/output/OutputManager.java
   incubator/directory/seda/trunk/src/java/org/apache/seda/output/TCPOutputManager.java
   incubator/directory/seda/trunk/src/java/org/apache/seda/output/UDPOutputManager.java
Log:
Added: GatheringOutputEvent which contains multiple ByteBuffers.
Added: OutputManager.write(ClientKey, ByteBuffer[]) and its implementors

Modified: TCPOutputManager and UDPOutputManager listen to and process GatheringOutputEvent
Modified: Now encoders can pass both ByteBuffer and ByteBuffer[] as a result.  DefaultEncoderManager
generates OutputEvent for ByteBuffer and GatheringOutputEvent for ByteBuffer[].

Please note that this code is not tested although I believe it should work very fine. :)

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
Fri Nov 12 06:36:57 2004
@@ -14,10 +14,10 @@
  *   limitations under the License.
  *
  */
-
 package org.apache.seda.encoder;
 
 import java.nio.ByteBuffer;
+
 import java.util.EventObject;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -30,6 +30,7 @@
 import org.apache.seda.event.DisconnectEvent;
 import org.apache.seda.event.DisconnectSubscriber;
 import org.apache.seda.event.EventRouter;
+import org.apache.seda.event.GatheringOutputEvent;
 import org.apache.seda.event.OutputEvent;
 import org.apache.seda.event.ResponseEvent;
 import org.apache.seda.event.ResponseSubscriber;
@@ -54,8 +55,7 @@
  * @version $Rev$
  */
 public class DefaultEncoderManager extends DefaultStage
-    implements EncoderManager, ResponseSubscriber,
-               DisconnectSubscriber
+    implements EncoderManager, ResponseSubscriber, DisconnectSubscriber
 {
     /** the event router used to publish and subscribe to events on */
     private final EventRouter router;
@@ -169,22 +169,31 @@
 
     StatefulEncoder getEncoder(ClientKey key)
     {
-    	StatefulEncoder encoder;
-    	if (key.isStateful()) {
-	    	encoder = (StatefulEncoder) encoders.get(key);
-	    	if (encoder == null) {
-	    		synchronized (encoders) {
-	    			encoder = (StatefulEncoder) encoders.get(key);
-	    			if (encoder == null) {
-	    				encoder = createClientEncoder(key);
-	    				encoders.put(key, encoder);
-	    			}
-	    		}
-	    	}
-    	} else {
-			encoder = createClientEncoder(key);
-    	}
-    	
+        StatefulEncoder encoder;
+
+        if (key.isStateful())
+        {
+            encoder = (StatefulEncoder) encoders.get(key);
+
+            if (encoder == null)
+            {
+                synchronized (encoders)
+                {
+                    encoder = (StatefulEncoder) encoders.get(key);
+
+                    if (encoder == null)
+                    {
+                        encoder = createClientEncoder(key);
+                        encoders.put(key, encoder);
+                    }
+                }
+            }
+        }
+        else
+        {
+            encoder = createClientEncoder(key);
+        }
+
         return encoder;
     }
 
@@ -198,25 +207,33 @@
     private StatefulEncoder createEncoder(ClientKey key)
     {
         TransportTypeEnum transportType;
-        if (key instanceof UDPClientKey) {
-        	transportType = TransportTypeEnum.UDP;
-        } else {
-        	transportType = TransportTypeEnum.TCP;
+
+        if (key instanceof UDPClientKey)
+        {
+            transportType = TransportTypeEnum.UDP;
+        }
+        else
+        {
+            transportType = TransportTypeEnum.TCP;
         }
-        
+
         Iterator it = inetdb.getByPort(key.getLocalAddress().getPort());
         ProtocolProvider provider = null;
-        while (it.hasNext()) {
-        	InetServiceEntry entry = (InetServiceEntry) it.next();
-        	if (entry.getTransport() == transportType) {
-        		provider = entry.getProtocolProvider();
-        		break;
-        	}
+
+        while (it.hasNext())
+        {
+            InetServiceEntry entry = (InetServiceEntry) it.next();
+
+            if (entry.getTransport() == transportType)
+            {
+                provider = entry.getProtocolProvider();
+                break;
+            }
         }
 
         // TODO replace RuntimeException with ProtocolProviderNotFoundException
         if (provider == null)
-        	throw new RuntimeException("No protocol provider available");
+            throw new RuntimeException("No protocol provider available");
 
         return provider.getEncoderFactory().createEncoder();
     }
@@ -241,12 +258,31 @@
                                            Object encoded)
                 {
                     ClientKey key = ((ClientEncoder) encoder).getClientKey();
-                    OutputEvent event =
-                            new OutputEvent(this, key, (ByteBuffer) encoded);
+                    EventObject event;
+
+                    if (encoded instanceof ByteBuffer)
+                    {
+                        event =
+                                                        new OutputEvent(this,
+                                                                        key,
+                                                                        (ByteBuffer) encoded);
+                    }
+                    else if (encoded instanceof ByteBuffer[])
+                    {
+                        event =
+                                                        new GatheringOutputEvent(this,
+                                                                                 key,
+                                                                                 (ByteBuffer[])
encoded);
+                    }
+                    else
+                    {
+                        throw new IllegalArgumentException("Encoded data must be ByteBuffer
or ByteBuffer[].");
+                    }
+
                     router.publish(event);
                 }
             });
-        
+
         return encoder;
     }
 }

Added: incubator/directory/seda/trunk/src/java/org/apache/seda/event/GatheringOutputEvent.java
==============================================================================
--- (empty file)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/event/GatheringOutputEvent.java
Fri Nov 12 06:36:57 2004
@@ -0,0 +1,60 @@
+/*
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+
+package org.apache.seda.event;
+
+import java.nio.ByteBuffer;
+
+import org.apache.seda.listener.ClientKey;
+
+
+/**
+ * An event used to denote output to send to a client.  The output event
+ * only connotates that data is available for output but not yet delivered.
+ *
+ * @author <a href="mailto:directory-dev@incubator.apache.org">Apache Directory Project</a>
+ * @version $Rev$
+ */
+public class GatheringOutputEvent extends ClientEvent
+{
+    /** the data chunks to send */
+    private final ByteBuffer[] buffers;
+
+    /**
+     * Creates an output event using a clientKey, buf of data and the source
+     * for this event.
+     *
+     * @param source the source which created this event
+     * @param clientKey the key of the client
+     * @param buf the buffer containing the chunk to output
+     */
+    public GatheringOutputEvent(Object source, ClientKey clientKey, ByteBuffer[] buffers)
+    {
+        super(source, clientKey);
+        this.buffers = buffers;
+    }
+
+    /**
+     * Gets the array of buffers containing the chunks to output.
+     *
+     * @return the array of chunks to flush back to the client
+     */
+    public ByteBuffer[] getBuffers()
+    {
+        return buffers;
+    }
+}

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/output/OutputManager.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/output/OutputManager.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/output/OutputManager.java	Fri
Nov 12 06:36:57 2004
@@ -40,4 +40,6 @@
      * @throws IOException if there is a failure while sending the data
      */
     void write(ClientKey key, ByteBuffer buf) throws IOException;
+    
+    void write(ClientKey key, ByteBuffer[] buffers) throws IOException;
 }

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/output/TCPOutputManager.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/output/TCPOutputManager.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/output/TCPOutputManager.java	Fri
Nov 12 06:36:57 2004
@@ -14,12 +14,13 @@
  *   limitations under the License.
  *
  */
-
 package org.apache.seda.output;
 
 import java.io.IOException;
+
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
+
 import java.util.EventObject;
 import java.util.HashMap;
 import java.util.Map;
@@ -31,6 +32,7 @@
 import org.apache.seda.event.DisconnectSubscriber;
 import org.apache.seda.event.EventFilter;
 import org.apache.seda.event.EventRouter;
+import org.apache.seda.event.GatheringOutputEvent;
 import org.apache.seda.event.OutputEvent;
 import org.apache.seda.event.OutputSubscriber;
 import org.apache.seda.event.filter.ClientKeyTypeFilter;
@@ -79,24 +81,18 @@
     {
         super(config);
         this.router = router;
-        this.router.subscribe(
-        		new CompositeEventFilter(
-        				new EventFilter[] {
-        						new EventTypeFilter(ConnectEvent.class),
-								new ClientKeyTypeFilter(TCPClientKey.class)
-        				}, CompositeEventFilter.AND), this);
-        this.router.subscribe(
-        		new CompositeEventFilter(
-        				new EventFilter[] {
-        						new EventTypeFilter(OutputEvent.class),
-								new ClientKeyTypeFilter(TCPClientKey.class)
-        				}, CompositeEventFilter.AND), this);
-        this.router.subscribe(
-        		new CompositeEventFilter(
-        				new EventFilter[] {
-        						new EventTypeFilter(DisconnectEvent.class),
-								new ClientKeyTypeFilter(TCPClientKey.class)
-        				}, CompositeEventFilter.AND), this);
+        this.router.subscribe(new CompositeEventFilter(new EventFilter[] { new EventTypeFilter(ConnectEvent.class),
new ClientKeyTypeFilter(TCPClientKey.class) },
+                                                       CompositeEventFilter.AND),
+                              this);
+        this.router.subscribe(new CompositeEventFilter(new EventFilter[] { new EventTypeFilter(OutputEvent.class),
new ClientKeyTypeFilter(TCPClientKey.class) },
+                                                       CompositeEventFilter.AND),
+                              this);
+        this.router.subscribe(new CompositeEventFilter(new EventFilter[] { new EventTypeFilter(GatheringOutputEvent.class),
new ClientKeyTypeFilter(TCPClientKey.class) },
+                                                       CompositeEventFilter.AND),
+                              this);
+        this.router.subscribe(new CompositeEventFilter(new EventFilter[] { new EventTypeFilter(DisconnectEvent.class),
new ClientKeyTypeFilter(TCPClientKey.class) },
+                                                       CompositeEventFilter.AND),
+                              this);
         config.setHandler(new OutputStageHandler());
         this.setStageMonitor(new LoggingStageMonitor());
         this.setOutputMonitor(new LoggingOutputMonitor());
@@ -130,6 +126,11 @@
         enqueue(event);
     }
 
+    public void inform(GatheringOutputEvent event)
+    {
+        enqueue(event);
+    }
+
     /* (non-Javadoc)
      * @see org.apache.seda.event.ConnectSubscriber#inform(
      * org.apache.seda.event.ConnectEvent)
@@ -184,6 +185,21 @@
         monitor.writeOccurred(this, key);
     }
 
+    public void write(ClientKey key, ByteBuffer[] buffers)
+               throws IOException
+    {
+        SocketChannel channel = (SocketChannel) channels.get(key);
+
+        if (null == channel)
+        {
+            monitor.channelMissing(this, key);
+            return;
+        }
+
+        channel.write(buffers);
+        monitor.writeOccurred(this, key);
+    }
+
     /**
      * Sets the output manager's monitor.
      *
@@ -208,6 +224,20 @@
                 try
                 {
                     write(event.getClientKey(), event.getBuffer());
+                }
+                catch (IOException e)
+                {
+                    monitor.failedOnWrite(TCPOutputManager.this,
+                                          event.getClientKey(), e);
+                }
+            }
+            else if (generic instanceof GatheringOutputEvent)
+            {
+                GatheringOutputEvent event = (GatheringOutputEvent) generic;
+
+                try
+                {
+                    write(event.getClientKey(), event.getBuffers());
                 }
                 catch (IOException e)
                 {

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/output/UDPOutputManager.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/output/UDPOutputManager.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/output/UDPOutputManager.java	Fri
Nov 12 06:36:57 2004
@@ -14,16 +14,18 @@
  *   limitations under the License.
  *
  */
-
 package org.apache.seda.output;
 
 import java.io.IOException;
+
 import java.nio.ByteBuffer;
+
 import java.util.EventObject;
 
 import org.apache.seda.event.AbstractSubscriber;
 import org.apache.seda.event.EventFilter;
 import org.apache.seda.event.EventRouter;
+import org.apache.seda.event.GatheringOutputEvent;
 import org.apache.seda.event.OutputEvent;
 import org.apache.seda.event.OutputSubscriber;
 import org.apache.seda.event.filter.ClientKeyTypeFilter;
@@ -66,12 +68,12 @@
     {
         super(config);
         this.router = router;
-        this.router.subscribe(
-        		new CompositeEventFilter(
-        				new EventFilter[] {
-        						new EventTypeFilter(OutputEvent.class),
-								new ClientKeyTypeFilter(UDPClientKey.class)
-        				}, CompositeEventFilter.AND), this);
+        this.router.subscribe(new CompositeEventFilter(new EventFilter[] { new EventTypeFilter(OutputEvent.class),
new ClientKeyTypeFilter(UDPClientKey.class) },
+                                                       CompositeEventFilter.AND),
+                              this);
+        this.router.subscribe(new CompositeEventFilter(new EventFilter[] { new EventTypeFilter(GatheringOutputEvent.class),
new ClientKeyTypeFilter(UDPClientKey.class) },
+                                                       CompositeEventFilter.AND),
+                              this);
         config.setHandler(new OutputStageHandler());
         this.setStageMonitor(new LoggingStageMonitor());
         this.setOutputMonitor(new LoggingOutputMonitor());
@@ -105,6 +107,11 @@
         enqueue(event);
     }
 
+    public void inform(GatheringOutputEvent event)
+    {
+        enqueue(event);
+    }
+
     // ------------------------------------------------------------------------
     // OutputManager method
     // ------------------------------------------------------------------------
@@ -117,11 +124,33 @@
     public void write(ClientKey key, ByteBuffer buf)
                throws IOException
     {
-    	UDPClientKey udpKey = (UDPClientKey) key;
+        UDPClientKey udpKey = (UDPClientKey) key;
         udpKey.getSocket().getChannel().send(buf, key.getRemoteAddress());
         monitor.writeOccurred(this, key);
     }
 
+    public void write(ClientKey key, ByteBuffer[] buffers)
+               throws IOException
+    {
+        UDPClientKey udpKey = (UDPClientKey) key;
+        int size = 0;
+
+        for (int i = buffers.length - 1; i >= 0; i--)
+        {
+            size += buffers[i].remaining();
+        }
+
+        ByteBuffer mergedBuf = ByteBuffer.allocate(size);
+
+        for (int i = 0; i < buffers.length; i++)
+        {
+            mergedBuf.put(buffers[i]);
+        }
+
+        udpKey.getSocket().getChannel().send(mergedBuf, key.getRemoteAddress());
+        monitor.writeOccurred(this, key);
+    }
+
     /**
      * Sets the output manager's monitor.
      *
@@ -149,8 +178,21 @@
                 }
                 catch (IOException e)
                 {
-                    monitor.failedOnWrite(
-                                          UDPOutputManager.this,
+                    monitor.failedOnWrite(UDPOutputManager.this,
+                                          event.getClientKey(), e);
+                }
+            }
+            else if (generic instanceof GatheringOutputEvent)
+            {
+                GatheringOutputEvent event = (GatheringOutputEvent) generic;
+
+                try
+                {
+                    write(event.getClientKey(), event.getBuffers());
+                }
+                catch (IOException e)
+                {
+                    monitor.failedOnWrite(UDPOutputManager.this,
                                           event.getClientKey(), e);
                 }
             }

Mime
View raw message