activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r522906 - in /activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport: ./ CamelConduit.java CamelConstants.java CamelDestination.java CamelTransport.java CamelTransportBase.java CamelTransportFactory.java
Date Tue, 27 Mar 2007 13:41:10 GMT
Author: jstrachan
Date: Tue Mar 27 06:41:09 2007
New Revision: 522906

URL: http://svn.apache.org/viewvc?view=rev&rev=522906
Log:
a rough initial spike attempting to port the JMS transport for CXF to Camel

Added:
    activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/
    activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java
  (with props)
    activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConstants.java
  (with props)
    activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelDestination.java
  (with props)
    activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransport.java
  (with props)
    activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportBase.java
  (with props)
    activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportFactory.java
  (with props)

Added: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java?view=auto&rev=522906
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java
(added)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java
Tue Mar 27 06:41:09 2007
@@ -0,0 +1,283 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf.transport;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Processor;
+import org.apache.cxf.Bus;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.configuration.Configurable;
+import org.apache.cxf.configuration.Configurer;
+import org.apache.cxf.io.AbstractCachedOutputStream;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.AbstractConduit;
+import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.transport.Destination;
+import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * @version $Revision$
+ */
+public class CamelConduit extends AbstractConduit implements Configurable {
+    protected static final String BASE_BEAN_NAME_SUFFIX = ".jms-conduit-base";
+    private static final Logger LOG = LogUtils.getL7dLogger(CamelConduit.class);
+    protected final CamelTransportBase base;
+    private String targetCamelEndpointUri;
+/*
+    protected ClientConfig clientConfig;
+    protected ClientBehaviorPolicyType runtimePolicy;
+    protected AddressType address;
+    protected SessionPoolType sessionPool;
+*/
+
+    public CamelConduit(CamelContext camelContext, Bus bus, EndpointInfo endpointInfo, EndpointReferenceType
targetReference) {
+        super(targetReference);
+
+        base = new CamelTransportBase(camelContext, bus, endpointInfo, false, BASE_BEAN_NAME_SUFFIX);
+
+        initConfig();
+    }
+
+    // prepare the message for send out , not actually send out the message
+    public void send(Message message) throws IOException {
+        getLogger().log(Level.FINE, "JMSConduit send message");
+
+        message.setContent(OutputStream.class,
+                new JMSOutputStream(message));
+    }
+
+    public void close() {
+        getLogger().log(Level.FINE, "JMSConduit closed ");
+
+        // ensure resources held by session factory are released
+        //
+        base.close();
+    }
+
+    protected Logger getLogger() {
+        return LOG;
+    }
+
+    public String getBeanName() {
+        return base.endpointInfo.getName().toString() + ".jms-conduit";
+    }
+
+    private void initConfig() {
+
+/*
+        this.address = base.endpointInfo.getTraversedExtensor(new AddressType(),
+                                                              AddressType.class);
+        this.sessionPool = base.endpointInfo.getTraversedExtensor(new SessionPoolType(),
+                                                                  SessionPoolType.class);
+        this.clientConfig = base.endpointInfo.getTraversedExtensor(new ClientConfig(),
+                                                                   ClientConfig.class);
+        this.runtimePolicy = base.endpointInfo.getTraversedExtensor(new ClientBehaviorPolicyType(),
+                                                                    ClientBehaviorPolicyType.class);
+*/
+
+        Configurer configurer = base.bus.getExtension(Configurer.class);
+        if (null != configurer) {
+            configurer.configureBean(this);
+        }
+    }
+
+    private class JMSOutputStream extends AbstractCachedOutputStream {
+        private Message outMessage;
+        private boolean isOneWay;
+
+        public JMSOutputStream(Message m) {
+            outMessage = m;
+        }
+
+        protected void doFlush() throws IOException {
+            //do nothing here
+        }
+
+        protected void doClose() throws IOException {
+            isOneWay = outMessage.getExchange().isOneWay();
+            commitOutputMessage();
+            if (!isOneWay) {
+                handleResponse();
+            }
+        }
+
+        protected void onWrite() throws IOException {
+
+        }
+
+        private void commitOutputMessage() {
+            base.client.send(targetCamelEndpointUri, new Processor<org.apache.camel.Exchange>()
{
+                public void onExchange(org.apache.camel.Exchange reply) {
+                    Object request = null;
+
+                    if (isTextPayload()) {
+                        request = currentStream.toString();
+                    }
+                    else {
+                        request = ((ByteArrayOutputStream) currentStream).toByteArray();
+                    }
+
+                    getLogger().log(Level.FINE, "Conduit Request is :[" + request + "]");
+                    String replyTo = base.getReplyDestination();
+
+                    //TODO setting up the responseExpected
+
+                    base.marshal(request, replyTo, reply);
+
+                    base.setMessageProperties(outMessage, reply);
+
+                    String correlationID = null;
+                    if (!isOneWay) {
+                        // TODO create a correlationID
+                        String id = null;
+
+                        if (id != null) {
+                            if (correlationID != null) {
+                                String error = "User cannot set JMSCorrelationID when "
+                                        + "making a request/reply invocation using "
+                                        + "a static replyTo Queue.";
+                            }
+                            correlationID = id;
+                        }
+                    }
+
+                    if (correlationID != null) {
+                        reply.getIn().setHeader(CamelConstants.CAMEL_CORRELATION_ID, correlationID);
+                    }
+                    else {
+                        //No message correlation id is set. Whatever comeback will be accepted
as responses.
+                        // We assume that it will only happen in case of the temp. reply
queue.
+                    }
+
+                    getLogger().log(Level.FINE, "client sending request: ", reply.getIn());
+                }
+            });
+        }
+
+        private void handleResponse() throws IOException {
+            // REVISIT distinguish decoupled case or oneway call
+            Object response = null;
+
+            //TODO if outMessage need to get the response
+            Message inMessage = new MessageImpl();
+            outMessage.getExchange().setInMessage(inMessage);
+            //set the message header back to the incomeMessage
+            //inMessage.put(JMSConstants.CAMEL_CLIENT_RESPONSE_HEADERS,
+            //              outMessage.get(JMSConstants.CAMEL_CLIENT_RESPONSE_HEADERS));
+
+            /*
+            Object result1;
+
+            Object result = null;
+
+            javax.jms.Message jmsMessage1 = pooledSession.consumer().receive(timeout);
+            getLogger().log(Level.FINE, "client received reply: " , jmsMessage1);
+
+            if (jmsMessage1 != null) {
+
+                base.populateIncomingContext(jmsMessage1, outMessage, JMSConstants.CAMEL_CLIENT_RESPONSE_HEADERS);
+                String messageType = jmsMessage1 instanceof TextMessage
+                            ? JMSConstants.TEXT_MESSAGE_TYPE : JMSConstants.BINARY_MESSAGE_TYPE;
+                result = base.unmarshal((org.apache.camel.Exchange) outMessage);
+                result1 = result;
+            } else {
+                String error = "JMSClientTransport.receive() timed out. No message available.";
+                getLogger().log(Level.SEVERE, error);
+                //TODO: Review what exception should we throw.
+                throw new JMSException(error);
+
+            }
+            response = result1;
+
+            //set the message header back to the incomeMessage
+            inMessage.put(JMSConstants.CAMEL_CLIENT_RESPONSE_HEADERS,
+                          outMessage.get(JMSConstants.CAMEL_CLIENT_RESPONSE_HEADERS));
+
+            */
+
+            getLogger().log(Level.FINE, "The Response Message is : [" + response + "]");
+
+            // setup the inMessage response stream
+            byte[] bytes = null;
+            if (response instanceof String) {
+                String requestString = (String) response;
+                bytes = requestString.getBytes();
+            }
+            else {
+                bytes = (byte[]) response;
+            }
+            inMessage.setContent(InputStream.class, new ByteArrayInputStream(bytes));
+            getLogger().log(Level.FINE, "incoming observer is " + incomingObserver);
+            incomingObserver.onMessage(inMessage);
+        }
+    }
+
+    private boolean isTextPayload() {
+        // TODO use runtime policy
+        return true;
+    }
+
+    /**
+     * Represented decoupled response endpoint.
+     */
+    protected class DecoupledDestination implements Destination {
+        protected MessageObserver decoupledMessageObserver;
+        private EndpointReferenceType address;
+
+        DecoupledDestination(EndpointReferenceType ref,
+                MessageObserver incomingObserver) {
+            address = ref;
+            decoupledMessageObserver = incomingObserver;
+        }
+
+        public EndpointReferenceType getAddress() {
+            return address;
+        }
+
+        public Conduit getBackChannel(Message inMessage,
+                Message partialResponse,
+                EndpointReferenceType addr)
+                throws IOException {
+            // shouldn't be called on decoupled endpoint
+            return null;
+        }
+
+        public void shutdown() {
+            // TODO Auto-generated method stub
+        }
+
+        public synchronized void setMessageObserver(MessageObserver observer) {
+            decoupledMessageObserver = observer;
+        }
+
+        protected synchronized MessageObserver getMessageObserver() {
+            return decoupledMessageObserver;
+        }
+    }
+}

Propchange: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConstants.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConstants.java?view=auto&rev=522906
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConstants.java
(added)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConstants.java
Tue Mar 27 06:41:09 2007
@@ -0,0 +1,55 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf.transport;
+
+/**
+ * String constants shared by the Camel transport for CXF: message type names,
+ * message property keys, configuration namespace URIs and configuration ids.
+ *
+ * @version $Revision$
+ */
+public class CamelConstants {
+
+    // message payload type names
+    public static final String TEXT_MESSAGE_TYPE = "text";
+    public static final String BINARY_MESSAGE_TYPE = "binary";
+
+    // keys under which headers and messages are stored on a CXF message
+    public static final String CAMEL_SERVER_REQUEST_HEADERS = "org.apache.cxf.camel.server.request.headers";
+    public static final String CAMEL_SERVER_RESPONSE_HEADERS = "org.apache.cxf.camel.server.response.headers";
+    public static final String CAMEL_REQUEST_MESSAGE = "org.apache.cxf.camel.request.message";
+    // key spelling corrected from "reponse" so it is consistent with the other response keys
+    public static final String CAMEL_RESPONSE_MESSAGE = "org.apache.cxf.camel.response.message";
+    public static final String CAMEL_CLIENT_REQUEST_HEADERS = "org.apache.cxf.camel.client.request.headers";
+    public static final String CAMEL_CLIENT_RESPONSE_HEADERS =
+        "org.apache.cxf.camel.client.response.headers";
+
+    public static final String CAMEL_CLIENT_RECEIVE_TIMEOUT = "org.apache.cxf.camel.client.timeout";
+
+    // configuration namespace URIs
+    public static final String CAMEL_SERVER_CONFIGURATION_URI =
+        "http://cxf.apache.org/configuration/transport/camel-server";
+    public static final String CAMEL_CLIENT_CONFIGURATION_URI =
+        "http://cxf.apache.org/configuration/transport/camel-client";
+    public static final String ENDPOINT_CONFIGURATION_URI =
+        "http://cxf.apache.org/jaxws/endpoint-config";
+    public static final String SERVICE_CONFIGURATION_URI =
+        "http://cxf.apache.org/jaxws/service-config";
+    public static final String PORT_CONFIGURATION_URI =
+        "http://cxf.apache.org/jaxws/port-config";
+
+    // configuration ids
+    public static final String CAMEL_CLIENT_CONFIG_ID = "camel-client";
+    public static final String CAMEL_SERVER_CONFIG_ID = "camel-server";
+
+    // key of the reply-to override read by CamelDestination#getReplyToDestination
+    public static final String CAMEL_REBASED_REPLY_TO = "org.apache.cxf.camel.server.replyto";
+
+    // header / message key carrying the correlation id between request and reply
+    public static final String CAMEL_CORRELATION_ID = "org.apache.cxf.camel.correlationId";
+}

Propchange: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConstants.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConstants.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConstants.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelDestination.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelDestination.java?view=auto&rev=522906
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelDestination.java
(added)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelDestination.java
Tue Mar 27 06:41:09 2007
@@ -0,0 +1,247 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf.transport;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.Endpoint;
+import org.apache.cxf.Bus;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.configuration.Configurable;
+import org.apache.cxf.io.AbstractCachedOutputStream;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.AbstractConduit;
+import org.apache.cxf.transport.AbstractDestination;
+import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.transport.ConduitInitiator;
+import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.cxf.wsdl.EndpointReferenceUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Server-side CXF destination for the Camel transport; a rough initial port of
+ * the CXF JMS destination.
+ * <p>
+ * Incoming Camel exchanges are converted to CXF messages in
+ * {@link #incoming(Exchange)} and passed to the incoming observer. Replies are
+ * written through a back channel conduit whose output stream sends the reply
+ * via Camel when it is closed.
+ *
+ * @version $Revision$
+ */
+public class CamelDestination extends AbstractDestination implements Configurable {
+    protected static final String BASE_BEAN_NAME_SUFFIX = ".jms-destination-base";
+    private static final Logger LOG = LogUtils.getL7dLogger(CamelDestination.class);
+    CamelContext camelContext;
+    // NOTE(review): never assigned inside this class - presumably set by code in
+    // the same package before activate() is called; confirm
+    String camelUri;
+    final ConduitInitiator conduitInitiator;
+    private CamelTransportBase base;
+    private Endpoint endpoint;
+
+    /**
+     * Creates the destination and its {@link CamelTransportBase}.
+     *
+     * @param camelContext the Camel context used to resolve the endpoint
+     * @param bus          the CXF bus
+     * @param ci           the conduit initiator, stored for later use
+     * @param info         the endpoint this destination serves
+     * @throws IOException declared by this constructor's signature
+     */
+    public CamelDestination(CamelContext camelContext, Bus bus,
+            ConduitInitiator ci,
+            EndpointInfo info) throws IOException {
+        super(getTargetReference(info, bus), info);
+        this.camelContext = camelContext;
+
+        base = new CamelTransportBase(camelContext, bus, endpointInfo, true, BASE_BEAN_NAME_SUFFIX);
+
+        conduitInitiator = ci;
+
+        initConfig();
+    }
+
+    protected Logger getLogger() {
+        return LOG;
+    }
+
+    /**
+     * @param inMessage the incoming message
+     * @return the inbuilt backchannel
+     */
+    protected Conduit getInbuiltBackChannel(Message inMessage) {
+        return new BackChannelConduit(EndpointReferenceUtils.getAnonymousEndpointReference(),
+                inMessage);
+    }
+
+    /**
+     * Resolves the Camel endpoint for {@link #camelUri}. A failure is logged
+     * at SEVERE level and not rethrown.
+     */
+    public void activate() {
+        getLogger().log(Level.INFO, "CamelDestination activate().... ");
+
+        try {
+            getLogger().log(Level.FINE, "establishing Camel connection");
+            endpoint = camelContext.resolveEndpoint(camelUri);
+        }
+        catch (Exception ex) {
+            getLogger().log(Level.SEVERE, "Camel connect failed with Exception : ", ex);
+        }
+    }
+
+    /**
+     * Closes the underlying transport base.
+     */
+    public void deactivate() {
+        base.close();
+    }
+
+    public void shutdown() {
+        getLogger().log(Level.FINE, "CamelDestination shutdown()");
+        this.deactivate();
+    }
+
+    /**
+     * Converts an incoming Camel exchange into a CXF message and hands it to
+     * the incoming observer.
+     *
+     * @param exchange the Camel exchange that carries the request
+     */
+    protected void incoming(Exchange exchange) {
+        // the {0} placeholder is required, otherwise the parameter is never rendered
+        getLogger().log(Level.FINE, "server received request: {0}", exchange);
+
+        byte[] bytes = base.unmarshal(exchange);
+
+        // build the message that is passed to the interceptors
+        MessageImpl inMessage = new MessageImpl();
+        inMessage.setContent(InputStream.class, new ByteArrayInputStream(bytes));
+        base.populateIncomingContext(exchange, inMessage, CamelConstants.CAMEL_SERVER_REQUEST_HEADERS);
+        //inMessage.put(JMSConstants.CAMEL_SERVER_RESPONSE_HEADERS, new JMSMessageHeadersType());
+        // keep the original exchange so the back channel can copy it onto the reply message
+        inMessage.put(CamelConstants.CAMEL_REQUEST_MESSAGE, exchange);
+
+        inMessage.setDestination(this);
+
+        //handle the incoming message
+        incomingObserver.onMessage(inMessage);
+    }
+
+    /**
+     * @return the configuration bean name: the endpoint name followed by
+     *         <code>.jms-destination</code>
+     */
+    public String getBeanName() {
+        return endpointInfo.getName().toString() + ".jms-destination";
+    }
+
+    /**
+     * Placeholder for reading the endpoint configuration; currently does nothing.
+     */
+    private void initConfig() {
+/*
+        this.runtimePolicy = endpointInfo.getTraversedExtensor(new ServerBehaviorPolicyType(),
+                                                               ServerBehaviorPolicyType.class);
+        this.serverConfig = endpointInfo.getTraversedExtensor(new ServerConfig(), ServerConfig.class);
+        this.address = endpointInfo.getTraversedExtensor(new AddressType(), AddressType.class);
+        this.sessionPool = endpointInfo.getTraversedExtensor(new SessionPoolType(), SessionPoolType.class);
+*/
+    }
+
+    /**
+     * Camel processor that forwards each exchange to {@link #incoming(Exchange)}.
+     * Any failure is logged as a warning and not propagated.
+     */
+    protected class ConsumerProcessor implements Processor<Exchange> {
+        public void onExchange(Exchange exchange) {
+            try {
+                incoming(exchange);
+            }
+            catch (Throwable ex) {
+                getLogger().log(Level.WARNING, "Failed to process incoming message : ", ex);
+            }
+        }
+    }
+
+    /**
+     * Back channel used to send the reply for one incoming CXF message.
+     */
+    protected class BackChannelConduit extends AbstractConduit {
+        protected Message inMessage;
+
+        BackChannelConduit(EndpointReferenceType ref, Message message) {
+            super(ref);
+            inMessage = message;
+        }
+
+        /**
+         * Register a message observer for incoming messages.
+         *
+         * @param observer the observer to notify on receipt of incoming
+         */
+        public void setMessageObserver(MessageObserver observer) {
+            // shouldn't be called for a back channel conduit
+        }
+
+        /**
+         * Send an outbound message, assumed to contain all the name-value
+         * mappings of the corresponding input message (if any).
+         *
+         * @param message the message to be sent.
+         */
+        public void send(Message message) throws IOException {
+            // carry the original request exchange over to the reply message
+            message.put(CamelConstants.CAMEL_REQUEST_MESSAGE,
+                    inMessage.get(CamelConstants.CAMEL_REQUEST_MESSAGE));
+            message.setContent(OutputStream.class,
+                    new CamelOutputStream(inMessage));
+        }
+
+        protected Logger getLogger() {
+            return LOG;
+        }
+    }
+
+    /**
+     * Output stream for the reply; closing it sends the cached content to the
+     * reply destination through Camel.
+     */
+    private class CamelOutputStream extends AbstractCachedOutputStream {
+        private Message inMessage;
+
+        // setup the ByteArrayStream
+        public CamelOutputStream(Message m) {
+            super();
+            inMessage = m;
+        }
+
+        /**
+         * Prepares the reply exchange and sends it to the reply destination.
+         */
+        private void commitOutputMessage() throws IOException {
+
+            //setup the reply message
+            final String replyToUri = getReplyToDestination(inMessage);
+
+            base.client.send(replyToUri, new Processor<Exchange>() {
+                public void onExchange(Exchange reply) {
+                    base.marshal(currentStream.toString(), replyToUri, reply);
+
+                    setReplyCorrelationID(inMessage, reply);
+
+                    base.setMessageProperties(inMessage, reply);
+
+                    // the {0} placeholder is required, otherwise the parameter is never rendered
+                    getLogger().log(Level.FINE, "just server sending reply: {0}", reply);
+                }
+            });
+        }
+
+        @Override
+        protected void doFlush() throws IOException {
+            // Do nothing here
+        }
+
+        @Override
+        protected void doClose() throws IOException {
+            commitOutputMessage();
+        }
+
+        @Override
+        protected void onWrite() throws IOException {
+            // Do nothing here
+        }
+    }
+
+    /**
+     * Determines where the reply for the given message is sent.
+     *
+     * @param inMessage the incoming CXF message
+     * @return the value stored under {@link CamelConstants#CAMEL_REBASED_REPLY_TO}
+     *         when present, otherwise the reply destination of the transport base
+     */
+    protected String getReplyToDestination(Message inMessage) {
+        if (inMessage.get(CamelConstants.CAMEL_REBASED_REPLY_TO) != null) {
+            return (String) inMessage.get(CamelConstants.CAMEL_REBASED_REPLY_TO);
+        }
+        else {
+            return base.getReplyDestination();
+        }
+    }
+
+    /**
+     * Copies the correlation id of the incoming message, if it has one, onto
+     * the reply exchange.
+     *
+     * @param inMessage the incoming CXF message
+     * @param reply     the Camel exchange that carries the reply
+     */
+    protected void setReplyCorrelationID(Message inMessage, Exchange reply) {
+        Object value = inMessage.get(CamelConstants.CAMEL_CORRELATION_ID);
+        if (value != null) {
+            reply.getIn().setHeader(CamelConstants.CAMEL_CORRELATION_ID, value);
+        }
+    }
+}

Propchange: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelDestination.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelDestination.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelDestination.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransport.java?view=auto&rev=522906
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransport.java
(added)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransport.java
Tue Mar 27 06:41:09 2007
@@ -0,0 +1,24 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf.transport;
+
+/**
+ * Placeholder class for the Camel transport; it currently declares no members.
+ *
+ * @version $Revision$
+ */
+public class CamelTransport {
+}

Propchange: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransport.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransport.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportBase.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportBase.java?view=auto&rev=522906
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportBase.java
(added)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportBase.java
Tue Mar 27 06:41:09 2007
@@ -0,0 +1,185 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf.transport;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.CamelContext;
+import org.apache.camel.util.CamelClient;
+import org.apache.cxf.Bus;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.service.model.EndpointInfo;
+
+/**
+ * @version $Revision$
+ */
+public class CamelTransportBase {
+    private String replyDestination;
+    CamelClient<Exchange> client;
+    private final CamelContext camelContext;
+    Bus bus;
+    EndpointInfo endpointInfo;
+
+    public CamelTransportBase(CamelContext camelContext, Bus bus, EndpointInfo endpointInfo,
boolean b, String baseBeanNameSuffix) {
+        this.camelContext = camelContext;
+        this.bus = bus;
+        this.endpointInfo = endpointInfo;
+        this.client = new CamelClient<Exchange>(camelContext);
+    }
+
+    public void populateIncomingContext(Exchange exchange, MessageImpl inMessage, String
jmsServerRequestHeaders) {
+
+    }
+
+
+    public String getReplyDestination() {
+        return replyDestination;
+    }
+
+    public void setMessageProperties(Message inMessage, Exchange reply) {
+
+    }
+
+    public void close() {
+        if (client != null) {
+            try {
+                client.stop();
+            }
+            catch (Exception e) {
+                // do nothing?
+                // TODO
+            }
+        }
+    }
+
+    /**
+     * Populates a Camel exchange with a payload
+     *
+     * @param payload the message payload, expected to be either of type
+     * String or byte[] depending on payload type
+     * @param replyTo the ReplyTo destination if any
+     * @param exchange the underlying exchange to marshal to
+     */
+    protected void marshal(Object payload, String replyTo,   Exchange exchange) {
+        org.apache.camel.Message message = exchange.getIn();
+        message.setBody(payload);
+        if (replyTo != null) {
+            message.setHeader(CamelConstants.CAMEL_CORRELATION_ID, replyTo);
+        }
+
+    }
+
+    /**
+     * Unmarshal the payload of an incoming message.
+     */
+    public byte[] unmarshal(Exchange exchange) {
+        return exchange.getIn().getBody(byte[].class);
+    }
+
+    /*
+    protected JMSMessageHeadersType populateIncomingContext(javax.jms.Message message,
+                                                            org.apache.cxf.message.Message
inMessage,
+                                                     String headerType)  throws JMSException
{
+        JMSMessageHeadersType headers = null;
+
+        headers = (JMSMessageHeadersType)inMessage.get(headerType);
+
+        if (headers == null) {
+            headers = new JMSMessageHeadersType();
+            inMessage.put(headerType, headers);
+        }
+
+        headers.setJMSCorrelationID(message.getJMSCorrelationID());
+        headers.setJMSDeliveryMode(new Integer(message.getJMSDeliveryMode()));
+        headers.setJMSExpiration(new Long(message.getJMSExpiration()));
+        headers.setJMSMessageID(message.getJMSMessageID());
+        headers.setJMSPriority(new Integer(message.getJMSPriority()));
+        headers.setJMSRedelivered(Boolean.valueOf(message.getJMSRedelivered()));
+        headers.setJMSTimeStamp(new Long(message.getJMSTimestamp()));
+        headers.setJMSType(message.getJMSType());
+
+        List<JMSPropertyType> props = headers.getProperty();
+        Enumeration enm = message.getPropertyNames();
+        while (enm.hasMoreElements()) {
+            String name = (String)enm.nextElement();
+            String val = message.getStringProperty(name);
+            JMSPropertyType prop = new JMSPropertyType();
+            prop.setName(name);
+            prop.setValue(val);
+            props.add(prop);
+        }
+
+        return headers;
+    }
+
+    protected int getJMSDeliveryMode(JMSMessageHeadersType headers) {
+        int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
+
+        if (headers != null && headers.isSetJMSDeliveryMode()) {
+            deliveryMode = headers.getJMSDeliveryMode();
+        }
+        return deliveryMode;
+    }
+
+    protected int getJMSPriority(JMSMessageHeadersType headers) {
+        int priority = Message.DEFAULT_PRIORITY;
+        if (headers != null && headers.isSetJMSPriority()) {
+            priority = headers.getJMSPriority();
+        }
+        return priority;
+    }
+
+    protected long getTimeToLive(JMSMessageHeadersType headers) {
+        long ttl = -1;
+        if (headers != null && headers.isSetTimeToLive()) {
+            ttl = headers.getTimeToLive();
+        }
+        return ttl;
+    }
+
+    protected String getCorrelationId(JMSMessageHeadersType headers) {
+        String correlationId  = null;
+        if (headers != null
+            && headers.isSetJMSCorrelationID()) {
+            correlationId = headers.getJMSCorrelationID();
+        }
+        return correlationId;
+    }
+
+
+    protected String getAddrUriFromJMSAddrPolicy() {
+        AddressType jmsAddressPolicy = transport.getJMSAddress();
+        return "jms:" + jmsAddressPolicy.getJndiConnectionFactoryName()
+                        + "#"
+                        + jmsAddressPolicy.getJndiDestinationName();
+    }
+
+    protected String getReplyTotAddrUriFromJMSAddrPolicy() {
+        AddressType jmsAddressPolicy = transport.getJMSAddress();
+        return "jms:"
+                        + jmsAddressPolicy.getJndiConnectionFactoryName()
+                        + "#"
+                        + jmsAddressPolicy.getJndiReplyDestinationName();
+    }
+
+    protected boolean isDestinationStyleQueue() {
+        return JMSConstants.CAMEL_QUEUE.equals(
+            transport.getJMSAddress().getDestinationStyle().value());
+    }
+    */
+}

Propchange: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportBase.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportBase.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportBase.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportFactory.java?view=auto&rev=522906
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportFactory.java (added)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportFactory.java Tue Mar 27 06:41:09 2007
@@ -0,0 +1,88 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf.transport;
+
+import org.apache.camel.CamelContext;
+import org.apache.cxf.Bus;
+import org.apache.cxf.configuration.Configurer;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.AbstractTransportFactory;
+import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.transport.ConduitInitiator;
+import org.apache.cxf.transport.Destination;
+import org.apache.cxf.transport.DestinationFactory;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+
+import javax.annotation.Resource;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * @version $Revision$
+ */
+public class CamelTransportFactory extends AbstractTransportFactory implements ConduitInitiator,
DestinationFactory {
+    private static final Set<String> URI_PREFIXES = new HashSet<String>();
+
+    static {
+        URI_PREFIXES.add("camel://");
+    }
+
+    private Bus bus;
+    private CamelContext camelContext;
+
+    @Resource
+    public void setBus(Bus b) {
+        bus = b;
+    }
+
+    public Bus getBus() {
+        return bus;
+    }
+
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Resource
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    public Conduit getConduit(EndpointInfo targetInfo) throws IOException {
+        return getConduit(targetInfo, null);
+    }
+
+    public Conduit getConduit(EndpointInfo endpointInfo, EndpointReferenceType target) throws
IOException {
+        return new CamelConduit(camelContext, bus, endpointInfo, target);
+    }
+
+    public Destination getDestination(EndpointInfo endpointInfo) throws IOException {
+        CamelDestination destination = new CamelDestination(camelContext, bus, this, endpointInfo);
+        Configurer configurer = bus.getExtension(Configurer.class);
+        if (null != configurer) {
+            configurer.configureBean(destination);
+        }
+        return destination;
+    }
+
+    public Set<String> getUriPrefixes() {
+        return URI_PREFIXES;
+    }
+}
+

Propchange: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportFactory.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelTransportFactory.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message