cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ff...@apache.org
Subject svn commit: r552811 - in /incubator/cxf/trunk/rt/transports/jbi/src/main/java/org/apache/cxf/transport/jbi: JBIDestination.java JBIDispatcherUtil.java JBITransportFactory.java
Date Tue, 03 Jul 2007 12:38:16 GMT
Author: ffang
Date: Tue Jul  3 05:38:15 2007
New Revision: 552811

URL: http://svn.apache.org/viewvc?view=rev&rev=552811
Log:
support multiple endpoints in CXF ServiceEngine

Added:
    incubator/cxf/trunk/rt/transports/jbi/src/main/java/org/apache/cxf/transport/jbi/JBIDispatcherUtil.java
  (with props)
Modified:
    incubator/cxf/trunk/rt/transports/jbi/src/main/java/org/apache/cxf/transport/jbi/JBIDestination.java
    incubator/cxf/trunk/rt/transports/jbi/src/main/java/org/apache/cxf/transport/jbi/JBITransportFactory.java

Modified: incubator/cxf/trunk/rt/transports/jbi/src/main/java/org/apache/cxf/transport/jbi/JBIDestination.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/jbi/src/main/java/org/apache/cxf/transport/jbi/JBIDestination.java?view=diff&rev=552811&r1=552810&r2=552811
==============================================================================
--- incubator/cxf/trunk/rt/transports/jbi/src/main/java/org/apache/cxf/transport/jbi/JBIDestination.java
(original)
+++ incubator/cxf/trunk/rt/transports/jbi/src/main/java/org/apache/cxf/transport/jbi/JBIDestination.java
Tue Jul  3 05:38:15 2007
@@ -20,24 +20,16 @@
 package org.apache.cxf.transport.jbi;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.jbi.messaging.DeliveryChannel;
 import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.NormalizedMessage;
-import javax.xml.namespace.QName;
 
 
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.message.Message;
-import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.service.model.EndpointInfo;
-
-
-
 import org.apache.cxf.transport.AbstractConduit;
 import org.apache.cxf.transport.AbstractDestination;
 import org.apache.cxf.transport.Conduit;
@@ -48,14 +40,13 @@
 public class JBIDestination extends AbstractDestination {
     
     private static final Logger LOG = LogUtils.getL7dLogger(JBIDestination.class);
-    
-    private final DeliveryChannel channel;
-    private JBIDispatcher dispatcher;
-    private volatile boolean running;
-    
+    private JBIDispatcherUtil dispatcherUtil;
+    private DeliveryChannel channel;
     public JBIDestination(EndpointInfo info,
+                          JBIDispatcherUtil dispatcher,
                           DeliveryChannel dc) {
         super(getTargetReference(info, null), info);
+        this.dispatcherUtil = dispatcher;
         this.channel = dc;
     }
 
@@ -74,18 +65,15 @@
     }
     
     public void shutdown() {
-        running = false;
+        dispatcherUtil.deactivateDispatch();
     }
 
     public void deactivate() {
-        running = false;
+        dispatcherUtil.deactivateDispatch();
     }
 
     public void activate()  {
-        getLogger().info(new org.apache.cxf.common.i18n.Message(
-            "ACTIVE.JBI.SERVER.TRANSPORT", getLogger()).toString());
-        dispatcher = new JBIDispatcher();
-        new Thread(dispatcher).start();
+        dispatcherUtil.activateDispatch();
     }
 
     
@@ -131,69 +119,4 @@
         }
     }
     
-    
-    private class JBIDispatcher implements Runnable {
-
-        public final void run() {
-
-            try {
-                running = true;
-                getLogger().info(new org.apache.cxf.common.i18n.Message(
-                    "RECEIVE.THREAD.START", getLogger()).toString());
-                do {
-                    MessageExchange exchange = null;
-                    synchronized (channel) {
-                        try {
-                            exchange = channel.accept();
-                        } catch (Exception e) {
-                            // ignore
-                        }
-                    }
-
-                    if (exchange != null) {
-                        try {
-                            getLogger().info(new org.apache.cxf.common.i18n.Message(
-                                    "DISPATCH.TO.SU", getLogger()).toString());
-                            dispatch(exchange);
-                            
-                        } finally {
-                            //
-                        }
-                    }
-                } while(running);
-            } catch (Exception ex) {
-                getLogger().log(Level.SEVERE, new org.apache.cxf.common.i18n.Message(
-                    "ERROR.DISPATCH.THREAD", getLogger()).toString(), ex);
-            }
-            getLogger().fine(new org.apache.cxf.common.i18n.Message(
-                                 "JBI.SERVER.TRANSPORT.MESSAGE.PROCESS.THREAD.EXIT", getLogger()).toString());
-        }
-    }
-    
-            
-
-    public void dispatch(MessageExchange exchange) throws IOException {
-        QName opName = exchange.getOperation(); 
-        getLogger().info("dispatch method: " + opName);
-                
-        NormalizedMessage nm = exchange.getMessage("in");
-        try {
-
-            MessageImpl inMessage = new MessageImpl();
-            inMessage.put(MessageExchange.class, exchange);
-            
-            
-            final InputStream in = JBIMessageHelper.convertMessageToInputStream(nm.getContent());
-            inMessage.setContent(InputStream.class, in);
-                                           
-            inMessage.setDestination(this);
-            getMessageObserver().onMessage(inMessage);
-            
-        } catch (Exception ex) {
-            getLogger().log(Level.SEVERE, new org.apache.cxf.common.i18n.Message(
-                "ERROR.PREPARE.MESSAGE", getLogger()).toString(), ex);
-            throw new IOException(ex.getMessage());
-        }
-
-    }
 }

Added: incubator/cxf/trunk/rt/transports/jbi/src/main/java/org/apache/cxf/transport/jbi/JBIDispatcherUtil.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/jbi/src/main/java/org/apache/cxf/transport/jbi/JBIDispatcherUtil.java?view=auto&rev=552811
==============================================================================
--- incubator/cxf/trunk/rt/transports/jbi/src/main/java/org/apache/cxf/transport/jbi/JBIDispatcherUtil.java
(added)
+++ incubator/cxf/trunk/rt/transports/jbi/src/main/java/org/apache/cxf/transport/jbi/JBIDispatcherUtil.java
Tue Jul  3 05:38:15 2007
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.jbi;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.jbi.messaging.DeliveryChannel;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.namespace.QName;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.transport.ConduitInitiator;
+
+public final class JBIDispatcherUtil {
+    private static final Logger LOG = LogUtils.getL7dLogger(JBIDispatcherUtil.class);
+    private static JBIDispatcherUtil dispatchUtil;
+    private final DeliveryChannel channel;
+    private ConduitInitiator conduitInitiator;
+    private int activeEndpoints;
+    private boolean running;
+    
+    private JBIDispatcherUtil(ConduitInitiator ci,
+                              DeliveryChannel dc) {
+        this.conduitInitiator = ci;
+        this.channel = dc;
+    }
+    
+    public static synchronized JBIDispatcherUtil getInstance(ConduitInitiator ci,
+                                                             DeliveryChannel dc) {
+        if (dispatchUtil == null) {
+            dispatchUtil = new JBIDispatcherUtil(ci, dc);
+        }
+        return dispatchUtil;
+        
+    }
+    
+   
+    
+    public void activateDispatch() {
+        activeEndpoints++;
+        if (!running) {
+            new Thread(new JBIDispatcher()).start();
+        }
+    }
+    
+    public void startDispatch() {
+        
+    }
+    
+    public void deactivateDispatch() {
+        activeEndpoints--;
+    }
+    
+    protected Logger getLogger() {
+        return LOG;
+    }
+    
+    private class JBIDispatcher implements Runnable {
+
+        public final void run() {
+            
+            try {
+                synchronized (channel) {
+                    running = true;
+                }
+                getLogger().info(new org.apache.cxf.common.i18n.Message(
+                    "RECEIVE.THREAD.START", getLogger()).toString());
+                do {
+                    MessageExchange exchange = null;
+                    synchronized (channel) {
+                        try {
+                            exchange = channel.accept();
+                        } catch (Exception e) {
+                            // ignore
+                        }
+                    }
+
+                    if (exchange != null) {
+                        try {
+                            getLogger().info(new org.apache.cxf.common.i18n.Message(
+                                    "DISPATCH.TO.SU", getLogger()).toString());
+                            dispatch(exchange);
+                            
+                        } finally {
+                            //
+                        }
+                    }
+                } while(activeEndpoints > 0);
+                synchronized (channel) {
+                    running = false;
+                }
+            } catch (Exception ex) {
+                getLogger().log(Level.SEVERE, new org.apache.cxf.common.i18n.Message(
+                    "ERROR.DISPATCH.THREAD", getLogger()).toString(), ex);
+            }
+            getLogger().fine(new org.apache.cxf.common.i18n.Message(
+                                 "JBI.SERVER.TRANSPORT.MESSAGE.PROCESS.THREAD.EXIT", getLogger()).toString());
+        }
+    }
+    
+    public void dispatch(MessageExchange exchange) throws IOException {
+        QName opName = exchange.getOperation(); 
+        getLogger().info("dispatch method: " + opName);
+                
+        NormalizedMessage nm = exchange.getMessage("in");
+        try {
+
+            MessageImpl inMessage = new MessageImpl();
+            inMessage.put(MessageExchange.class, exchange);
+            
+            
+            final InputStream in = JBIMessageHelper.convertMessageToInputStream(nm.getContent());
+            inMessage.setContent(InputStream.class, in);
+                                           
+            //dispatch to correct destination in case of multiple endpoint
+            inMessage.setDestination(((JBITransportFactory)conduitInitiator).
+                                     getDestination(exchange.getService().toString()
+                                     + exchange.getInterfaceName().toString()));
+            ((JBITransportFactory)conduitInitiator).
+            getDestination(exchange.getService().toString()
+                           + exchange.getInterfaceName().toString()).
+                getMessageObserver().onMessage(inMessage);
+            
+        } catch (Exception ex) {
+            getLogger().log(Level.SEVERE, new org.apache.cxf.common.i18n.Message(
+                "ERROR.PREPARE.MESSAGE", getLogger()).toString(), ex);
+            throw new IOException(ex.getMessage());
+        }
+
+    }
+}

Propchange: incubator/cxf/trunk/rt/transports/jbi/src/main/java/org/apache/cxf/transport/jbi/JBIDispatcherUtil.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/cxf/trunk/rt/transports/jbi/src/main/java/org/apache/cxf/transport/jbi/JBIDispatcherUtil.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/cxf/trunk/rt/transports/jbi/src/main/java/org/apache/cxf/transport/jbi/JBITransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/jbi/src/main/java/org/apache/cxf/transport/jbi/JBITransportFactory.java?view=diff&rev=552811&r1=552810&r2=552811
==============================================================================
--- incubator/cxf/trunk/rt/transports/jbi/src/main/java/org/apache/cxf/transport/jbi/JBITransportFactory.java
(original)
+++ incubator/cxf/trunk/rt/transports/jbi/src/main/java/org/apache/cxf/transport/jbi/JBITransportFactory.java
Tue Jul  3 05:38:15 2007
@@ -124,13 +124,16 @@
     }
 
     public Destination getDestination(EndpointInfo ei) throws IOException {
-        JBIDestination destination = new JBIDestination(ei, getDeliveryChannel());
+        JBIDestination destination = new JBIDestination(ei, 
+                                         JBIDispatcherUtil.getInstance(this, getDeliveryChannel()),

+                                         getDeliveryChannel());
         Configurer configurer = bus.getExtension(Configurer.class);
         if (null != configurer) {
             configurer.configureBean(destination);
         }
         try {
-            putDestination(ei.getAddress(), destination);
+            putDestination(ei.getService().getName().toString()
+                + ei.getInterface().getName().toString(), destination);
         } catch (JBIException e) {
             throw new IOException(e.getMessage());
         }



Mime
View raw message