Return-Path: Delivered-To: apmail-incubator-directory-cvs-archive@www.apache.org Received: (qmail 95337 invoked from network); 4 Nov 2004 16:26:37 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur-2.apache.org with SMTP; 4 Nov 2004 16:26:37 -0000 Received: (qmail 5203 invoked by uid 500); 4 Nov 2004 16:26:08 -0000 Delivered-To: apmail-incubator-directory-cvs-archive@incubator.apache.org Received: (qmail 5063 invoked by uid 500); 4 Nov 2004 16:26:07 -0000 Mailing-List: contact directory-cvs-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: directory-dev@incubator.apache.org Delivered-To: mailing list directory-cvs@incubator.apache.org Received: (qmail 5003 invoked by uid 99); 4 Nov 2004 16:26:06 -0000 X-ASF-Spam-Status: No, hits=-10.0 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [209.237.227.194] (HELO minotaur.apache.org) (209.237.227.194) by apache.org (qpsmtpd/0.28) with SMTP; Thu, 04 Nov 2004 08:26:05 -0800 Received: (qmail 95008 invoked by uid 65534); 4 Nov 2004 16:26:03 -0000 Date: 4 Nov 2004 16:26:03 -0000 Message-ID: <20041104162603.95001.qmail@minotaur.apache.org> From: trustin@apache.org To: directory-cvs@incubator.apache.org Subject: svn commit: rev 56603 - in incubator/directory/seda/trunk/src/java/org/apache/seda: decoder encoder X-Virus-Checked: Checked X-Spam-Rating: minotaur-2.apache.org 1.6.2 0/1000/N Author: trustin Date: Thu Nov 4 08:26:01 2004 New Revision: 56603 Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/EncodeStageHandler.java Log: DefaultEncoderManager and DefaultDecoderManager now works without subscribing to ConnectEvent. Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java ============================================================================== --- incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java (original) +++ incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java Thu Nov 4 08:26:01 2004 @@ -27,8 +27,6 @@ import org.apache.commons.codec.stateful.DecoderCallback; import org.apache.commons.codec.stateful.StatefulDecoder; import org.apache.seda.event.AbstractSubscriber; -import org.apache.seda.event.ConnectEvent; -import org.apache.seda.event.ConnectSubscriber; import org.apache.seda.event.DisconnectEvent; import org.apache.seda.event.DisconnectSubscriber; import org.apache.seda.event.EventRouter; @@ -37,7 +35,6 @@ import org.apache.seda.event.RequestEvent; import org.apache.seda.event.filter.EventTypeFilter; import org.apache.seda.listener.ClientKey; -import org.apache.seda.listener.KeyExpiryException; import org.apache.seda.listener.UDPClientKey; import org.apache.seda.protocol.InetServiceEntry; import org.apache.seda.protocol.InetServicesDatabase; @@ -55,7 +52,7 @@ * @version $Rev$ */ public class DefaultDecoderManager extends DefaultStage - implements DecoderManager, InputSubscriber, ConnectSubscriber, + implements DecoderManager, InputSubscriber, DisconnectSubscriber { /** event router or bus this component subscribes and publishes events on */ @@ -91,7 +88,6 @@ super.setStageMonitor(new LoggingStageMonitor(getClass())); router.subscribe(new EventTypeFilter(InputEvent.class), this); - router.subscribe(new EventTypeFilter(ConnectEvent.class), this); router.subscribe(new EventTypeFilter(DisconnectEvent.class), this); } @@ -142,93 +138,6 @@ decoders.remove(event.getClientKey()); } - /** - * Temporary place holder for functionality that looks up a protocol - * specific StatefulDecoder. - * - * @param key the client key used to determine associated protocol - * @return the new stateful nonblocking protocol specific decoder - */ - private StatefulDecoder createDecoder(ClientKey key) - throws KeyExpiryException - { - TransportTypeEnum transportType; - if (key instanceof UDPClientKey) { - transportType = TransportTypeEnum.UDP; - } else { - transportType = TransportTypeEnum.TCP; - } - - Iterator it = inetdb.getByPort(key.getLocalAddress().getPort()); - ProtocolProvider provider = null; - while (it.hasNext()) { - InetServiceEntry entry = (InetServiceEntry) it.next(); - if (entry.getTransport() == transportType) { - provider = entry.getProtocolProvider(); - } - } - - // TODO replace RuntimeException with ProtocolProviderNotFoundException - if (provider == null) - throw new RuntimeException("No protocol provider available"); - - return provider.getDecoderFactory().createDecoder(); - } - - /** - * We basically create a new client decoder and put it into a map for - * use later when we are processing input events from the client. - * - * @see org.apache.seda.event.ConnectSubscriber#inform( - * org.apache.seda.event.ConnectEvent) - */ - public void inform(ConnectEvent event) - { - StatefulDecoder decoder = null; - ClientKey key = event.getClientKey(); - - try - { - decoder = new ClientDecoder(key, createDecoder(key)); - } - catch (KeyExpiryException e) - { - monitor.failedOnInform(this, event, e); - return; - } - - /* - * Here the decoder informs us that a unit of data is decoded. In the - * case of the snickers decoder we're decoding an LDAP message envelope - * for a request. We use this request to create a RequestEvent and - * publish the event on the queue. - */ - decoder.setCallback(new DecoderCallback() - { - public void decodeOccurred( - StatefulDecoder decoder, - Object decoded) - { - ClientKey key = ((ClientDecoder) decoder).getClientKey(); - RequestEvent event = new RequestEvent(this, key, decoded); - router.publish(event); - } - }); - - /* - * For potential race conditions between ConnectEvent processing and - * the processing of the first InputEvent we synchronize on the decoders - * and notify all when we have altered it. The thread that is waiting - * for a client decoder in the decoders map will wait on the map until - * awoken. - */ - synchronized (decoders) - { - decoders.put(key, decoder); - decoders.notifyAll(); - } - } - // ------------------------------------------------------------------------ // Service Interface Methods // ------------------------------------------------------------------------ @@ -252,16 +161,7 @@ throws DecoderException { // replace this decoder with a real one later - StatefulDecoder decoder = null; - - try - { - decoder = createDecoder(key); - } - catch (KeyExpiryException e) - { - throw new DecoderException("client key has expired"); - } + StatefulDecoder decoder = decoder = createDecoder(key); // used array to set a value on final variable and get by compiler final Object[] decoded = new Object[1]; @@ -318,33 +218,75 @@ */ StatefulDecoder getDecoder(ClientKey key) { + StatefulDecoder decoder = (StatefulDecoder) decoders.get(key); + if (decoder == null) { + synchronized (decoders) { + decoder = (StatefulDecoder) decoders.get(key); + if (decoder == null) { + decoder = createClientDecoder(key); + decoders.put(key, decoder); + } + } + } + + return decoder; + } + + /** + * Temporary place holder for functionality that looks up a protocol + * specific StatefulDecoder. + * + * @param key the client key used to determine associated protocol + * @return the new stateful nonblocking protocol specific decoder + */ + private StatefulDecoder createDecoder(ClientKey key) + { + TransportTypeEnum transportType; + if (key instanceof UDPClientKey) { + transportType = TransportTypeEnum.UDP; + } else { + transportType = TransportTypeEnum.TCP; + } + + Iterator it = inetdb.getByPort(key.getLocalAddress().getPort()); + ProtocolProvider provider = null; + while (it.hasNext()) { + InetServiceEntry entry = (InetServiceEntry) it.next(); + if (entry.getTransport() == transportType) { + provider = entry.getProtocolProvider(); + } + } + + // TODO replace RuntimeException with ProtocolProviderNotFoundException + if (provider == null) + throw new RuntimeException("No protocol provider available"); + + return provider.getDecoderFactory().createDecoder(); + } + + private StatefulDecoder createClientDecoder(ClientKey key) + { StatefulDecoder decoder = null; + decoder = new ClientDecoder(key, createDecoder(key)); + /* - * We synchronize on the decoders map so we can wait for notification - * on it if it does not contain the client decoder. This is in case - * the processing of the connect event is slow and the client decoder - * has not been created yet. When processing of the ConnectEvent is - * complete we are awoken via a notifyAll in the inform() method. + * Here the decoder informs us that a unit of data is decoded. In the + * case of the snickers decoder we're decoding an LDAP message envelope + * for a request. We use this request to create a RequestEvent and + * publish the event on the queue. */ - synchronized (decoders) - { - decoder = (StatefulDecoder) decoders.get(key); - - while (decoder == null) + decoder.setCallback(new DecoderCallback() { - try - { - decoders.wait(); - } - catch (InterruptedException e) + public void decodeOccurred( + StatefulDecoder decoder, + Object decoded) { - e.printStackTrace(); + ClientKey key = ((ClientDecoder) decoder).getClientKey(); + RequestEvent event = new RequestEvent(this, key, decoded); + router.publish(event); } - - decoder = (StatefulDecoder) decoders.get(key); - } - } + }); return decoder; } Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java ============================================================================== --- incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java (original) +++ incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java Thu Nov 4 08:26:01 2004 @@ -27,8 +27,6 @@ import org.apache.commons.codec.stateful.EncoderFactory; import org.apache.commons.codec.stateful.StatefulEncoder; import org.apache.seda.event.AbstractSubscriber; -import org.apache.seda.event.ConnectEvent; -import org.apache.seda.event.ConnectSubscriber; import org.apache.seda.event.DisconnectEvent; import org.apache.seda.event.DisconnectSubscriber; import org.apache.seda.event.EventRouter; @@ -37,7 +35,6 @@ import org.apache.seda.event.ResponseSubscriber; import org.apache.seda.event.filter.EventTypeFilter; import org.apache.seda.listener.ClientKey; -import org.apache.seda.listener.KeyExpiryException; import org.apache.seda.listener.UDPClientKey; import org.apache.seda.protocol.InetServiceEntry; import org.apache.seda.protocol.InetServicesDatabase; @@ -57,7 +54,7 @@ * @version $Rev$ */ public class DefaultEncoderManager extends DefaultStage - implements EncoderManager, ConnectSubscriber, ResponseSubscriber, + implements EncoderManager, ResponseSubscriber, DisconnectSubscriber { /** the event router used to publish and subscribe to events on */ @@ -88,7 +85,6 @@ monitor = new EncoderManagerMonitorAdapter(); this.inetdb = inetdb; this.router = router; - this.router.subscribe(new EventTypeFilter(ConnectEvent.class), this); this.router.subscribe(new EventTypeFilter(ResponseEvent.class), this); } @@ -127,6 +123,66 @@ super.enqueue(event); } + public ByteBuffer encodeBlocking(ClientKey key, Object response) + throws EncoderException + { + int port = key.getLocalAddress().getPort(); + + EncoderFactory factory = + (EncoderFactory) factories.get(inetdb.getProtoByPort(port)); + StatefulEncoder encoder = + new ClientEncoder(key, factory.createEncoder()); + + // used array to set a value on final variable and get by compiler + final Object[] encoded = new Object[1]; + encoder.setCallback(new EncoderCallback() + { + public void encodeOccurred(StatefulEncoder encoder, Object obj) + { + encoded[0] = obj; + } + }); + + encoder.encode(response); + + // the encoded value should be set + if (encoded[0] == null) + { + throw new EncoderException("Expected a complete encoded object" + + " but encoder did not produce one"); + } + + return (ByteBuffer) encoded[0]; + } + + public void encodeNonBlocking(ClientKey key, Object response) + throws EncoderException + { + StatefulEncoder encoder = (StatefulEncoder) encoders.get(key); + encoder.encode(response); + } + + EncoderManagerMonitor getMonitor() + { + return monitor; + } + + StatefulEncoder getEncoder(ClientKey key) + { + StatefulEncoder encoder = (StatefulEncoder) encoders.get(key); + if (encoder == null) { + synchronized (encoders) { + encoder = (StatefulEncoder) encoders.get(key); + if (encoder == null) { + encoder = createClientEncoder(key); + encoders.put(key, encoder); + } + } + } + + return encoder; + } + /** * Temporary place holder for functionality that looks up a protocol * specific StatefulEncoder. @@ -135,7 +191,6 @@ * @return the new stateful nonblocking protocol specific encoder */ private StatefulEncoder createEncoder(ClientKey key) - throws KeyExpiryException { TransportTypeEnum transportType; if (key instanceof UDPClientKey) { @@ -167,20 +222,9 @@ * @see org.apache.seda.event.ConnectSubscriber#inform( * org.apache.seda.event.ConnectEvent) */ - public void inform(ConnectEvent event) + private StatefulEncoder createClientEncoder(ClientKey key) { - StatefulEncoder encoder = null; - ClientKey key = event.getClientKey(); - - try - { - encoder = new ClientEncoder(key, createEncoder(key)); - } - catch (KeyExpiryException e) - { - monitor.failedOnInform(this, event, e); - return; - } + StatefulEncoder encoder = new ClientEncoder(key, createEncoder(key)); /* * Here the encoder informs us that a response encoded. @@ -196,55 +240,7 @@ router.publish(event); } }); - encoders.put(key, encoder); - } - - public ByteBuffer encodeBlocking(ClientKey key, Object response) - throws EncoderException - { - int port = key.getLocalAddress().getPort(); - - EncoderFactory factory = - (EncoderFactory) factories.get(inetdb.getProtoByPort(port)); - StatefulEncoder encoder = - new ClientEncoder(key, factory.createEncoder()); - - // used array to set a value on final variable and get by compiler - final Object[] encoded = new Object[1]; - encoder.setCallback(new EncoderCallback() - { - public void encodeOccurred(StatefulEncoder encoder, Object obj) - { - encoded[0] = obj; - } - }); - - encoder.encode(response); - - // the encoded value should be set - if (encoded[0] == null) - { - throw new EncoderException("Expected a complete encoded object" + - " but encoder did not produce one"); - } - - return (ByteBuffer) encoded[0]; - } - - public void encodeNonBlocking(ClientKey key, Object response) - throws EncoderException - { - StatefulEncoder encoder = (StatefulEncoder) encoders.get(key); - encoder.encode(response); - } - - EncoderManagerMonitor getMonitor() - { - return monitor; - } - - StatefulEncoder getEncoder(ClientKey key) - { - return (StatefulEncoder) encoders.get(key); + + return encoder; } } Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/EncodeStageHandler.java ============================================================================== --- incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/EncodeStageHandler.java (original) +++ incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/EncodeStageHandler.java Thu Nov 4 08:26:01 2004 @@ -50,11 +50,7 @@ { ResponseEvent re = (ResponseEvent) event; ClientKey key = re.getClientKey(); - StatefulEncoder encoder; - - // FIXME Event synchronization issues - while ((encoder = encMan.getEncoder(key)) == null) - continue; + StatefulEncoder encoder = encMan.getEncoder(key); try {