Return-Path: Delivered-To: apmail-incubator-cxf-commits-archive@locus.apache.org Received: (qmail 73653 invoked from network); 27 Mar 2007 17:42:41 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 27 Mar 2007 17:42:41 -0000 Received: (qmail 59286 invoked by uid 500); 27 Mar 2007 17:42:48 -0000 Delivered-To: apmail-incubator-cxf-commits-archive@incubator.apache.org Received: (qmail 59189 invoked by uid 500); 27 Mar 2007 17:42:48 -0000 Mailing-List: contact cxf-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: cxf-dev@incubator.apache.org Delivered-To: mailing list cxf-commits@incubator.apache.org Received: (qmail 59176 invoked by uid 99); 27 Mar 2007 17:42:47 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 27 Mar 2007 10:42:47 -0700 X-ASF-Spam-Status: No, hits=-99.5 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 27 Mar 2007 10:42:39 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 897B81A983A; Tue, 27 Mar 2007 10:42:19 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r523011 - in /incubator/cxf/trunk/rt/transports/local/src: main/java/org/apache/cxf/transport/local/ test/java/org/apache/cxf/transport/local/ Date: Tue, 27 Mar 2007 17:42:19 -0000 To: cxf-commits@incubator.apache.org From: dandiep@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070327174219.897B81A983A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: dandiep Date: Tue Mar 27 10:42:18 2007 New Revision: 523011 URL: http://svn.apache.org/viewvc?view=rev&rev=523011 Log: Allow people to directly invoke a Destination, instead of forcing them to go throughan OutputStream. Modified: incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java incubator/cxf/trunk/rt/transports/local/src/test/java/org/apache/cxf/transport/local/LocalTransportFactoryTest.java Modified: incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java?view=diff&rev=523011&r1=523010&r2=523011 ============================================================================== --- incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java (original) +++ incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java Tue Mar 27 10:42:18 2007 @@ -37,7 +37,9 @@ public class LocalConduit extends AbstractConduit { public static final String IN_CONDUIT = LocalConduit.class.getName() + ".inConduit"; + public static final String RESPONSE_CONDUIT = LocalConduit.class.getName() + ".inConduit"; public static final String IN_EXCHANGE = LocalConduit.class.getName() + ".inExchange"; + public static final String DIRECT_DISPATCH = LocalConduit.class.getName() + ".directDispatch"; private static final Logger LOG = LogUtils.getL7dLogger(LocalConduit.class); @@ -49,6 +51,30 @@ } public void send(final Message message) throws IOException { + if (Boolean.TRUE.equals(message.get(DIRECT_DISPATCH))) { + dispatchDirect(message); + } else { + dispatchViaPipe(message); + } + } + + private void dispatchDirect(Message message) { + if (destination.getMessageObserver() == null) { + throw new IllegalStateException("Local destination does not have a MessageObserver on address " + + destination.getAddress().getAddress().getValue()); + } + + message.put(IN_CONDUIT, this); + Exchange exchange = message.getExchange(); + if (exchange == null) { + exchange = new ExchangeImpl(); + exchange.setInMessage(message); + } + exchange.setDestination(destination); + destination.getMessageObserver().onMessage(message); + } + + private void dispatchViaPipe(final Message message) throws IOException { final PipedInputStream stream = new PipedInputStream(); final LocalConduit conduit = this; final Exchange exchange = message.getExchange(); @@ -60,15 +86,15 @@ final Runnable receiver = new Runnable() { public void run() { - MessageImpl m = new MessageImpl(); - m.setContent(InputStream.class, stream); - m.setDestination(destination); - m.put(IN_CONDUIT, conduit); + MessageImpl inMsg = new MessageImpl(); + inMsg.setContent(InputStream.class, stream); + inMsg.setDestination(destination); + inMsg.put(IN_CONDUIT, conduit); ExchangeImpl ex = new ExchangeImpl(); - ex.setInMessage(m); + ex.setInMessage(inMsg); ex.put(IN_EXCHANGE, exchange); - destination.getMessageObserver().onMessage(m); + destination.getMessageObserver().onMessage(inMsg); } }; Modified: incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java?view=diff&rev=523011&r1=523010&r2=523011 ============================================================================== --- incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java (original) +++ incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java Tue Mar 27 10:42:18 2007 @@ -41,9 +41,12 @@ implements DestinationFactory, ConduitInitiator { public static final String TRANSPORT_ID = "http://cxf.apache.org/transports/local"; - + + public static final String DISPATCH_DIRECT = "dispatch.direct"; + private static final Logger LOG = Logger.getLogger(LocalTransportFactory.class.getName()); private static final Set URI_PREFIXES = new HashSet(); + static { URI_PREFIXES.add("local://"); } Modified: incubator/cxf/trunk/rt/transports/local/src/test/java/org/apache/cxf/transport/local/LocalTransportFactoryTest.java URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/local/src/test/java/org/apache/cxf/transport/local/LocalTransportFactoryTest.java?view=diff&rev=523011&r1=523010&r2=523011 ============================================================================== --- incubator/cxf/trunk/rt/transports/local/src/test/java/org/apache/cxf/transport/local/LocalTransportFactoryTest.java (original) +++ incubator/cxf/trunk/rt/transports/local/src/test/java/org/apache/cxf/transport/local/LocalTransportFactoryTest.java Tue Mar 27 10:42:18 2007 @@ -19,6 +19,7 @@ package org.apache.cxf.transport.local; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; @@ -32,9 +33,11 @@ import org.apache.cxf.transport.Conduit; import org.apache.cxf.transport.Destination; import org.apache.cxf.transport.MessageObserver; +import org.junit.Test; import org.xmlsoap.schemas.wsdl.http.AddressType; public class LocalTransportFactoryTest extends TestCase { + @Test public void testTransportFactory() throws Exception { LocalTransportFactory factory = new LocalTransportFactory(); @@ -62,12 +65,39 @@ assertEquals("hello", obs.getResponseStream().toString()); } + @Test + public void testDirectInvocation() throws Exception { + LocalTransportFactory factory = new LocalTransportFactory(); + + EndpointInfo ei = new EndpointInfo(null, "http://schemas.xmlsoap.org/soap/http"); + AddressType a = new AddressType(); + a.setLocation("http://localhost/test"); + ei.addExtensor(a); + + LocalDestination d = (LocalDestination) factory.getDestination(ei); + d.setMessageObserver(new EchoObserver()); + + // Set up a listener for the response + Conduit conduit = factory.getConduit(ei); + TestMessageObserver obs = new TestMessageObserver(); + conduit.setMessageObserver(obs); + + MessageImpl m = new MessageImpl(); + m.put(LocalConduit.DIRECT_DISPATCH, Boolean.TRUE); + m.setDestination(d); + m.setContent(InputStream.class, new ByteArrayInputStream("hello".getBytes())); + conduit.send(m); + + assertEquals("hello", obs.getResponseStream().toString()); + + } static class EchoObserver implements MessageObserver { public void onMessage(Message message) { try { Conduit backChannel = message.getDestination().getBackChannel(message, null, null); - + message.remove(LocalConduit.DIRECT_DISPATCH); + backChannel.send(message); OutputStream out = message.getContent(OutputStream.class); @@ -116,6 +146,7 @@ public synchronized void onMessage(Message message) { try { + message.remove(LocalConduit.DIRECT_DISPATCH); copy(message.getContent(InputStream.class), response, 1024); } catch (IOException e) { e.printStackTrace();