cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dk...@apache.org
Subject svn commit: r639047 - in /incubator/cxf/trunk: rt/bindings/object/src/main/java/org/apache/cxf/binding/object/ rt/bindings/object/src/test/java/org/apache/cxf/binding/object/ rt/transports/local/src/main/java/org/apache/cxf/transport/local/ systests/sr...
Date Wed, 19 Mar 2008 22:45:03 GMT
Author: dkulp
Date: Wed Mar 19 15:44:58 2008
New Revision: 639047

URL: http://svn.apache.org/viewvc?rev=639047&view=rev
Log:
[CXF-1398] Fixes to get the local transport working with MTOM stuff.

Modified:
    incubator/cxf/trunk/rt/bindings/object/src/main/java/org/apache/cxf/binding/object/ObjectDispatchOutInterceptor.java
    incubator/cxf/trunk/rt/bindings/object/src/test/java/org/apache/cxf/binding/object/LocalServerRegistrationTest.java
    incubator/cxf/trunk/rt/bindings/object/src/test/java/org/apache/cxf/binding/object/ObjectBindingTest.java
    incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java
    incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalDestination.java
    incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java
    incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/mtom_feature/MtomFeatureClientServerTest.java

Modified: incubator/cxf/trunk/rt/bindings/object/src/main/java/org/apache/cxf/binding/object/ObjectDispatchOutInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/bindings/object/src/main/java/org/apache/cxf/binding/object/ObjectDispatchOutInterceptor.java?rev=639047&r1=639046&r2=639047&view=diff
==============================================================================
--- incubator/cxf/trunk/rt/bindings/object/src/main/java/org/apache/cxf/binding/object/ObjectDispatchOutInterceptor.java (original)
+++ incubator/cxf/trunk/rt/bindings/object/src/main/java/org/apache/cxf/binding/object/ObjectDispatchOutInterceptor.java Wed Mar 19 15:44:58 2008
@@ -18,6 +18,9 @@
  */
 package org.apache.cxf.binding.object;
 
+import java.util.HashSet;
+import java.util.Set;
+
 import org.apache.cxf.interceptor.Fault;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
@@ -25,16 +28,22 @@
 import org.apache.cxf.phase.Phase;
 import org.apache.cxf.service.model.BindingOperationInfo;
 import org.apache.cxf.transport.local.LocalConduit;
+import org.apache.cxf.transport.local.LocalTransportFactory;
 
 public class ObjectDispatchOutInterceptor extends AbstractPhaseInterceptor<Message>
{
-
+    private Set<String> includes = new HashSet<String>();
+    
     public ObjectDispatchOutInterceptor() {
         super(Phase.SETUP);
+        includes.add(ObjectBinding.OPERATION);
+        includes.add(ObjectBinding.BINDING);
     }
 
     public void handleMessage(Message message) throws Fault {
         Exchange ex = message.getExchange();
         message.put(LocalConduit.DIRECT_DISPATCH, Boolean.TRUE);
+        message.put(LocalTransportFactory.MESSAGE_INCLUDE_PROPERTIES,
+                    includes);
         BindingOperationInfo bop = ex.get(BindingOperationInfo.class);
         
         message.put(ObjectBinding.OPERATION, bop.getName());

Modified: incubator/cxf/trunk/rt/bindings/object/src/test/java/org/apache/cxf/binding/object/LocalServerRegistrationTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/bindings/object/src/test/java/org/apache/cxf/binding/object/LocalServerRegistrationTest.java?rev=639047&r1=639046&r2=639047&view=diff
==============================================================================
--- incubator/cxf/trunk/rt/bindings/object/src/test/java/org/apache/cxf/binding/object/LocalServerRegistrationTest.java (original)
+++ incubator/cxf/trunk/rt/bindings/object/src/test/java/org/apache/cxf/binding/object/LocalServerRegistrationTest.java Wed Mar 19 15:44:58 2008
@@ -39,7 +39,6 @@
 import org.apache.cxf.transport.ConduitInitiator;
 import org.apache.cxf.transport.ConduitInitiatorManager;
 import org.apache.cxf.transport.MessageObserver;
-import org.apache.cxf.transport.local.LocalConduit;
 import org.apache.cxf.transport.local.LocalTransportFactory;
 import org.junit.Test;
 
@@ -67,19 +66,20 @@
         BindingInfo bi = serviceInfo.getBindings().iterator().next();
         BindingOperationInfo bop = bi.getOperations().iterator().next();
 
-        assertNotNull(bop.getOperationInfo());
-
+        assertNotNull(bop.getOperationInfo());       
+        
         MessageImpl m = new MessageImpl();
         m.setContent(List.class, content);
-        m.put(LocalConduit.DIRECT_DISPATCH, Boolean.TRUE);
-        m.put(ObjectBinding.BINDING, bop.getBinding().getName());
-        m.put(ObjectBinding.OPERATION, bop.getName());
-
         ExchangeImpl ex = new ExchangeImpl();
         ex.setInMessage(m);
+        ex.put(BindingOperationInfo.class, bop);
 
         Conduit c = getLocalConduit("local://" + server);
         ex.setConduit(c);
+        
+        new ObjectDispatchOutInterceptor().handleMessage(m);
+        
+
 
         c.setMessageObserver(new MessageObserver() {
             public void onMessage(Message message) {

Modified: incubator/cxf/trunk/rt/bindings/object/src/test/java/org/apache/cxf/binding/object/ObjectBindingTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/bindings/object/src/test/java/org/apache/cxf/binding/object/ObjectBindingTest.java?rev=639047&r1=639046&r2=639047&view=diff
==============================================================================
--- incubator/cxf/trunk/rt/bindings/object/src/test/java/org/apache/cxf/binding/object/ObjectBindingTest.java (original)
+++ incubator/cxf/trunk/rt/bindings/object/src/test/java/org/apache/cxf/binding/object/ObjectBindingTest.java Wed Mar 19 15:44:58 2008
@@ -67,16 +67,19 @@
 
         assertNotNull(bop.getOperationInfo());
 
+        
         MessageImpl m = new MessageImpl();
         m.setContent(List.class, content);
-        m.put(LocalConduit.DIRECT_DISPATCH, Boolean.TRUE);
-        m.put(ObjectBinding.BINDING, bop.getBinding().getName());
-        m.put(ObjectBinding.OPERATION, bop.getName());
-
         ExchangeImpl ex = new ExchangeImpl();
         ex.setInMessage(m);
+        ex.put(BindingOperationInfo.class, bop);
 
         Conduit c = getLocalConduit("local://Echo");
+        ex.setConduit(c);
+        
+        new ObjectDispatchOutInterceptor().handleMessage(m);
+
+
         ex.setConduit(c);
 
         c.setMessageObserver(new MessageObserver() {

Modified: incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java?rev=639047&r1=639046&r2=639047&view=diff
==============================================================================
--- incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java (original)
+++ incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java Wed Mar 19 15:44:58 2008
@@ -24,12 +24,10 @@
 import java.io.OutputStream;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
-import java.util.Map;
-import java.util.Set;
 import java.util.logging.Logger;
 
 import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.io.AbstractWrappedOutputStream;
 import org.apache.cxf.io.CachedOutputStream;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.ExchangeImpl;
@@ -43,7 +41,7 @@
     public static final String RESPONSE_CONDUIT = LocalConduit.class.getName() + ".inConduit";
     public static final String IN_EXCHANGE = LocalConduit.class.getName() + ".inExchange";
     public static final String DIRECT_DISPATCH = LocalConduit.class.getName() + ".directDispatch";
-    public static final String MESSAGE_FILTER_PROPERTIES = LocalConduit.class.getName() + ".filterProperties";
+    public static final String MESSAGE_FILTER_PROPERTIES = LocalTransportFactory.MESSAGE_FILTER_PROPERTIES;
 
     private static final Logger LOG = LogUtils.getL7dLogger(LocalConduit.class);
     
@@ -86,7 +84,8 @@
         copy.put(IN_CONDUIT, this);
         copy.setDestination(destination);
         
-        copy(message, copy, transportFactory.getMessageFilterProperties());
+        transportFactory.copy(message, copy);
+        MessageImpl.copyContent(message, copy);
         
         CachedOutputStream stream = (CachedOutputStream)message.getContent(OutputStream.class);
         copy.setContent(InputStream.class, stream.getInputStream());
@@ -101,24 +100,8 @@
         destination.getMessageObserver().onMessage(copy);
     }
 
-    public static void copy(Message message, MessageImpl copy, Set<String> defaultFilter) {
-        Set<String> filter = CastUtils.cast((Set)message.get(MESSAGE_FILTER_PROPERTIES));
-        if (filter == null) {
-            filter = defaultFilter;
-        }
-        
-        // copy all the contents
-        for (Map.Entry<String, Object> e : message.entrySet()) {
-            if (!filter.contains(e.getKey())) {
-                copy.put(e.getKey(), e.getValue());
-            }
-        }
-        
-        MessageImpl.copyContent(message, copy);
-    }
 
     private void dispatchViaPipe(final Message message) throws IOException {
-        final PipedInputStream stream = new PipedInputStream();
         final LocalConduit conduit = this;
         final Exchange exchange = message.getExchange();
         
@@ -127,24 +110,32 @@
                                             + destination.getAddress().getAddress().getValue());
         }
         
-        final Runnable receiver = new Runnable() {
-            public void run() {
-                MessageImpl inMsg = new MessageImpl();
-                inMsg.setContent(InputStream.class, stream);
-                inMsg.setDestination(destination);
-                inMsg.put(IN_CONDUIT, conduit);
-                
-                ExchangeImpl ex = new ExchangeImpl();
-                ex.setInMessage(inMsg);
-                ex.put(IN_EXCHANGE, exchange);
-                destination.getMessageObserver().onMessage(inMsg);
-            }
-        };
-
-        message.setContent(OutputStream.class, new PipedOutputStream(stream));
-
-        // TODO: put on executor
-        new Thread(receiver).start();
+        
+        AbstractWrappedOutputStream cout 
+            = new AbstractWrappedOutputStream() {
+                protected void onFirstWrite() throws IOException {
+                    final PipedInputStream stream = new PipedInputStream();
+                    wrappedStream = new PipedOutputStream(stream);
+
+                    final Runnable receiver = new Runnable() {
+                        public void run() {
+                            MessageImpl inMsg = new MessageImpl();
+                            transportFactory.copy(message, inMsg); 
+                            inMsg.setContent(InputStream.class, stream);
+                            inMsg.setDestination(destination);
+                            inMsg.put(IN_CONDUIT, conduit);
+                            
+                            ExchangeImpl ex = new ExchangeImpl();
+                            ex.setInMessage(inMsg);
+                            ex.put(IN_EXCHANGE, exchange);
+                            destination.getMessageObserver().onMessage(inMsg);
+                        }
+                    };
+                    
+                    new Thread(receiver).start();
+                }
+            };
+        message.setContent(OutputStream.class, cout);
     }
     
     protected Logger getLogger() {

Modified: incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalDestination.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalDestination.java?rev=639047&r1=639046&r2=639047&view=diff
==============================================================================
--- incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalDestination.java (original)
+++ incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalDestination.java Wed Mar 19 15:44:58 2008
@@ -27,6 +27,7 @@
 import java.util.logging.Logger;
 
 import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.io.AbstractWrappedOutputStream;
 import org.apache.cxf.io.CachedOutputStream;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
@@ -66,7 +67,7 @@
         return null;
     }
 
-    static class SynchronousConduit extends AbstractConduit {
+    class SynchronousConduit extends AbstractConduit {
         private LocalConduit conduit;
 
         public SynchronousConduit(LocalConduit conduit) {
@@ -77,22 +78,31 @@
         public void prepare(final Message message) throws IOException {            
             if (!Boolean.TRUE.equals(message.getExchange().get(LocalConduit.DIRECT_DISPATCH))) {
                 final Exchange exchange = (Exchange)message.getExchange().get(LocalConduit.IN_EXCHANGE);
-                
-                final PipedInputStream stream = new PipedInputStream();
-                final Runnable receiver = new Runnable() {
-                    public void run() {
-                        MessageImpl m = new MessageImpl();
-                        if (exchange != null) {
-                            exchange.setInMessage(m);
-                        }
-                        m.setContent(InputStream.class, stream);
-                        conduit.getMessageObserver().onMessage(m);
-                    }
-                };
+
+                AbstractWrappedOutputStream cout 
+                    = new AbstractWrappedOutputStream() {
+                        protected void onFirstWrite() throws IOException {
+                            final PipedInputStream stream = new PipedInputStream();
+                            wrappedStream = new PipedOutputStream(stream);
     
-                PipedOutputStream outStream = new PipedOutputStream(stream);
-                message.setContent(OutputStream.class, outStream);    
-                new Thread(receiver).start();
+                            final Runnable receiver = new Runnable() {
+                                public void run() {
+                                    MessageImpl m = new MessageImpl();
+                                    localDestinationFactory.copy(message, m);
+                                    
+                                    if (exchange != null) {
+                                        exchange.setInMessage(m);
+                                    }
+                                    m.setContent(InputStream.class, stream);
+                                    conduit.getMessageObserver().onMessage(m);
+                                }
+                            };
+                            
+                            new Thread(receiver).start();
+                        }
+                    };
+                
+                message.setContent(OutputStream.class, cout);    
                 
             } else {
                 CachedOutputStream stream = new CachedOutputStream();

Modified: incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java?rev=639047&r1=639046&r2=639047&view=diff
==============================================================================
--- incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java (original)
+++ incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java Wed Mar 19 15:44:58 2008
@@ -32,6 +32,7 @@
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.helpers.CastUtils;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.AbstractTransportFactory;
@@ -46,6 +47,10 @@
     implements DestinationFactory, ConduitInitiator {
    
     public static final String TRANSPORT_ID = "http://cxf.apache.org/transports/local";
+    public static final String MESSAGE_FILTER_PROPERTIES 
+        = LocalTransportFactory.class.getName() + ".filterProperties";
+    public static final String MESSAGE_INCLUDE_PROPERTIES 
+        = LocalTransportFactory.class.getName() + ".includeProperties";
 
     private static final Logger LOG = LogUtils.getL7dLogger(LocalTransportFactory.class);
     private static final Set<String> URI_PREFIXES = new HashSet<String>();
@@ -58,6 +63,7 @@
     private Bus bus;
 
     private Set<String> messageFilterProperties;
+    private Set<String> messageIncludeProperties;
     
     public LocalTransportFactory() {
         super();
@@ -66,7 +72,14 @@
         setTransportIds(ids);
         
         messageFilterProperties = new HashSet<String>();
-        messageFilterProperties.add(Message.REQUESTOR_ROLE);        
+        messageIncludeProperties = new HashSet<String>();
+        messageFilterProperties.add(Message.REQUESTOR_ROLE); 
+        
+        messageIncludeProperties.add(Message.PROTOCOL_HEADERS);
+        messageIncludeProperties.add(Message.ENCODING);
+        messageIncludeProperties.add(Message.CONTENT_TYPE);
+        messageIncludeProperties.add(Message.ACCEPT_CONTENT_TYPE);
+        messageIncludeProperties.add(Message.RESPONSE_CODE);
     }
     
     @Resource(name = "bus")
@@ -126,8 +139,36 @@
         return messageFilterProperties;
     }
 
-    public void setMessageFilterProperties(Set<String> messageFilterProperties) {
-        this.messageFilterProperties = messageFilterProperties;
+    public void setMessageFilterProperties(Set<String> props) {
+        this.messageFilterProperties = props;
+    }
+    public Set<String> getIncludeMessageProperties() {
+        return messageIncludeProperties;
     }
 
+    public void setMessageIncludeProperties(Set<String> props) {
+        this.messageIncludeProperties = props;
+    }
+
+    
+    public void copy(Message message, Message copy) {
+        Set<String> filter = CastUtils.cast((Set)message.get(MESSAGE_FILTER_PROPERTIES));
+        if (filter == null) {
+            filter = messageFilterProperties;
+        }
+        
+        Set<String> includes =  CastUtils.cast((Set)message.get(MESSAGE_INCLUDE_PROPERTIES));
+        if (includes == null) {
+            includes = messageIncludeProperties;
+        }
+
+        // copy all the contents
+        for (Map.Entry<String, Object> e : message.entrySet()) {
+            if ((includes.contains(e.getKey())
+                || messageIncludeProperties.contains(e.getKey()))
+                && !filter.contains(e.getKey())) {
+                copy.put(e.getKey(), e.getValue());
+            }
+        }
+    }    
 }

Modified: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/mtom_feature/MtomFeatureClientServerTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/mtom_feature/MtomFeatureClientServerTest.java?rev=639047&r1=639046&r2=639047&view=diff
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/mtom_feature/MtomFeatureClientServerTest.java (original)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/mtom_feature/MtomFeatureClientServerTest.java Wed Mar 19 15:44:58 2008
@@ -23,10 +23,14 @@
 import java.net.URL;
 import javax.imageio.ImageIO;
 import javax.xml.namespace.QName;
+import javax.xml.ws.BindingProvider;
+import javax.xml.ws.Endpoint;
 import javax.xml.ws.Holder;
+import javax.xml.ws.Service;
 import javax.xml.ws.soap.MTOMFeature;
 
 import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.cxf.transport.local.LocalConduit;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -56,6 +60,45 @@
         Holder<byte[]> image = new Holder<byte[]>(bytes);
         port.echoData(image);
         assertNotNull(image);
+    }
+    
+    @Test
+    public void testWithLocalTransport() throws Exception {
+        Object implementor = new HelloImpl();
+        String address = "local://Hello";
+        Endpoint.publish(address, implementor);
+        QName portName = new QName("http://apache.org/cxf/systest/mtom_feature",
+                                   "HelloPort");
+        
+        Service service = Service.create(serviceName);
+        service.addPort(portName, 
+                        "http://schemas.xmlsoap.org/soap/", 
+                        "local://Hello");
+        port = service.getPort(portName,
+                               Hello.class,
+                               new MTOMFeature());
+
+        
+
+        ((BindingProvider)port).getRequestContext()
+            .put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, address);
+        
+        ((BindingProvider)port).getRequestContext()
+            .put(LocalConduit.DIRECT_DISPATCH, Boolean.TRUE);
+        Holder<byte[]> photo = new Holder<byte[]>("CXF".getBytes());
+        Holder<Image> image = new Holder<Image>(getImage("/java.jpg"));
+        port.detail(photo, image);
+        assertEquals("CXF", new String(photo.value));
+        assertNotNull(image.value);
+        
+        
+        ((BindingProvider)port).getRequestContext()
+            .put(LocalConduit.DIRECT_DISPATCH, Boolean.FALSE);
+        photo = new Holder<byte[]>("CXF".getBytes());
+        image = new Holder<Image>(getImage("/java.jpg"));
+        port.detail(photo, image);
+        assertEquals("CXF", new String(photo.value));
+        assertNotNull(image.value);
     }
 
     private Image getImage(String name) throws Exception {



Mime
View raw message