cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ff...@apache.org
Subject svn commit: r468334 - /incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/
Date Fri, 27 Oct 2006 09:48:03 GMT
Author: ffang
Date: Fri Oct 27 02:48:01 2006
New Revision: 468334

URL: http://svn.apache.org/viewvc?view=rev&rev=468334
Log:
[CXF-38] JBI Integration -- JBI transport implementation

Added:
    incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduitOutputStream.java   (with props)
    incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConstants.java   (with props)
    incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestinationOutputStream.java   (with props)
    incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIMessageHelper.java   (with props)
Modified:
    incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduit.java
    incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestination.java
    incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBITransportFactory.java

Modified: incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduit.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduit.java?view=diff&rev=468334&r1=468333&r2=468334
==============================================================================
--- incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduit.java (original)
+++ incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduit.java Fri Oct 27 02:48:01 2006
@@ -20,43 +20,68 @@
 package org.apache.cxf.jbi.transport;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
+import javax.jbi.messaging.DeliveryChannel;
+
+
+
+import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.Destination;
 import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 
+
 public class JBIConduit implements Conduit {
+    
+    private static final Logger LOG = LogUtils.getL7dLogger(JBIConduit.class);
+       
+    private MessageObserver incomingObserver;
+    private EndpointReferenceType target;
+    private DeliveryChannel channel;
+           
+    
+    
+    public JBIConduit(EndpointReferenceType target, DeliveryChannel dc) {           
+        this.target = target;
+        channel = dc;
+    }
 
     public void send(Message message) throws IOException {
-        // TODO Auto-generated method stub
-
+        LOG.log(Level.FINE, "JBIConduit send message");
+                
+        message.setContent(OutputStream.class,
+                           new JBIConduitOutputStream(message, channel, target, this));
     }
 
     public void close(Message message) throws IOException {
-        // TODO Auto-generated method stub
-
+        message.getContent(OutputStream.class).close();        
     }
 
     public EndpointReferenceType getTarget() {
-        // TODO Auto-generated method stub
-        return null;
+        return target;
     }
 
     public Destination getBackChannel() {
-        // TODO Auto-generated method stub
         return null;
     }
 
     public void close() {
-        // TODO Auto-generated method stub
-
+        
     }
 
     public void setMessageObserver(MessageObserver observer) {
-        // TODO Auto-generated method stub
-
+        incomingObserver = observer;     
+    }
+    
+    public MessageObserver getMessageObserver() {
+        return incomingObserver;
     }
 
+    
+     
 }

Added: incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduitOutputStream.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduitOutputStream.java?view=auto&rev=468334
==============================================================================
--- incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduitOutputStream.java (added)
+++ incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduitOutputStream.java Fri Oct 27 02:48:01 2006
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jbi.transport;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Method;
+import java.util.logging.Logger;
+
+import javax.jbi.messaging.DeliveryChannel;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessageExchangeFactory;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jws.WebService;
+import javax.xml.namespace.QName;
+import javax.xml.transform.Source;
+import javax.xml.transform.stream.StreamSource;
+
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.io.AbstractCachedOutputStream;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.cxf.wsdl.EndpointReferenceUtils;
+
+public class JBIConduitOutputStream extends AbstractCachedOutputStream {
+
+    private static final Logger LOG = LogUtils.getL7dLogger(JBIConduitOutputStream.class);
+
+    private Message message;
+    private boolean isOneWay;
+    private DeliveryChannel channel;
+    private JBIConduit conduit;
+    private EndpointReferenceType target;
+
+    public JBIConduitOutputStream(Message m, DeliveryChannel channel, EndpointReferenceType target,
+                                  JBIConduit conduit) {
+        message = m;
+        this.channel = channel;
+        this.conduit = conduit;
+        this.target = target;
+    }
+
+    @Override
+    protected void doFlush() throws IOException {
+
+    }
+
+    @Override
+    protected void doClose() throws IOException {
+        isOneWay = message.getExchange().isOneWay();
+        commitOutputMessage();
+
+    }
+
+    private void commitOutputMessage() throws IOException {
+        try {
+            Method targetMethod = (Method)message.get(Method.class.getName());
+            Class<?> clz = targetMethod.getDeclaringClass();
+
+            LOG.info(new org.apache.cxf.common.i18n.Message("INVOKE.SERVICE", LOG).toString() + clz);
+
+            WebService ws = clz.getAnnotation(WebService.class);
+            assert ws != null;
+            QName interfaceName = new QName(ws.targetNamespace(), ws.name());
+            QName serviceName = EndpointReferenceUtils.getServiceName(target);
+            MessageExchangeFactory factory = channel.createExchangeFactoryForService(serviceName);
+            LOG.info(new org.apache.cxf.common.i18n.Message("CREATE.MESSAGE.EXCHANGE", LOG).toString()
+                     + serviceName);
+            MessageExchange xchng = null;
+            if (isOneWay) {
+                xchng = factory.createInOnlyExchange();
+            } else {
+                xchng = factory.createInOutExchange();
+            }
+
+            NormalizedMessage inMsg = xchng.createMessage();
+            LOG.info(new org.apache.cxf.common.i18n.Message("EXCHANGE.ENDPOINT", LOG).toString()
+                     + xchng.getEndpoint());
+
+            InputStream ins = null;
+
+            if (inMsg != null) {
+                LOG.info("setup message contents on " + inMsg);
+                inMsg.setContent(getMessageContent(message));
+                xchng.setService(serviceName);
+                LOG.info("service for exchange " + serviceName);
+
+                xchng.setInterfaceName(interfaceName);
+
+                xchng.setOperation(new QName(targetMethod.getName()));
+                if (isOneWay) {
+                    ((InOnly)xchng).setInMessage(inMsg);
+                } else {
+                    ((InOut)xchng).setInMessage(inMsg);
+                }
+                LOG.info("sending message");
+                if (!isOneWay) {
+
+                    channel.sendSync(xchng);
+                    NormalizedMessage outMsg = ((InOut)xchng).getOutMessage();
+                    ins = JBIMessageHelper.convertMessageToInputStream(outMsg.getContent());
+                    if (ins == null) {
+                        throw new IOException(new org.apache.cxf.common.i18n.Message(
+                            "UNABLE.RETRIEVE.MESSAGE", LOG).toString());
+                    }
+                    Message inMessage = new MessageImpl();
+                    message.getExchange().setInMessage(inMessage);
+                    inMessage.setContent(InputStream.class, ins);
+                    conduit.getMessageObserver().onMessage(inMessage);
+                    
+                } else {
+                    channel.send(xchng);
+                }
+
+            } else {
+                LOG.info(new org.apache.cxf.common.i18n.Message("NO.MESSAGE", LOG).toString());
+            }
+
+            
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new IOException(e.toString());
+        }
+    }
+
+    private Source getMessageContent(Message message2) {
+        ByteArrayOutputStream bos = (ByteArrayOutputStream)getOut();
+        return new StreamSource(new ByteArrayInputStream(bos.toByteArray()));
+        
+    }
+
+    @Override
+    protected void onWrite() throws IOException {
+
+    }
+
+}

Propchange: incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduitOutputStream.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduitOutputStream.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConstants.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConstants.java?view=auto&rev=468334
==============================================================================
--- incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConstants.java (added)
+++ incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConstants.java Fri Oct 27 02:48:01 2006
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jbi.transport;
+
+public final class JBIConstants {
+    public static final String MESSAGE_EXCHANGE_PROPERTY = "celtix.jbi.message.exchange";
+}

Propchange: incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConstants.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConstants.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestination.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestination.java?view=diff&rev=468334&r1=468333&r2=468334
==============================================================================
--- incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestination.java (original)
+++ incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestination.java Fri Oct 27 02:48:01 2006
@@ -20,34 +20,252 @@
 package org.apache.cxf.jbi.transport;
 
 import java.io.IOException;
-
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.jbi.messaging.DeliveryChannel;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.xml.namespace.QName;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.jbi.se.CXFServiceUnit;
+import org.apache.cxf.jbi.se.CXFServiceUnitManager;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.transport.ConduitInitiator;
 import org.apache.cxf.transport.Destination;
 import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.ws.addressing.AttributedURIType;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 
 public class JBIDestination implements Destination {
 
+    private static final Logger LOG = LogUtils.getL7dLogger(JBIDestination.class);
+    private final DeliveryChannel channel;
+    private final CXFServiceUnitManager suManager; 
+    private ConduitInitiator conduitInitiator;
+    private EndpointInfo endpointInfo;
+    private EndpointReferenceType reference;
+    private MessageObserver incomingObserver;
+    private JBIDispatcher dispatcher;
+    private volatile boolean running; 
+    
+    public JBIDestination(ConduitInitiator ci,
+                          EndpointInfo info,
+                          DeliveryChannel dc,
+                          CXFServiceUnitManager sum) {
+        this.conduitInitiator = ci;
+        this.endpointInfo = info;
+        this.channel = dc;
+        this.suManager = sum;
+        reference = new EndpointReferenceType();
+        AttributedURIType address = new AttributedURIType();
+        address.setValue(endpointInfo.getAddress());
+        reference.setAddress(address);        
+    }
+    
     public EndpointReferenceType getAddress() {
-        // TODO Auto-generated method stub
-        return null;
+        return reference;
     }
 
     public Conduit getBackChannel(Message inMessage, Message partialResponse, EndpointReferenceType address)
         throws IOException {
-        // TODO Auto-generated method stub
-        return null;
+        Conduit backChannel = null;
+        if (address == null) {
+            backChannel = new BackChannelConduit(address, inMessage, this);
+        } else {
+            if (partialResponse != null) {
+                // just send back the partialResponse 
+                backChannel = new BackChannelConduit(address, inMessage , this);
+            } else {                
+                backChannel = conduitInitiator.getConduit(endpointInfo, address);
+                // ensure decoupled back channel input stream is closed
+                backChannel.setMessageObserver(new MessageObserver() {
+                    public void onMessage(Message m) {
+                        //need to set up the headers 
+                        if (m.getContentFormats().contains(InputStream.class)) {
+                            InputStream is = m.getContent(InputStream.class);
+                            try {
+                                is.close();
+                            } catch (Exception e) {
+                                // ignore
+                            }
+                        }
+                    }
+                });
+            }
+        }
+        return backChannel;
+
     }
 
     public void shutdown() {
-        // TODO Auto-generated method stub
-
+        running = false;
     }
 
     public void setMessageObserver(MessageObserver observer) {
-        // TODO Auto-generated method stub
+        if (null != observer) {
+            try {
+                activate();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        } else {
+            LOG.log(Level.FINE, "JBIDestination shutdown()");
+            try {
+                deactivate();
+            } catch (IOException e) {
+                //Ignore for now.
+            }
+        }
+        incomingObserver = observer;
 
     }
 
+    private void deactivate() throws IOException {
+        running = false;
+    }
+
+    private void activate() throws IOException {
+        LOG.info(new org.apache.cxf.common.i18n.Message(
+            "ACTIVE.JBI.SERVER.TRANSPORT", LOG).toString());
+        dispatcher = new JBIDispatcher();
+        new Thread(dispatcher).start();
+    }
+    
+    // this should deal with the cxf message 
+    protected class BackChannelConduit implements Conduit {
+        
+        protected Message inMessage;
+        protected EndpointReferenceType target;
+        protected JBIDestination jbiDestination;
+                
+        BackChannelConduit(EndpointReferenceType ref, Message message, JBIDestination dest) {
+            inMessage = message;
+            target = ref;
+            jbiDestination = dest;
+        }
+        
+        public void close(Message msg) throws IOException {
+            msg.getContent(OutputStream.class).close();        
+        }
+
+        /**
+         * Register a message observer for incoming messages.
+         * 
+         * @param observer the observer to notify on receipt of incoming
+         */
+        public void setMessageObserver(MessageObserver observer) {
+            // shouldn't be called for a back channel conduit
+        }
+
+        /**
+         * Send an outbound message, assumed to contain all the name-value
+         * mappings of the corresponding input message (if any). 
+         * 
+         * @param message the message to be sent.
+         */
+        public void send(Message message) throws IOException {
+            // setup the message to be send back
+            message.put(JBIConstants.MESSAGE_EXCHANGE_PROPERTY, 
+                inMessage.get(JBIConstants.MESSAGE_EXCHANGE_PROPERTY));
+            message.setContent(OutputStream.class,
+                               new JBIDestinationOutputStream(inMessage, channel));
+        }
+        
+        /**
+         * @return the reference associated with the target Destination
+         */    
+        public EndpointReferenceType getTarget() {
+            return target;
+        }
+        
+        /**
+         * Retreive the back-channel Destination.
+         * 
+         * @return the backchannel Destination (or null if the backchannel is
+         * built-in)
+         */
+        public Destination getBackChannel() {
+            return null;
+        }
+        
+        /**
+         * Close the conduit
+         */
+        public void close() {
+        }
+    }
+
+    private class JBIDispatcher implements Runnable { 
+        
+        public final void run() {
+            
+            try { 
+                running = true;
+                LOG.info(new org.apache.cxf.common.i18n.Message(
+                    "RECEIVE.THREAD.START", LOG).toString());
+                do { 
+                    MessageExchange exchange = channel.accept(); 
+                    if (exchange != null) { 
+                        // REVISIT: serialized message handling not such a
+                        // good idea.
+                        // REVISIT: can there be more than one ep?
+                        ServiceEndpoint ep = exchange.getEndpoint();
+                        CXFServiceUnit csu = suManager.getServiceUnitForEndpoint(ep);
+                        ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
+                        
+                        try { 
+                            Thread.currentThread().setContextClassLoader(csu.getClassLoader());
+                            if (csu != null) { 
+                                LOG.info(new org.apache.cxf.common.i18n.Message(
+                                    "DISPATCH.TO.SU", LOG).toString());
+                                dispatch(exchange);
+                            } else {
+                                LOG.info(new org.apache.cxf.common.i18n.Message(
+                                    "NO.SU.FOUND", LOG).toString());
+                            }
+                        } finally { 
+                            Thread.currentThread().setContextClassLoader(oldLoader);
+                        } 
+                    } 
+                } while(running);
+            } catch (Exception ex) {
+                LOG.log(Level.SEVERE, new org.apache.cxf.common.i18n.Message(
+                    "ERROR.DISPATCH.THREAD", LOG).toString(), ex);
+            } 
+            LOG.fine(new org.apache.cxf.common.i18n.Message(
+                "JBI.SERVER.TRANSPORT.MESSAGE.PROCESS.THREAD.EXIT", LOG).toString());
+        }
+
+    }
+    
+    private void dispatch(MessageExchange exchange) throws IOException {
+        QName opName = exchange.getOperation(); 
+        LOG.fine("dispatch method: " + opName);
+        
+        NormalizedMessage nm = exchange.getMessage("in");
+        try {
+            final InputStream in = JBIMessageHelper.convertMessageToInputStream(nm.getContent());
+            //get the message to be interceptor
+            MessageImpl inMessage = new MessageImpl();
+            inMessage.put(JBIConstants.MESSAGE_EXCHANGE_PROPERTY, exchange);
+            inMessage.setContent(InputStream.class, in);
+                                           
+            inMessage.setDestination(this);            
+            //handle the incoming message
+            incomingObserver.onMessage(inMessage);
+        } catch (Exception ex) {
+            LOG.log(Level.SEVERE, new org.apache.cxf.common.i18n.Message(
+                "ERROR.PREPARE.MESSAGE", LOG).toString(), ex);
+            throw new IOException(ex.getMessage());
+        }
+
+    }
 }

Added: incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestinationOutputStream.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestinationOutputStream.java?view=auto&rev=468334
==============================================================================
--- incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestinationOutputStream.java (added)
+++ incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestinationOutputStream.java Fri Oct 27 02:48:01 2006
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jbi.transport;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.jbi.messaging.DeliveryChannel;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.transform.dom.DOMSource;
+
+import org.w3c.dom.Document;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.io.AbstractCachedOutputStream;
+import org.apache.cxf.message.Message;
+
+public class JBIDestinationOutputStream extends AbstractCachedOutputStream {
+
+    private static final Logger LOG = LogUtils.getL7dLogger(JBIDestinationOutputStream.class);
+    private Message inMessage;
+    private DeliveryChannel channel;
+    
+    public JBIDestinationOutputStream(Message m, 
+                               DeliveryChannel dc) {
+        super();
+        inMessage = m;
+        channel = dc;
+    }
+    
+    @Override
+    protected void doFlush() throws IOException {
+        // so far do nothing
+    }
+
+    @Override
+    protected void doClose() throws IOException {
+        commitOutputMessage();
+    }
+
+    @Override
+    protected void onWrite() throws IOException {
+        // so far do nothing
+    }
+
+    private void commitOutputMessage() throws IOException {
+        try { 
+            if (inMessage.getExchange().isOneWay()) {
+                return;
+            } else {
+                
+                ByteArrayOutputStream baos = (ByteArrayOutputStream)getOut();
+                ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+                LOG.finest(new org.apache.cxf.common.i18n.Message(
+                    "BUILDING.DOCUMENT", LOG).toString());
+                DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance();
+                DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
+                Document doc = builder.parse(bais);
+            
+                MessageExchange xchng = (MessageExchange)inMessage.get(
+                    JBIConstants.MESSAGE_EXCHANGE_PROPERTY);
+                LOG.fine(new org.apache.cxf.common.i18n.Message(
+                    "CREATE.NORMALIZED.MESSAGE", LOG).toString());
+                NormalizedMessage msg = xchng.createMessage();
+                msg.setContent(new DOMSource(doc));
+                LOG.info("the message class is " + xchng.getClass().getName());
+                xchng.setMessage(msg, "out");
+                LOG.fine(new org.apache.cxf.common.i18n.Message(
+                    "POST.DISPATCH", LOG).toString());
+                channel.send(xchng);
+            }
+        } catch (Exception ex) { 
+            LOG.log(Level.SEVERE, new org.apache.cxf.common.i18n.Message(
+                "ERROR.SEND.MESSAGE", LOG).toString(), ex);
+        }
+    }
+    
+}

Propchange: incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestinationOutputStream.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestinationOutputStream.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIMessageHelper.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIMessageHelper.java?view=auto&rev=468334
==============================================================================
--- incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIMessageHelper.java (added)
+++ incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIMessageHelper.java Fri Oct 27 02:48:01 2006
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jbi.transport;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.logging.Logger;
+
+import javax.xml.transform.Source;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerConfigurationException;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.stream.StreamResult;
+
+import org.apache.cxf.common.i18n.Message;
+import org.apache.cxf.common.logging.LogUtils;
+
+public final class JBIMessageHelper { 
+    
+    private static final Logger LOG = LogUtils.getL7dLogger(JBIMessageHelper.class);
+    
+    private static final TransformerFactory TRANSFORMER_FACTORY 
+        = TransformerFactory.newInstance();
+
+    private JBIMessageHelper() { 
+        // complete 
+    }
+    
+    
+    public static InputStream convertMessageToInputStream(Source src) 
+        throws IOException, TransformerConfigurationException, TransformerException { 
+
+        final Transformer transformer = TRANSFORMER_FACTORY.newTransformer();
+        
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
+        StreamResult result = new StreamResult(baos);
+        transformer.transform(src, result);
+        LOG.finest(new Message("RECEIVED.MESSAGE", LOG) + new String(baos.toByteArray()));
+        
+        return new ByteArrayInputStream(baos.toByteArray());
+    } 
+}

Propchange: incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIMessageHelper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIMessageHelper.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBITransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBITransportFactory.java?view=diff&rev=468334&r1=468333&r2=468334
==============================================================================
--- incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBITransportFactory.java (original)
+++ incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBITransportFactory.java Fri Oct 27 02:48:01 2006
@@ -21,7 +21,15 @@
 package org.apache.cxf.jbi.transport;
 
 import java.io.IOException;
+import java.util.logging.Logger;
 
+import javax.annotation.Resource;
+import javax.jbi.messaging.DeliveryChannel;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.configuration.Configurer;
+import org.apache.cxf.jbi.se.CXFServiceUnitManager;
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.AbstractTransportFactory;
 import org.apache.cxf.transport.Conduit;
@@ -32,20 +40,65 @@
 
 public class JBITransportFactory extends AbstractTransportFactory implements ConduitInitiator,
     DestinationFactory {
+    
+    private static final Logger LOG = LogUtils.getL7dLogger(JBITransportFactory.class);
+
+    private CXFServiceUnitManager suManager; 
+    private DeliveryChannel deliveryChannel;
+    private Bus bus;
+    
+    @Resource
+    public void setBus(Bus b) {
+        bus = b;
+    }
+    
+    public Bus getBus() {
+        return bus;
+    }
+
+    public DeliveryChannel getDeliveryChannel() {
+        return deliveryChannel;
+    }
+
+    public void setDeliveryChannel(DeliveryChannel newDeliverychannel) {
+        LOG.fine(new org.apache.cxf.common.i18n.Message(
+            "CONFIG.DELIVERY.CHANNEL", LOG).toString() + newDeliverychannel);
+        deliveryChannel = newDeliverychannel;
+    }
+
+    public CXFServiceUnitManager getServiceUnitManager() { 
+        return suManager; 
+    }
+    
+    public void setServiceUnitManager(CXFServiceUnitManager sum) {
+        if (sum == null) { 
+            Thread.dumpStack(); 
+        } 
+        LOG.fine(new org.apache.cxf.common.i18n.Message(
+            "CONFIG.SU.MANAGER", LOG).toString() + sum);
+        suManager = sum;
+    }
 
     public Conduit getConduit(EndpointInfo targetInfo) throws IOException {
-        // TODO Auto-generated method stub
-        return null;
+        return getConduit(targetInfo, null);
     }
 
-    public Conduit getConduit(EndpointInfo localInfo, EndpointReferenceType target) throws IOException {
-        // TODO Auto-generated method stub
-        return null;
+    public Conduit getConduit(EndpointInfo endpointInfo, EndpointReferenceType target) throws IOException {
+        Conduit conduit = new JBIConduit(target, deliveryChannel);
+        Configurer configurer = bus.getExtension(Configurer.class);
+        if (null != configurer) {
+            configurer.configureBean(conduit);
+        }
+        return conduit;
     }
 
     public Destination getDestination(EndpointInfo ei) throws IOException {
-        // TODO Auto-generated method stub
-        return null;
+        JBIDestination destination = new JBIDestination(this, ei, deliveryChannel, suManager);
+        Configurer configurer = bus.getExtension(Configurer.class);
+        if (null != configurer) {
+            configurer.configureBean(destination);
+        }
+        return destination;
     }
 
 }



Mime
View raw message