Author: akarasulu Date: Mon Sep 6 23:05:25 2004 New Revision: 43451 Added: incubator/directory/seda/trunk/api/src/java/org/apache/seda/encoder/ClientEncoder.java (contents, props changed) incubator/directory/seda/trunk/api/src/java/org/apache/seda/event/AddProtocolEvent.java (contents, props changed) incubator/directory/seda/trunk/api/src/java/org/apache/seda/event/ProtocolEvent.java (contents, props changed) incubator/directory/seda/trunk/api/src/java/org/apache/seda/event/ProtocolSubscriber.java (contents, props changed) Modified: incubator/directory/seda/trunk/api/src/java/org/apache/seda/encoder/EncoderManager.java incubator/directory/seda/trunk/api/src/java/org/apache/seda/encoder/EncoderManagerMonitor.java incubator/directory/seda/trunk/api/src/java/org/apache/seda/protocol/InetServicesDatabase.java incubator/directory/seda/trunk/impl/src/java/org/apache/seda/decoder/DefaultDecoderManager.java incubator/directory/seda/trunk/impl/src/java/org/apache/seda/encoder/DefaultEncoderManager.java incubator/directory/seda/trunk/impl/src/test/org/apache/seda/decoder/DefaultDecoderManagerTest.java Log: Commit changes ... o added new class of events for protocol configuration: ProtocolEvents o added subscriber for ProtocolEvents o added subclass of ProtocolEvent for new protocol additions o added StatefulEncoder wrapper for client encoder tracking o modified services database to have simple port to protocol lookup o added ProtocolSubscriber interface to both EncMan and DecMan o added ConnectSubscriber and DisconnectSubscriber to use stateful non-blocking encode functionality Added: incubator/directory/seda/trunk/api/src/java/org/apache/seda/encoder/ClientEncoder.java ============================================================================== --- (empty file) +++ incubator/directory/seda/trunk/api/src/java/org/apache/seda/encoder/ClientEncoder.java Mon Sep 6 23:05:25 2004 @@ -0,0 +1,92 @@ +/* + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.seda.encoder; + + +import org.apache.commons.codec.EncoderException; +import org.apache.commons.codec.stateful.EncoderMonitor; +import org.apache.commons.codec.stateful.StatefulEncoder; +import org.apache.commons.codec.stateful.EncoderCallback; + +import org.apache.seda.listener.ClientKey; + + +/** + * A stateful encoder dedicated to a specific client. + * + * @author + * Apache Directory Project + * @version $Rev: 43377 $ + */ +public class ClientEncoder implements StatefulEncoder, EncoderCallback +{ + /** the key of the client this encoder is associated with */ + private final ClientKey key; + /** the actual encoder doing the work for us */ + private final StatefulEncoder encoder; + /** the callback used by this encoder */ + private EncoderCallback cb; + + + /** + * Creates a client dedicated stateful encoder. + * + * @param key the key of the client this encoder is for + * @param encoder the underlying encoder doing the work + */ + public ClientEncoder( ClientKey key, StatefulEncoder encoder ) + { + this.key = key; + this.encoder = encoder; + this.encoder.setCallback( this ); + } + + + public void encode( Object encoded ) throws EncoderException + { + encoder.encode( encoded ); + } + + + public void setCallback( EncoderCallback cb ) + { + this.cb = cb; + } + + + public void setEncoderMonitor( EncoderMonitor monitor ) + { + encoder.setEncoderMonitor( monitor ); + } + + + /** + * Gets the key of the client this stateful encoder is dedicated to. + * + * @return the key of the client for this stateful encoder + */ + public ClientKey getClientKey() + { + return key; + } + + + public void encodeOccurred( StatefulEncoder encoder, Object encoded ) + { + cb.encodeOccurred( this, encoded ); + } +} Modified: incubator/directory/seda/trunk/api/src/java/org/apache/seda/encoder/EncoderManager.java ============================================================================== --- incubator/directory/seda/trunk/api/src/java/org/apache/seda/encoder/EncoderManager.java (original) +++ incubator/directory/seda/trunk/api/src/java/org/apache/seda/encoder/EncoderManager.java Mon Sep 6 23:05:25 2004 @@ -34,13 +34,22 @@ */ public interface EncoderManager { + /** + * Encodes a protocol response without blocking. + * + * @param key the client's key + * @param response the protocol response message to be encoded. + */ + public void encodeNonBlocking( ClientKey key, Object response ) + throws EncoderException; + /** - * Synchronously encodes a protocol Response message into a byte buffer. + * Encodes a protocol response into a byte buffer while blocking. * * @param key the client's key * @param response the protocol response message to be encoded. */ - public ByteBuffer encode( ClientKey key, Object response ) - throws EncoderException; + public ByteBuffer encodeBlocking( ClientKey key, Object response ) + throws EncoderException; } Modified: incubator/directory/seda/trunk/api/src/java/org/apache/seda/encoder/EncoderManagerMonitor.java ============================================================================== --- incubator/directory/seda/trunk/api/src/java/org/apache/seda/encoder/EncoderManagerMonitor.java (original) +++ incubator/directory/seda/trunk/api/src/java/org/apache/seda/encoder/EncoderManagerMonitor.java Mon Sep 6 23:05:25 2004 @@ -16,7 +16,8 @@ */ package org.apache.seda.encoder ; -import org.apache.seda.encoder.EncoderManager; + +import org.apache.seda.listener.ClientKey; import java.util.EventObject; @@ -33,5 +34,7 @@ void failedOnInform( EncoderManager manager, EventObject event, Throwable t ) ; void failedOnEncode( EncoderManager manager, EventObject event, + Throwable t ) ; + void failedOnEncode( EncoderManager manager, ClientKey key, Object response, Throwable t ) ; } Added: incubator/directory/seda/trunk/api/src/java/org/apache/seda/event/AddProtocolEvent.java ============================================================================== --- (empty file) +++ incubator/directory/seda/trunk/api/src/java/org/apache/seda/event/AddProtocolEvent.java Mon Sep 6 23:05:25 2004 @@ -0,0 +1,41 @@ +/* + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.seda.event; + + +import org.apache.seda.protocol.ProtocolProvider; + + +/** + * An event indicating the addition of a new protocol. + * + * @author Apache Directory Project + * @version $Rev$ + */ +public class AddProtocolEvent extends ProtocolEvent +{ + /** + * Creates a protocol addition event denoting the addition of a new + * protocol. + * + * @param proto the newly added protocol + */ + public AddProtocolEvent( ProtocolProvider proto ) + { + super( proto ); + } +} Added: incubator/directory/seda/trunk/api/src/java/org/apache/seda/event/ProtocolEvent.java ============================================================================== --- (empty file) +++ incubator/directory/seda/trunk/api/src/java/org/apache/seda/event/ProtocolEvent.java Mon Sep 6 23:05:25 2004 @@ -0,0 +1,53 @@ +/* + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.seda.event; + + +import org.apache.seda.protocol.ProtocolProvider; + +import java.util.EventObject; + + +/** + * A base for protocol configuration specific events. + * + * @author Apache Directory Project + * @version $Rev$ + */ +public class ProtocolEvent extends EventObject +{ + /** + * Creates a protocol event with the protocol provider as the source. + * + * @param proto the protocol provider associated with this event + */ + public ProtocolEvent( ProtocolProvider proto ) + { + super( proto ); + } + + + /** + * Gets the protocol provider associated with this protocol event. + * + * @return the protocol provider that caused this event + */ + public ProtocolProvider getProtocolProvider() + { + return ( ProtocolProvider ) getSource(); + } +} Added: incubator/directory/seda/trunk/api/src/java/org/apache/seda/event/ProtocolSubscriber.java ============================================================================== --- (empty file) +++ incubator/directory/seda/trunk/api/src/java/org/apache/seda/event/ProtocolSubscriber.java Mon Sep 6 23:05:25 2004 @@ -0,0 +1,34 @@ +/* + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.seda.event; + +/** + * Type specific subscriber for all protocol events which represent protocol + * configuration changes. + * + * @author Apache Directory Project + * @version $Rev$ + */ +public interface ProtocolSubscriber extends Subscriber +{ + /** + * Informs this subscriber of a protocol event. + * + * @param event the protocol event to inform of + */ + void inform( ProtocolEvent event ); +} Modified: incubator/directory/seda/trunk/api/src/java/org/apache/seda/protocol/InetServicesDatabase.java ============================================================================== --- incubator/directory/seda/trunk/api/src/java/org/apache/seda/protocol/InetServicesDatabase.java (original) +++ incubator/directory/seda/trunk/api/src/java/org/apache/seda/protocol/InetServicesDatabase.java Mon Sep 6 23:05:25 2004 @@ -51,6 +51,13 @@ Iterator getByPort( String port ); /** + * Gets the protocol associated with a port. + * + * @param port the port one which the service resides + */ + String getProtoByPort( int port ); + + /** * Gets the service entry by name using the authoritative service name and * a transport protocol. * Modified: incubator/directory/seda/trunk/impl/src/java/org/apache/seda/decoder/DefaultDecoderManager.java ============================================================================== --- incubator/directory/seda/trunk/impl/src/java/org/apache/seda/decoder/DefaultDecoderManager.java (original) +++ incubator/directory/seda/trunk/impl/src/java/org/apache/seda/decoder/DefaultDecoderManager.java Mon Sep 6 23:05:25 2004 @@ -27,22 +27,15 @@ import org.apache.seda.stage.DefaultStage; import org.apache.seda.stage.LoggingStageMonitor; -import org.apache.seda.event.InputEvent; -import org.apache.seda.event.EventRouter; import org.apache.seda.listener.ClientKey; -import org.apache.seda.event.ConnectEvent; -import org.apache.seda.event.RequestEvent; -import org.apache.seda.event.DisconnectEvent; -import org.apache.seda.event.InputSubscriber; -import org.apache.seda.event.ConnectSubscriber; -import org.apache.seda.event.AbstractSubscriber; -import org.apache.seda.event.DisconnectSubscriber; +import org.apache.seda.listener.KeyExpiryException; +import org.apache.seda.event.*; +import org.apache.seda.protocol.InetServicesDatabase; import org.apache.commons.codec.DecoderException; import org.apache.commons.codec.stateful.DecoderCallback; import org.apache.commons.codec.stateful.StatefulDecoder; - -import org.apache.commons.lang.NotImplementedException; +import org.apache.commons.codec.stateful.DecoderFactory; /** @@ -55,13 +48,18 @@ implements DecoderManager, InputSubscriber, - ConnectSubscriber, + ConnectSubscriber, + ProtocolSubscriber, DisconnectSubscriber { /** event router or bus this component subscribes and publishes events on */ private final EventRouter router; + /** the internet service database */ + private final InetServicesDatabase inetdb; /** map of decoders for client keys */ private final Map decoders = new HashMap(); + /** map of DecoderFactory for different protos using proto name as key */ + private final Map factories = new HashMap(2); /** the monitor used for this decoder manager */ private DecoderManagerMonitor monitor; @@ -72,16 +70,19 @@ * @param router the event bus or router component depended upon * @param config the stage configuration */ - public DefaultDecoderManager( EventRouter router, StageConfig config ) + public DefaultDecoderManager( EventRouter router, StageConfig config, + InetServicesDatabase inetdb ) { super( config ); this.router = router; + this.inetdb = inetdb; this.monitor = new DecoderManagerMonitorAdapter(); super.setMonitor( new LoggingStageMonitor( getClass() ) ); router.subscribe( InputEvent.class, this ); router.subscribe( ConnectEvent.class, this ); + router.subscribe( ProtocolEvent.class, this ); router.subscribe( DisconnectEvent.class, this ); } @@ -110,7 +111,25 @@ } } - + + /** + * For now this method just adds the DecoderFactory associated with the + * protocol to a map for use when setting up new clients. Later as other + * events besides AddProtocolEvent are supported we'll add more + * functionality. + * + * @param event the protocol event to be informed of + */ + public void inform( ProtocolEvent event ) + { + if ( event instanceof AddProtocolEvent ) + { + factories.put( event.getProtocolProvider().getName(), + event.getProtocolProvider().getDecoderFactory() ); + } + } + + /** * Enqueues the event onto this Stages event queue for processing. * @@ -144,14 +163,12 @@ * @param key the client key used to determine associated protocol * @return the new stateful nonblocking protocol specific decoder */ - private StatefulDecoder createClientDecoder( ClientKey key ) + private StatefulDecoder createDecoder( ClientKey key ) + throws KeyExpiryException { - if ( key == null ) - { - throw new NullPointerException( - "non null client key required to create decoder" ); - } - throw new NotImplementedException( "create new decoder for client" ); + String proto = inetdb.getProtoByPort( key.getSocket().getLocalPort() ); + DecoderFactory factory = ( DecoderFactory ) factories.get( proto ); + return factory.createDecoder(); } @@ -164,9 +181,18 @@ */ public void inform( ConnectEvent event ) { - StatefulDecoder decoder; + StatefulDecoder decoder = null; ClientKey key = event.getClientKey(); - decoder = new ClientDecoder( key, createClientDecoder( key ) ); + + try + { + decoder = new ClientDecoder( key, createDecoder( key ) ); + } + catch ( KeyExpiryException e ) + { + monitor.failedOnInform( this, event, e ); + return; + } /* * Here the decoder informs us that a unit of data is decoded. In the @@ -213,7 +239,16 @@ throws DecoderException { // replace this decoder with a real one later - StatefulDecoder decoder = createClientDecoder( key ); + StatefulDecoder decoder = null; + try + { + decoder = createDecoder( key ); + } + catch ( KeyExpiryException e ) + { + throw new DecoderException( "client key has expired" ); + } + // used array to set a value on final variable and get by compiler final Object[] decoded = new Object[1]; Modified: incubator/directory/seda/trunk/impl/src/java/org/apache/seda/encoder/DefaultEncoderManager.java ============================================================================== --- incubator/directory/seda/trunk/impl/src/java/org/apache/seda/encoder/DefaultEncoderManager.java (original) +++ incubator/directory/seda/trunk/impl/src/java/org/apache/seda/encoder/DefaultEncoderManager.java Mon Sep 6 23:05:25 2004 @@ -19,22 +19,22 @@ import java.nio.ByteBuffer; import java.util.EventObject; +import java.util.HashMap; import org.apache.commons.codec.EncoderException; -import org.apache.commons.lang.NotImplementedException; - -import org.apache.seda.event.EventRouter; -import org.apache.seda.event.OutputEvent; -import org.apache.seda.event.ResponseEvent; -import org.apache.seda.event.ResponseSubscriber; +import org.apache.commons.codec.stateful.EncoderFactory; +import org.apache.commons.codec.stateful.StatefulEncoder; +import org.apache.commons.codec.stateful.EncoderCallback; import org.apache.seda.stage.StageHandler; import org.apache.seda.stage.DefaultStage; import org.apache.seda.stage.DefaultStageConfig; import org.apache.seda.stage.LoggingStageMonitor; +import org.apache.seda.event.*; import org.apache.seda.listener.ClientKey; -import org.apache.seda.event.AbstractSubscriber; +import org.apache.seda.listener.KeyExpiryException; +import org.apache.seda.protocol.InetServicesDatabase; /** @@ -47,10 +47,22 @@ * @version $Rev$ */ public class DefaultEncoderManager extends DefaultStage - implements EncoderManager, ResponseSubscriber + implements + EncoderManager, + ConnectSubscriber, + ResponseSubscriber, + ProtocolSubscriber, + DisconnectSubscriber { /** the event router used to publish and subscribe to events on */ private final EventRouter router; + /** the internet service database used to lookup protocols by port */ + private final InetServicesDatabase inetdb; + /** a map of protocol names to EncoderFactorys */ + private final HashMap factories = new HashMap( 2 ); + /** map of client keys to client encoders */ + private final HashMap encoders = new HashMap(); + /** the monitor used by this encoder manager */ private EncoderManagerMonitor monitor; @@ -60,13 +72,16 @@ * @param router the event router used to publish and subscribe to events on */ public DefaultEncoderManager( EventRouter router, - DefaultStageConfig config ) + DefaultStageConfig config, + InetServicesDatabase inetdb ) { super( config ); super.setMonitor( new LoggingStageMonitor( this.getClass() ) ); monitor = new EncoderManagerMonitorAdapter(); config.setHandler( new EncoderStageHandler() ); + this.inetdb = inetdb; this.router = router; + this.router.subscribe( ProtocolEvent.class, this ); this.router.subscribe( ResponseEvent.class, this ); } @@ -85,8 +100,20 @@ monitor.failedOnInform( this, event, t ); } } - - + + + /** + * Removes the clients encoder from the map of encoders. + * + * @see org.apache.seda.event.DisconnectSubscriber#inform( + * org.apache.seda.event.DisconnectEvent) + */ + public void inform( DisconnectEvent event ) + { + encoders.remove( event.getClientKey() ); + } + + /* (non-Javadoc) * @see org.apache.seda.event.ResponseSubscriber#inform( * org.apache.seda.event.ResponseEvent) @@ -97,18 +124,122 @@ } - /* (non-Javadoc) - * @see org.apache.seda.encoder.EncoderManager#encode(ClientKey, Object) + public void inform( ProtocolEvent event ) + { + if ( event instanceof AddProtocolEvent ) + { + factories.put( event.getProtocolProvider().getName(), + event.getProtocolProvider().getEncoderFactory() ); + } + } + + + /** + * Temporary place holder for functionality that looks up a protocol + * specific StatefulEncoder. + * + * @param key the client key used to determine associated protocol + * @return the new stateful nonblocking protocol specific encoder + */ + private StatefulEncoder createEncoder( ClientKey key ) + throws KeyExpiryException + { + String proto = inetdb.getProtoByPort( key.getSocket().getLocalPort() ); + EncoderFactory factory = ( EncoderFactory ) factories.get( proto ); + return factory.createEncoder(); + } + + + /** + * We basically create a new client encoder and put it into a map for + * use later when we are processing response events from the client. + * + * @see org.apache.seda.event.ConnectSubscriber#inform( + * org.apache.seda.event.ConnectEvent) */ - public ByteBuffer encode( ClientKey key, Object response ) + public void inform( ConnectEvent event ) + { + StatefulEncoder encoder = null; + ClientKey key = event.getClientKey(); + + try + { + encoder = new ClientEncoder( key, createEncoder( key ) ); + } + catch ( KeyExpiryException e ) + { + monitor.failedOnInform( this, event, e ); + return; + } + + /* + * Here the encoder informs us that a response encoded. + */ + encoder.setCallback( new EncoderCallback() + { + public void encodeOccurred( StatefulEncoder encoder, + Object encoded ) + { + ClientKey key = ( ( ClientEncoder ) encoder ).getClientKey(); + OutputEvent event = new OutputEvent( this, key, + ( ByteBuffer ) encoded ); + router.publish( event ); + } + }); + encoders.put( key, encoder ); + } + + + public ByteBuffer encodeBlocking( ClientKey key, Object response ) throws EncoderException { -// MessageEncoder encoder = new MessageEncoder(); -// return encoder.encode( response ); - throw new NotImplementedException( "need to delegate to encoder" ); + int port = -1; + + try + { + port = key.getSocket().getLocalPort(); + } + catch ( KeyExpiryException e ) + { + monitor.failedOnEncode( this, key, response, e ); + } + + EncoderFactory factory = ( EncoderFactory ) + factories.get( inetdb.getProtoByPort( port ) ); + StatefulEncoder encoder = + new ClientEncoder( key, factory.createEncoder() ); + + // used array to set a value on final variable and get by compiler + final Object[] encoded = new Object[1]; + encoder.setCallback( new EncoderCallback() + { + public void encodeOccurred( StatefulEncoder encoder, Object obj ) + { + encoded[0] = obj; + } + }); + + encoder.encode( response ); + + // the encoded value should be set + if ( encoded[0] == null ) + { + throw new EncoderException( "Expected a complete encoded object" + + " but encoder did not produce one" ); + } + + return ( ByteBuffer ) encoded[0]; + } + + + public void encodeNonBlocking( ClientKey key, Object response ) + throws EncoderException + { + StatefulEncoder encoder = ( StatefulEncoder ) encoders.get( key ); + encoder.encode( response ); } - - + + class EncoderStageHandler implements StageHandler { /* (non-Javadoc) @@ -117,21 +248,18 @@ */ public void handleEvent( EventObject generic ) { - ByteBuffer buf = null; ResponseEvent event = ( ResponseEvent ) generic; - + ClientKey key = event.getClientKey(); + StatefulEncoder encoder = ( StatefulEncoder ) encoders.get( key ); + try { - buf = encode( event.getClientKey(), event.getResponse() ); + encoder.encode( event.getResponse() ); } catch ( EncoderException e ) { monitor.failedOnEncode( DefaultEncoderManager.this, event, e ); } - - OutputEvent outEvent = new OutputEvent( DefaultEncoderManager.this, - event.getClientKey(), buf ); - router.publish( outEvent ); } } } Modified: incubator/directory/seda/trunk/impl/src/test/org/apache/seda/decoder/DefaultDecoderManagerTest.java ============================================================================== --- incubator/directory/seda/trunk/impl/src/test/org/apache/seda/decoder/DefaultDecoderManagerTest.java (original) +++ incubator/directory/seda/trunk/impl/src/test/org/apache/seda/decoder/DefaultDecoderManagerTest.java Mon Sep 6 23:05:25 2004 @@ -18,6 +18,7 @@ import java.util.EventObject ; +import java.util.Iterator; import org.apache.seda.buffer.BufferPool; import org.apache.seda.buffer.BufferPoolConfig; @@ -31,6 +32,8 @@ import org.apache.seda.thread.ThreadPool; import org.apache.seda.stage.DefaultStageConfig; +import org.apache.seda.protocol.InetServicesDatabase; +import org.apache.seda.protocol.InetServiceEntry; import junit.framework.TestCase ; @@ -84,7 +87,34 @@ } ; config = new DefaultStageConfig( "default", tpool ) ; - decodeMan = new DefaultDecoderManager( router, config ) ; + decodeMan = new DefaultDecoderManager( router, config, + new InetServicesDatabase() + { + public Iterator getByName( String name ) + { + return null; + } + + public Iterator getByPort( String port ) + { + return null; + } + + public String getProtoByPort( int port ) + { + return null; + } + + public InetServiceEntry getByName( String name, String proto ) + { + return null; + } + + public InetServiceEntry getByPort( String port, String proto ) + { + return null; + } + }) ; config.setHandler( new DecodeStageHandler( decodeMan ) ) ; decodeMan.start() ; }