cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andreasm...@apache.org
Subject svn commit: r470005 [1/3] - in /incubator/cxf/trunk/rt/ws/rm: ./ src/main/java/org/apache/cxf/ws/rm/ src/main/java/org/apache/cxf/ws/rm/impl/ src/main/java/org/apache/cxf/ws/rm/persistence/ src/main/java/org/apache/cxf/ws/rm/soap/ src/main/resources/ME...
Date Wed, 01 Nov 2006 17:36:09 GMT
Author: andreasmyth
Date: Wed Nov  1 09:36:06 2006
New Revision: 470005

URL: http://svn.apache.org/viewvc?view=rev&rev=470005
Log:
[JIRA CXF-138] Split RMInterceptor in separate inbound and outbound interceptors, managing state in RMManager.

Added:
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractEndpoint.java
      - copied, changed from r469824, incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractEndpoint.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java   (with props)
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractSequence.java   (with props)
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
      - copied, changed from r469824, incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Destination.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Messages.properties
      - copied unchanged from r469824, incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Messages.properties
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
      - copied, changed from r469824, incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Proxy.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
      - copied, changed from r469824, incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMEndpoint.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java   (with props)
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java   (with props)
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java   (with props)
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java
      - copied, changed from r469824, incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMUtils.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SequenceFaultFactory.java
      - copied, changed from r469824, incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SequenceFaultFactory.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SequenceMonitor.java
      - copied, changed from r469824, incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SequenceMonitor.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
      - copied, changed from r469824, incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Servant.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Source.java
      - copied, changed from r469824, incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Source.java
    incubator/cxf/trunk/rt/ws/rm/src/main/resources/META-INF/
    incubator/cxf/trunk/rt/ws/rm/src/main/resources/META-INF/cxf/
    incubator/cxf/trunk/rt/ws/rm/src/main/resources/META-INF/cxf/cxf-extension.xml   (with props)
    incubator/cxf/trunk/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager.xsd
      - copied, changed from r469824, incubator/cxf/trunk/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-interceptor.xsd
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractSequenceTest.java   (with props)
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java   (with props)
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java   (with props)
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java   (with props)
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java   (with props)
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMOutInterceptorTest.java   (with props)
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/SourceSequenceTest.java   (with props)
Removed:
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractEndpoint.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractSequenceImpl.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/ContextUtils.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Destination.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/DestinationSequenceImpl.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Messages.properties
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Proxy.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMEndpoint.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMInterceptor.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMUtils.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SequenceFaultFactory.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SequenceMonitor.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SequenceService.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Servant.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Source.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SourceSequenceImpl.java
    incubator/cxf/trunk/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-interceptor.xsd
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/impl/AbstractSequenceImplTest.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/impl/DestinationSequenceImplTest.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/impl/ProxyTest.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/impl/RMInterceptorTest.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/impl/SourceSequenceImplTest.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/impl/TestUtils.java
Modified:
    incubator/cxf/trunk/rt/ws/rm/pom.xml
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMProperties.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptorTest.java

Modified: incubator/cxf/trunk/rt/ws/rm/pom.xml
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/pom.xml?view=diff&rev=470005&r1=470004&r2=470005
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/pom.xml (original)
+++ incubator/cxf/trunk/rt/ws/rm/pom.xml Wed Nov  1 09:36:06 2006
@@ -129,7 +129,7 @@
                                 </deleteDirs>
                             </xsdOption>
                             <xsdOption>
-                                <xsd>${basedir}/src/main/resources/schemas/configuration/wsrm-interceptor.xsd</xsd>
+                                <xsd>${basedir}/src/main/resources/schemas/configuration/wsrm-manager.xsd</xsd>
                                 <bindingFile>${basedir}/src/main/resources/schemas/configuration/wsrm-policy.xjb</bindingFile>
                                 <extension>true</extension>
                                 <extensionArgs>

Copied: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractEndpoint.java (from r469824, incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractEndpoint.java)
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractEndpoint.java?view=diff&rev=470005&p1=incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractEndpoint.java&r1=469824&p2=incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractEndpoint.java&r2=470005
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractEndpoint.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractEndpoint.java Wed Nov  1 09:36:06 2006
@@ -17,12 +17,11 @@
  * under the License.
  */
 
-package org.apache.cxf.ws.rm.impl;
+package org.apache.cxf.ws.rm;
 
 import javax.xml.namespace.QName;
 
 import org.apache.cxf.endpoint.Endpoint;
-import org.apache.cxf.ws.rm.Identifier;
 
 public class AbstractEndpoint {
     
@@ -42,33 +41,19 @@
     public RMEndpoint getReliableEndpoint() {
         return reliableEndpoint;
     }
-
-    /**
-     * @return Returns the interceptor.
-     */
-    public RMInterceptor getInterceptor() {
-        return reliableEndpoint.getInterceptor();
-    }
     
     /**
      * @return Returns the endpoint.
      */
     public Endpoint getEndpoint() {
-        return reliableEndpoint.getEndpoint();
-    }
-    
-    /**
-     * @return Returns the proxy.
-     */
-    public Proxy getProxy() {
-        return reliableEndpoint.getProxy();
+        return reliableEndpoint.getApplicationEndpoint();
     }
     
     /**
-     * @return Returns the servant.
+     * @return Returns the manager.
      */
-    public Servant getServant() {
-        return reliableEndpoint.getServant();
+    public RMManager getManager() {
+        return reliableEndpoint.getManager();
     }
 
     /**
@@ -77,7 +62,7 @@
      * @return the sequence identifier.
      */
     public Identifier generateSequenceIdentifier() {
-        String sequenceID = ContextUtils.generateUUID();
+        String sequenceID = RMContextUtils.generateUUID();
         Identifier sid = RMUtils.getWSRMFactory().createIdentifier();
         sid.setValue(sequenceID);        
         return sid;

Added: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java?view=auto&rev=470005
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java (added)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java Wed Nov  1 09:36:06 2006
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.Phase;
+import org.apache.cxf.phase.PhaseInterceptor;
+
+/**
+ * Interceptor responsible for implementing exchange of RM protocol messages,
+ * aggregating RM metadata in the application message and processing of 
+ * RM metadata contained in incoming application messages.
+ * The same interceptor can be used on multiple endpoints.
+ *
+ */
+public abstract class AbstractRMInterceptor implements PhaseInterceptor<Message> {
+
+    private static final Logger LOG = LogUtils.getL7dLogger(AbstractRMInterceptor.class);      
+    private RMManager manager;
+    private Bus bus;
+     
+    public RMManager getManager() {
+        if (null == manager) {
+            return bus.getExtension(RMManager.class);
+        }
+        return manager;
+    }
+
+    public void setManager(RMManager m) {
+        manager = m;
+    }
+    
+    public Bus getBus() {
+        return bus;
+    }
+
+    public void setBus(Bus bus) {
+        this.bus = bus;
+    }
+    
+    // PhaseInterceptor interface
+    
+   
+
+    public String getPhase() {
+        return Phase.PRE_LOGICAL;
+    }
+
+    // Interceptor interface 
+    
+    public void handleMessage(Message msg) throws Fault {
+        
+        try {
+            handleMessage(msg, false);
+        } catch (SequenceFault ex) {
+            LOG.log(Level.SEVERE, "SequenceFault", ex);
+            throw new Fault(ex);
+        }
+    }
+    
+    public void handleFault(Message msg) {
+        try {
+            handleMessage(msg, true);
+        } catch (SequenceFault ex) {
+            LOG.log(Level.SEVERE, "SequenceFault", ex);
+        }
+    } 
+    
+    // rm logic
+    
+    abstract void handleMessage(Message msg, boolean isFault) throws SequenceFault;
+    
+    protected boolean isAplicationMessage(String action) {
+        if (RMConstants.getCreateSequenceAction().equals(action)
+            || RMConstants.getCreateSequenceResponseAction().equals(action)
+            || RMConstants.getTerminateSequenceAction().equals(action)
+            || RMConstants.getLastMessageAction().equals(action)
+            || RMConstants.getSequenceAcknowledgmentAction().equals(action)
+            || RMConstants.getSequenceInfoAction().equals(action)) {
+            return false;
+        }
+        return true;
+    }
+    
+    
+    
+    
+    
+   
+
+
+    /*
+     * private static final Logger LOG = LogUtils.getL7dLogger(RMHandler.class);
+     * private static Map<BindingBase, RMHandler> handlers; private RMSource
+     * source; private RMDestination destination; private RMProxy proxy; private
+     * RMServant servant; private ConfigurationHelper configurationHelper;
+     * private PersistenceManager persistenceManager; private Timer timer;
+     * private boolean busLifeCycleListenesRegistered; @Resource(name =
+     * JAXWSConstants.BUS_PROPERTY) private Bus bus; @Resource(name =
+     * JAXWSConstants.CLIENT_BINDING_PROPERTY) private ClientBinding
+     * clientBinding; @Resource(name = JAXWSConstants.SERVER_BINDING_PROPERTY)
+     * private ServerBinding serverBinding; @Resource(name =
+     * JAXWSConstants.CLIENT_TRANSPORT_PROPERTY) private ClientTransport
+     * clientTransport; @Resource(name =
+     * JAXWSConstants.SERVER_TRANSPORT_PROPERTY) private ServerTransport
+     * serverTransport; public RMHandler() { proxy = new RMProxy(this); servant =
+     * new RMServant(); } @PostConstruct protected synchronized void
+     * initialise() { /* getHandlerMap().put(getBinding(), this); if (null ==
+     * configurationHelper) { configurationHelper = new
+     * ConfigurationHelper(getBinding(), null == clientBinding); } if (null ==
+     * getSource()) { source = new RMSource(this); } if (null == destination) {
+     * destination = new RMDestination(this); } if (null == timer) { timer = new
+     * Timer(); } if (!busLifeCycleListenerRegistered) {
+     * getBinding().getBus().getLifeCycleManager()
+     * .registerLifeCycleListener(new RMBusLifeCycleListener(getSource()));
+     * busLifeCycleListenerRegistered = true; } } public static Map<BindingBase,
+     * RMHandler> getHandlerMap() { if (null == handlers) { handlers = new
+     * HashMap<BindingBase, RMHandler>(); } return handlers; } public void
+     * close(MessageContext context) { // TODO commit transaction } public
+     * boolean handleFault(LogicalMessageContext context) { return
+     * handle(context); } public boolean handleMessage(LogicalMessageContext
+     * context) { return handle(context); } public PersistenceManager
+     * getPersistenceManager() { return persistenceManager; } public void
+     * setPersistenceManager(PersistenceManager pm) { persistenceManager = pm; }
+     * public RMStore getStore() { if (null != persistenceManager) { return
+     * persistenceManager.getStore(); } return null; } public Timer getTimer() {
+     * return timer; } public Bus getBus() { return bus; } public Transport
+     * getTransport() { return null == clientTransport ? serverTransport :
+     * clientTransport; } public ClientTransport getClientTransport() { return
+     * clientTransport; } public ServerTransport getServerTransport() { return
+     * serverTransport; } public ClientBinding getClientBinding() { return
+     * clientBinding; } public ServerBinding getServerBinding() { return
+     * serverBinding; } public boolean isServerSide() { return null !=
+     * serverBinding; } public AbstractBindingBase getBinding() { if (null !=
+     * clientBinding) { return (AbstractBindingBase)clientBinding; } return
+     * (AbstractBindingBase)serverBinding; } public RMProxy getProxy() { return
+     * proxy; } public RMServant getServant() { return servant; } public
+     * RMSource getSource() { return source; } public RMDestination
+     * getDestination() { return destination; } protected void
+     * open(LogicalMessageContext context) { // TODO begin transaction }
+     * protected boolean handle(LogicalMessageContext context) { try { if
+     * (ContextUtils.isOutbound(context)) { handleOutbound(context); } else {
+     * handleInbound(context); } } catch (SequenceFault sf) {
+     * LOG.log(Level.SEVERE, "SequenceFault", sf); } return true; } protected
+     * void handleOutbound(LogicalMessageContext context) throws SequenceFault {
+     * LOG.entering(getClass().getName(), "handleOutbound");
+     * AddressingPropertiesImpl maps = ContextUtils.retrieveMAPs(context, false,
+     * true); // ensure the appropriate version of WS-Addressing is used
+     * maps.exposeAs(VersionTransformer.Names200408.WSA_NAMESPACE_NAME); String
+     * action = null; if (maps != null && null != maps.getAction()) { action =
+     * maps.getAction().getValue(); } // nothing to do if this is a
+     * CreateSequence, TerminateSequence or // SequenceInfo request if
+     * (LOG.isLoggable(Level.FINE)) { LOG.fine("Action: " + action); } boolean
+     * isApplicationMessage = true; if
+     * (RMUtils.getRMConstants().getCreateSequenceAction().equals(action) ||
+     * RMUtils.getRMConstants().getCreateSequenceResponseAction().equals(action) ||
+     * RMUtils.getRMConstants().getTerminateSequenceAction().equals(action) ||
+     * RMUtils.getRMConstants().getLastMessageAction().equals(action) ||
+     * RMUtils.getRMConstants().getSequenceAcknowledgmentAction().equals(action) ||
+     * RMUtils.getRMConstants().getSequenceInfoAction().equals(action)) {
+     * isApplicationMessage = false; } RMPropertiesImpl rmpsOut =
+     * (RMPropertiesImpl)RMContextUtils.retrieveRMProperties(context, true); if
+     * (null == rmpsOut) { rmpsOut = new RMPropertiesImpl();
+     * RMContextUtils.storeRMProperties(context, rmpsOut, true); }
+     * RMPropertiesImpl rmpsIn = null; Identifier inSeqId = null; BigInteger
+     * inMessageNumber = null; if (isApplicationMessage) { rmpsIn =
+     * (RMPropertiesImpl)RMContextUtils.retrieveRMProperties(context, false); if
+     * (null != rmpsIn && null != rmpsIn.getSequence()) { inSeqId =
+     * rmpsIn.getSequence().getIdentifier(); inMessageNumber =
+     * rmpsIn.getSequence().getMessageNumber(); } if
+     * (LOG.isLoggable(Level.FINE)) { LOG.fine("inbound sequence: " + (null ==
+     * inSeqId ? "null" : inSeqId.getValue())); } // not for partial responses
+     * to oneway requests if (!(isServerSide() &&
+     * BindingContextUtils.isOnewayTransport(context))) { if
+     * (!ContextUtils.isRequestor(context)) { assert null != inSeqId; } // get
+     * the current sequence, requesting the creation of a new one if necessary
+     * SourceSequence seq = getSequence(inSeqId, context, maps); assert null !=
+     * seq; // increase message number and store a sequence type object in //
+     * context seq.nextMessageNumber(inSeqId, inMessageNumber);
+     * rmpsOut.setSequence(seq); // if this was the last message in the
+     * sequence, reset the // current sequence so that a new one will be created
+     * next // time the handler is invoked if (seq.isLastMessage()) {
+     * source.setCurrent(null); } } } // add Acknowledgements (to application
+     * messages or explicitly // created Acknowledgement messages only) if
+     * (isApplicationMessage ||
+     * RMUtils.getRMConstants().getSequenceAcknowledgmentAction().equals(action)) {
+     * AttributedURI to = VersionTransformer.convert(maps.getTo()); assert null !=
+     * to; addAcknowledgements(rmpsOut, inSeqId, to); } // indicate to the
+     * binding that a response is expected from the transport although // the
+     * web method is a oneway method if
+     * (BindingContextUtils.isOnewayMethod(context) ||
+     * RMUtils.getRMConstants().getLastMessageAction().equals(action)) {
+     * context.put(OutputStreamMessageContext.ONEWAY_MESSAGE_TF, Boolean.FALSE); } }
+     * protected void handleInbound(LogicalMessageContext context) throws
+     * SequenceFault { LOG.entering(getClass().getName(), "handleInbound");
+     * RMProperties rmps = RMContextUtils.retrieveRMProperties(context, false);
+     * final AddressingPropertiesImpl maps = ContextUtils.retrieveMAPs(context,
+     * false, false); assert null != maps; String action = null; if (null !=
+     * maps.getAction()) { action = maps.getAction().getValue(); } if
+     * (LOG.isLoggable(Level.FINE)) { LOG.fine("Action: " + action); } if
+     * (RMUtils.getRMConstants().getCreateSequenceResponseAction().equals(action)) {
+     * Object[] parameters =
+     * (Object[])context.get(ObjectMessageContext.METHOD_PARAMETERS);
+     * CreateSequenceResponseType csr =
+     * (CreateSequenceResponseType)parameters[0];
+     * getServant().createSequenceResponse(getSource(), csr,
+     * getProxy().getOfferedIdentifier()); return; } else if
+     * (RMUtils.getRMConstants().getCreateSequenceAction().equals(action)) {
+     * Object[] parameters =
+     * (Object[])context.get(ObjectMessageContext.METHOD_PARAMETERS);
+     * CreateSequenceType cs = (CreateSequenceType)parameters[0]; final
+     * CreateSequenceResponseType csr =
+     * getServant().createSequence(getDestination(), cs, maps); Runnable
+     * response = new Runnable() { public void run() { try {
+     * getProxy().createSequenceResponse(maps, csr); } catch (IOException ex) {
+     * ex.printStackTrace(); } catch (SequenceFault sf) { sf.printStackTrace(); } } };
+     * getBinding().getBus().getWorkQueueManager().getAutomaticWorkQueue().execute(response);
+     * return; } else if
+     * (RMUtils.getRMConstants().getTerminateSequenceAction().equals(action)) {
+     * Object[] parameters =
+     * (Object[])context.get(ObjectMessageContext.METHOD_PARAMETERS);
+     * TerminateSequenceType cs = (TerminateSequenceType)parameters[0];
+     * getServant().terminateSequence(getDestination(), cs.getIdentifier()); } //
+     * for application AND out of band messages if (null != rmps) {
+     * processAcknowledgments(rmps); processAcknowledgmentRequests(rmps);
+     * processSequence(rmps, maps); processDeliveryAssurance(rmps); } } void
+     * processAcknowledgments(RMProperties rmps) { Collection<SequenceAcknowledgement>
+     * acks = rmps.getAcks(); if (null != acks) { for (SequenceAcknowledgement
+     * ack : acks) { getSource().setAcknowledged(ack); } } } void
+     * processSequence(RMProperties rmps, AddressingProperties maps) throws
+     * SequenceFault { SequenceType s = rmps.getSequence(); if (null == s) {
+     * return; } getDestination().acknowledge(s, null == maps.getReplyTo() ?
+     * null : maps.getReplyTo().getAddress().getValue()); } void
+     * processAcknowledgmentRequests(RMProperties rmps) { Collection<AckRequestedType>
+     * requested = rmps.getAcksRequested(); if (null != requested) { for
+     * (AckRequestedType ar : requested) { DestinationSequence seq =
+     * getDestination().getSequence(ar.getIdentifier()); if (null != seq) {
+     * seq.scheduleImmediateAcknowledgement(); } else { LOG.severe("No such
+     * sequence."); } } } } boolean processDeliveryAssurance(RMProperties rmps) {
+     * SequenceType s = rmps.getSequence(); if (null == s) { return true; }
+     * DestinationSequence ds = destination.getSequence(s.getIdentifier());
+     * return ds.applyDeliveryAssurance(s.getMessageNumber()); }
+     */
+
+    /*
+   
+    protected void setInitialised(ConfigurationHelper ch,
+                                  RMSource s,
+                                  RMDestination d,
+                                  Timer t,
+                                  boolean registered
+                                  ) {
+        configurationHelper = ch;
+        source = s;
+        destination = d;
+        timer = t;
+        busLifeCycleListenerRegistered = registered;
+        initialise();
+    }
+    */
+    
+    
+    
+}

Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractSequence.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractSequence.java?view=auto&rev=470005
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractSequence.java (added)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractSequence.java Wed Nov  1 09:36:06 2006
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import java.math.BigInteger;
+
+import org.apache.cxf.ws.rm.SequenceAcknowledgement.AcknowledgementRange;
+
+
+public abstract class AbstractSequence {
+    
+    protected final Identifier id;
+    protected SequenceAcknowledgement acknowledgement;
+    
+    protected AbstractSequence(Identifier i) {
+        id = i;
+    }
+    
+    /**
+     * @return the sequence identifier
+     */
+    public Identifier getIdentifier() {
+        return id;
+    }
+    
+    public String toString() {
+        return id.getValue();
+    }
+    
+    public boolean equals(Object other) {
+        if (other == this) {
+            return true;            
+        }
+        if (other instanceof AbstractSequence) {
+            AbstractSequence otherSeq = (AbstractSequence)other;
+            return otherSeq.getIdentifier().getValue().equals(getIdentifier().getValue());
+        }        
+        return false;
+    }
+    
+    public int hashCode() {
+        return getIdentifier().getValue().hashCode();
+    }
+    
+    public static boolean identifierEquals(Identifier id1, Identifier id2) {
+        if (null == id1) {
+            return null == id2;
+        } else {
+            return null != id2 && id1.getValue().equals(id2.getValue());
+        }
+    }
+    
+    public synchronized boolean isAcknowledged(BigInteger m) {
+        for (AcknowledgementRange r : acknowledgement.getAcknowledgementRange()) {
+            if (m.subtract(r.getLower()).signum() >= 0 && r.getUpper().subtract(m).signum() >= 0) {
+                return true;
+            }
+        }
+        return false;
+    }
+   
+}

Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractSequence.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractSequence.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java (from r469824, incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Destination.java)
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java?view=diff&rev=470005&p1=incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Destination.java&r1=469824&p2=incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java&r2=470005
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Destination.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java Wed Nov  1 09:36:06 2006
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.cxf.ws.rm.impl;
+package org.apache.cxf.ws.rm;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -29,10 +29,6 @@
 import org.apache.cxf.common.i18n.Message;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.helpers.CastUtils;
-import org.apache.cxf.ws.rm.DestinationSequence;
-import org.apache.cxf.ws.rm.Identifier;
-import org.apache.cxf.ws.rm.SequenceFault;
-import org.apache.cxf.ws.rm.SequenceType;
 import org.apache.cxf.ws.rm.persistence.RMStore;
 
 
@@ -40,26 +36,30 @@
 
     private static final Logger LOG = LogUtils.getL7dLogger(Destination.class);
     
-    private Map<String, DestinationSequenceImpl> map;
+    private Map<String, DestinationSequence> map;
     
     Destination(RMEndpoint reliableEndpoint) {
         super(reliableEndpoint);
-        map = new HashMap<String, DestinationSequenceImpl>();    
+        map = new HashMap<String, DestinationSequence>();    
     }  
     
     public DestinationSequence getSequence(Identifier id) {        
         return map.get(id.getValue());
     }
     
-    public void addSequence(DestinationSequenceImpl seq) {
+    public Collection<DestinationSequence> getAllSequences() {  
+        return CastUtils.cast(map.values());
+    }
+    
+    public void addSequence(DestinationSequence seq) {
         addSequence(seq, true);
     }
     
-    public void addSequence(DestinationSequenceImpl seq, boolean persist) {  
-        // seq.setDestination(this);
+    public void addSequence(DestinationSequence seq, boolean persist) {  
+        seq.setDestination(this);
         map.put(seq.getIdentifier().getValue(), seq);
         if (persist) {
-            RMStore store = getInterceptor().getStore();
+            RMStore store = getReliableEndpoint().getManager().getStore();
             if (null != store) {
                 store.createDestinationSequence(seq);
             }
@@ -68,7 +68,7 @@
     
     public void removeSequence(DestinationSequence seq) {        
         map.remove(seq.getIdentifier().getValue());
-        RMStore store = getInterceptor().getStore();
+        RMStore store = getReliableEndpoint().getManager().getStore();
         if (null != store) {
             store.removeDestinationSequence(seq.getIdentifier());
         }
@@ -86,7 +86,7 @@
     */
     public void acknowledge(SequenceType sequenceType, String replyToAddress) 
         throws SequenceFault {
-        DestinationSequenceImpl seq = getSequenceImpl(sequenceType.getIdentifier());
+        DestinationSequence seq = getSequence(sequenceType.getIdentifier());
         if (null != seq) {
             seq.acknowledge(sequenceType.getMessageNumber());
             
@@ -102,7 +102,7 @@
                 if (!(seq.getAcksTo().getAddress().getValue().equals(replyToAddress)
                     || seq.canPiggybackAckOnPartialResponse())) {
                     try {
-                        getProxy().acknowledge(seq);
+                        getReliableEndpoint().getProxy().acknowledge(seq);
                     } catch (IOException ex) {
                         Message msg = new Message("SEQ_ACK_SEND_EXC", LOG, seq);
                         LOG.log(Level.SEVERE, msg.toString(), ex);
@@ -115,11 +115,5 @@
         }
     }
     
-    Collection<DestinationSequenceImpl> getAllSequences() {  
-        return CastUtils.cast(map.values());
-    }
     
-    DestinationSequenceImpl getSequenceImpl(Identifier sid) {
-        return map.get(sid.getValue());
-    }
 }

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java?view=diff&rev=470005&r1=470004&r2=470005
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java Wed Nov  1 09:36:06 2006
@@ -19,42 +19,295 @@
 
 package org.apache.cxf.ws.rm;
 
+import java.io.IOException;
 import java.io.InputStream;
 import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.TimerTask;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
+import org.apache.cxf.common.i18n.Message;
+import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.ws.addressing.v200408.EndpointReferenceType;
+import org.apache.cxf.ws.rm.SequenceAcknowledgement.AcknowledgementRange;
+import org.apache.cxf.ws.rm.manager.AcksPolicyType;
+import org.apache.cxf.ws.rm.manager.DeliveryAssuranceType;
+import org.apache.cxf.ws.rm.persistence.RMStore;
+import org.apache.cxf.ws.rm.policy.RMAssertion;
 
-public interface DestinationSequence {
+public class DestinationSequence extends AbstractSequence {
     
-    /**
-     * @return the sequence identifier
-     */
-    Identifier getIdentifier();
+    private static final Logger LOG = LogUtils.getL7dLogger(DestinationSequence.class);
+
+    private Destination destination;
+    private EndpointReferenceType acksTo;
+    private BigInteger lastMessageNumber;
+    private SequenceMonitor monitor;
+    private boolean acknowledgeOnNextOccasion;
+    private List<DeferredAcknowledgment> deferredAcknowledgments;
+    private String correlationID;
+    
+    public DestinationSequence(Identifier i, EndpointReferenceType a, Destination d) {
+        this(i, a, null, null);
+        setDestination(d);
+    }
+    
+    public DestinationSequence(Identifier i, EndpointReferenceType a,
+                              BigInteger lmn, SequenceAcknowledgement ac) {
+        super(i);
+        acksTo = a;
+        lastMessageNumber = lmn;
+        acknowledgement = ac;
+        if (null == acknowledgement) {
+            acknowledgement = RMUtils.getWSRMFactory().createSequenceAcknowledgement();
+            acknowledgement.setIdentifier(id);
+        }
+        monitor = new SequenceMonitor();
+    }
     
     /**
      * @return the acksTo address for the sequence
      */
-    EndpointReferenceType getAcksTo();
+    public EndpointReferenceType getAcksTo() {
+        return acksTo;
+    }
     
     /**
      * @return the message number of the last message or null if the last message had not been received.
      */
-    BigInteger getLastMessageNumber();
+    public BigInteger getLastMessageNumber() {
+        return lastMessageNumber;
+    }
     
     /**
      * @return the sequence acknowledgement presenting the sequences thus far received by a destination 
      */
-    SequenceAcknowledgement getAcknowledgment();
+    public SequenceAcknowledgement getAcknowledgment() {
+        return acknowledgement;
+    }
     
     /**
      * @return the sequence acknowledgement presenting the sequences thus far received by a destination
      * as an input stream 
      */
-    InputStream getAcknowledgmentAsStream();
+    InputStream getAcknowledgmentAsStream() {
+        return null;
+    }
     
     /**
      * @return the identifier of the rm destination
      */
-    String getEndpointIdentifier(); 
+    public String getEndpointIdentifier() {
+        return destination.getName().toString();
+    }
+    
+    public void acknowledge(BigInteger messageNumber) throws SequenceFault {
+        if (null != lastMessageNumber && messageNumber.compareTo(lastMessageNumber) > 0) {
+            SequenceFaultType sf = RMUtils.getWSRMFactory().createSequenceFaultType();
+            sf.setFaultCode(RMConstants.getLastMessageNumberExceededFaultCode());
+            Message msg = new Message("LAST_MESSAGE_NUMBER_EXCEEDED_EXC", LOG, this);
+            throw new SequenceFault(msg.toString(), sf);
+        }
+        
+        monitor.acknowledgeMessage();
+        
+        synchronized (this) {
+            boolean done = false;
+            int i = 0;
+            for (; i < acknowledgement.getAcknowledgementRange().size(); i++) {
+                AcknowledgementRange r = acknowledgement.getAcknowledgementRange().get(i);
+                if (r.getLower().compareTo(messageNumber) <= 0 
+                    && r.getUpper().compareTo(messageNumber) >= 0) {
+                    done = true;
+                    break;
+                } else {
+                    BigInteger diff = r.getLower().subtract(messageNumber);
+                    if (diff.signum() == 1) {
+                        if (diff.equals(BigInteger.ONE)) {
+                            r.setLower(messageNumber);
+                            done = true;
+                        }
+                        break;
+                    } else if (messageNumber.subtract(r.getUpper()).equals(BigInteger.ONE)) {
+                        r.setUpper(messageNumber);
+                        done = true;
+                        break;
+                    }
+                }
+            }
+
+            if (!done) {
+                AcknowledgementRange range = RMUtils.getWSRMFactory()
+                    .createSequenceAcknowledgementAcknowledgementRange();
+                range.setLower(messageNumber);
+                range.setUpper(messageNumber);
+                acknowledgement.getAcknowledgementRange().add(i, range);
+            }
+            
+            notifyAll();
+        }
+        
+        purgeAcknowledged(messageNumber);
+        
+        scheduleAcknowledgement();
+        
+    }
+    
+    final void setDestination(Destination d) {
+        destination = d;
+    }
+    
+    Destination getDestination() {
+        return destination;
+    }
+    
+    /**
+     * Returns the monitor for this sequence.
+     * 
+     * @return the sequence monitor.
+     */
+    SequenceMonitor getMonitor() {
+        return monitor;
+    }
+    
+    void setLastMessageNumber(BigInteger lmn) {
+        lastMessageNumber = lmn;
+    }
+      
+    boolean canPiggybackAckOnPartialResponse() {
+        // TODO: should also check if we allow breaking the WI Profile rule by which no headers
+        // can be included in a HTTP response
+        return getAcksTo().getAddress().getValue().equals(RMConstants.WSA_ANONYMOUS_ADDRESS);
+    }
+       
+    /**
+     * Ensures that the delivery assurance is honored, e.g. by throwing an 
+     * exception if the message had already been delivered and the delivery
+     * assurance is AtMostOnce.
+     * This method blocks in case the delivery assurance is 
+     * InOrder and and not all messages with lower message numbers have been 
+     * delivered.
+     * 
+     * @param s the SequenceType object including identifier and message number
+     */
+    boolean applyDeliveryAssurance(BigInteger mn) {
+        DeliveryAssuranceType da = destination.getManager().getDeliveryAssurance();
+        if (da.isSetAtMostOnce() && isAcknowledged(mn)) {
+            Message msg = new Message("MESSAGE_ALREADY_DELIVERED", LOG, mn, getIdentifier().getValue());
+            LOG.log(Level.SEVERE, msg.toString());
+            return false;
+        } 
+        if (da.isSetInOrder() && da.isSetAtLeastOnce()) {
+            synchronized (this) {
+                boolean ok = allPredecessorsAcknowledged(mn);
+                while (!ok) {
+                    try {
+                        wait();                        
+                        ok = allPredecessorsAcknowledged(mn);
+                    } catch (InterruptedException ie) {
+                        // ignore
+                    }
+                }
+            }
+        }
+        return true;
+    }
+    
+    synchronized boolean allPredecessorsAcknowledged(BigInteger mn) {
+        return acknowledgement.getAcknowledgementRange().size() == 1
+            && acknowledgement.getAcknowledgementRange().get(0).getLower().equals(BigInteger.ONE)
+            && acknowledgement.getAcknowledgementRange().get(0).getUpper().subtract(mn).signum() >= 0;
+    }
+    
+    void purgeAcknowledged(BigInteger messageNr) {
+        RMStore store = destination.getManager().getStore();
+        if (null == store) {
+            return;
+        }
+        Collection<BigInteger> messageNrs = new ArrayList<BigInteger>();
+        messageNrs.add(messageNr);
+        store.removeMessages(getIdentifier(), messageNrs, false);
+    }
+
+    /**
+     * Called after an acknowledgement header for this sequence has been added to an outgoing message.
+     */
+    void acknowledgmentSent() {
+        acknowledgeOnNextOccasion = false;
+    }
+
+    public boolean sendAcknowledgement() {
+        return acknowledgeOnNextOccasion;
+    }
+    
+    /**
+     * The correlation of the incoming CreateSequence call used to create this
+     * sequence is recorded so that in the absence of an offer, the corresponding
+     * outgoing CreateSeqeunce can be correlated.
+     */
+    void setCorrelationID(String cid) {
+        correlationID = cid;
+    }
+   
+    String getCorrelationID() {
+        return correlationID;
+    }
+
+    void scheduleAcknowledgement() {          
+        RMAssertion rma = destination.getManager().getRMAssertion();
+        int delay = 0;
+        if (null != rma.getAcknowledgementInterval()) {
+            delay = rma.getAcknowledgementInterval().getMilliseconds().intValue();
+        }
+        AcksPolicyType ap = destination.getManager().getDestinationPolicy().getAcksPolicy();
+ 
+        if (delay > 0 && getMonitor().getMPM() >= ap.getIntraMessageThreshold()) {
+            scheduleDeferredAcknowledgement(delay);
+        } else {
+            scheduleImmediateAcknowledgement();
+        }
+    }
+
+
+    void scheduleImmediateAcknowledgement() {
+        acknowledgeOnNextOccasion = true;
+    }
+
+    synchronized void scheduleDeferredAcknowledgement(int delay) {
+        if (null == deferredAcknowledgments) {
+            deferredAcknowledgments = new ArrayList<DeferredAcknowledgment>();
+        }
+        long now = System.currentTimeMillis();
+        long expectedExecutionTime = now + delay;
+        for (DeferredAcknowledgment da : deferredAcknowledgments) {
+            if (da.scheduledExecutionTime() <= expectedExecutionTime) {
+                return;
+            }
+        }
+        DeferredAcknowledgment da = new DeferredAcknowledgment();
+        deferredAcknowledgments.add(da);
+        destination.getManager().getTimer().schedule(da, delay);
+    }
+
+    final class DeferredAcknowledgment extends TimerTask {
+
+        public void run() {
+            DestinationSequence.this.scheduleImmediateAcknowledgement();
+
+            try {                
+                RMEndpoint rme = destination.getReliableEndpoint();
+                Proxy proxy = rme.getProxy();
+                proxy.acknowledge(DestinationSequence.this);
+            } catch (IOException ex) {
+                Message msg = new Message("SEQ_ACK_SEND_EXC", LOG, DestinationSequence.this);
+                LOG.log(Level.SEVERE, msg.toString(), ex);
+            }
+
+        }
+    }
+    
     
 }

Copied: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java (from r469824, incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Proxy.java)
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java?view=diff&rev=470005&p1=incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Proxy.java&r1=469824&p2=incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java&r2=470005
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Proxy.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java Wed Nov  1 09:36:06 2006
@@ -17,78 +17,37 @@
  * under the License.
  */
 
-package org.apache.cxf.ws.rm.impl;
+package org.apache.cxf.ws.rm;
 
 import java.io.IOException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import javax.xml.bind.JAXBException;
 import javax.xml.datatype.Duration;
-import javax.xml.namespace.QName;
 
 import org.apache.cxf.Bus;
-import org.apache.cxf.binding.soap.model.SoapBindingInfo;
-import org.apache.cxf.binding.soap.model.SoapOperationInfo;
-import org.apache.cxf.databinding.DataBinding;
 import org.apache.cxf.endpoint.Client;
 import org.apache.cxf.endpoint.ClientImpl;
 import org.apache.cxf.endpoint.Endpoint;
-import org.apache.cxf.endpoint.EndpointException;
-import org.apache.cxf.endpoint.EndpointImpl;
-import org.apache.cxf.jaxb.JAXBDataBinding;
-import org.apache.cxf.service.Service;
-import org.apache.cxf.service.ServiceImpl;
-import org.apache.cxf.service.factory.ServiceConstructionException;
 import org.apache.cxf.service.model.BindingInfo;
 import org.apache.cxf.service.model.BindingOperationInfo;
-import org.apache.cxf.service.model.EndpointInfo;
-import org.apache.cxf.service.model.InterfaceInfo;
-import org.apache.cxf.service.model.MessageInfo;
-import org.apache.cxf.service.model.MessagePartInfo;
 import org.apache.cxf.service.model.OperationInfo;
-import org.apache.cxf.service.model.ServiceInfo;
-import org.apache.cxf.service.model.UnwrappedOperationInfo;
 import org.apache.cxf.ws.addressing.RelatesToType;
 import org.apache.cxf.ws.addressing.v200408.EndpointReferenceType;
-import org.apache.cxf.ws.rm.CreateSequenceResponseType;
-import org.apache.cxf.ws.rm.CreateSequenceType;
-import org.apache.cxf.ws.rm.DestinationSequence;
-import org.apache.cxf.ws.rm.Expires;
-import org.apache.cxf.ws.rm.Identifier;
-import org.apache.cxf.ws.rm.OfferType;
-import org.apache.cxf.ws.rm.RMConstants;
-import org.apache.cxf.ws.rm.SequenceFaultType;
-import org.apache.cxf.ws.rm.TerminateSequenceType;
-import org.apache.cxf.ws.rm.interceptor.SourcePolicyType;
+import org.apache.cxf.ws.rm.manager.SourcePolicyType;
 
 /**
  * 
  */
 public class Proxy {
 
-    static final QName SERVICE_NAME = 
-        new QName(RMConstants.WSRM_NAMESPACE_NAME, "SequenceAbstractService");
-    static final QName INTERFACE_NAME = 
-         new QName(RMConstants.WSRM_NAMESPACE_NAME, "SequenceAbstractPortType");
-    static final QName BINDING_NAME = 
-        new QName(RMConstants.WSRM_NAMESPACE_NAME, "SequenceAbstractSoapBinding");
-    static final QName PORT_NAME = 
-        new QName(RMConstants.WSRM_NAMESPACE_NAME, "SequenceAbstractSoapPort");
-
     private static final Logger LOG = Logger.getLogger(Proxy.class.getName());
 
     private RMEndpoint reliableEndpoint;
-    private Service service;
-    private BindingInfo bindingInfo;
-    private Endpoint endpoint;
-    private Bus bus;
     
-    // REVISIT assumption there is only a single outstanding offer
-    private Identifier offeredIdentifier;
+    
 
-    Proxy(Bus b, RMEndpoint rme) {
-        bus = b;
+    public Proxy(RMEndpoint rme) {
         reliableEndpoint = rme;
     }
 
@@ -96,23 +55,15 @@
         return reliableEndpoint;
     }
 
-    Source getSource() {
-        return reliableEndpoint.getSource();
-    }
-
-    Service getService() {
-        return service;
-    }
-
     void acknowledge(DestinationSequence ds) throws IOException {
 
     }
 
-    void createSequence(org.apache.cxf.ws.addressing.EndpointReferenceType to, 
+    public void createSequence(org.apache.cxf.ws.addressing.EndpointReferenceType to, 
                         EndpointReferenceType defaultAcksTo,
                         RelatesToType relatesTo) throws IOException {
         
-        SourcePolicyType sp = reliableEndpoint.getInterceptor().getSourcePolicy();
+        SourcePolicyType sp = reliableEndpoint.getManager().getSourcePolicy();
         CreateSequenceType create = RMUtils.getWSRMFactory().createCreateSequenceType();
 
         String address = sp.getAcksTo();
@@ -143,145 +94,15 @@
             create.setOffer(offer);
         }
         
-        OperationInfo oi = service.getServiceInfo().getInterface()
+        OperationInfo oi = reliableEndpoint.getService().getServiceInfo().getInterface()
             .getOperation(RMConstants.getCreateSequenceOperationName());
         invokeOneway(oi, new Object[] {create});
     }
     
-    void lastMessage(SourceSequenceImpl s) throws IOException {
-        // TODO
-    }
-    
-    void createService() {
-        ServiceInfo si = new ServiceInfo();
-        si.setName(SERVICE_NAME);
-        buildTypeInfo(si);
-        buildInterfaceInfo(si);
-        buildBindingInfo(si);
-        service = new ServiceImpl(si);
-        
-        DataBinding dataBinding = null;
-        try {
-            dataBinding = new JAXBDataBinding(CreateSequenceType.class,
-                                              CreateSequenceResponseType.class,
-                                              TerminateSequenceType.class,
-                                              SequenceFaultType.class);
-        } catch (JAXBException e) {
-            throw new ServiceConstructionException(e);
-        }
-        service.setDataBinding(dataBinding);
-    }
-    
-    void initialise() {
-        createService();
-        createEndpoint();
-    }
-
-    void createEndpoint() {
-        ServiceInfo si = service.getServiceInfo();
-        String transportId = reliableEndpoint.getEndpoint().getEndpointInfo().getTransportId();
-        EndpointInfo ei = new EndpointInfo(si, transportId);
-        ei.setAddress(reliableEndpoint.getEndpoint().getEndpointInfo().getAddress());
-        ei.setName(PORT_NAME);
-        ei.setBinding(bindingInfo);
-        si.addEndpoint(ei);
-    
-        try {
-            endpoint = new EndpointImpl(bus, service, ei);
-        } catch (EndpointException ex) {
-            ex.printStackTrace();
-        }
-    }
-    
-    void buildTypeInfo(ServiceInfo si) {
+    void lastMessage(SourceSequence s) throws IOException {
         // TODO
     }
-
-    void buildInterfaceInfo(ServiceInfo si) {
-        InterfaceInfo ii = new InterfaceInfo(si, INTERFACE_NAME);
-        buildOperationInfo(ii);
-    }
-
-    void buildOperationInfo(InterfaceInfo ii) {
-        OperationInfo oi = null;
-        MessagePartInfo pi = null;
-        OperationInfo unwrapped = null;
-        MessageInfo mi = null;
-        MessageInfo unwrappedInput = null;
-
-        oi = ii.addOperation(RMConstants.getCreateSequenceOperationName());
-        mi = oi.createMessage(RMConstants.getCreateSequenceOperationName());
-        oi.setInput(mi.getName().getLocalPart(), mi);
-        pi = mi.addMessagePart("create");
-        pi.setElementQName(RMConstants.getCreateSequenceOperationName());
-        pi.setElement(true);
-        // pi.setXmlSchema(null);
-        unwrappedInput = new MessageInfo(oi, mi.getName());
-        unwrapped = new UnwrappedOperationInfo(oi);
-        oi.setUnwrappedOperation(unwrapped);
-        unwrapped.setInput(oi.getInputName(), unwrappedInput);
-
-        oi = ii.addOperation(RMConstants.getCreateSequenceResponseOperationName());
-        mi = oi.createMessage(RMConstants.getCreateSequenceResponseOperationName());
-        oi.setInput(mi.getName().getLocalPart(), mi);
-        pi = mi.addMessagePart("createResponse");
-        pi.setElementQName(RMConstants.getCreateSequenceResponseOperationName());
-        pi.setElement(true);
-        // pi.setXmlSchema(null);
-        unwrappedInput = new MessageInfo(oi, mi.getName());
-        unwrapped = new UnwrappedOperationInfo(oi);
-        oi.setUnwrappedOperation(unwrapped);
-        unwrapped.setInput(oi.getInputName(), unwrappedInput);
-
-        oi = ii.addOperation(RMConstants.getTerminateSequenceOperationName());
-        mi = oi.createMessage(RMConstants.getTerminateSequenceOperationName());
-        oi.setInput(mi.getName().getLocalPart(), mi);
-        pi = mi.addMessagePart("createResponse");
-        pi.setElementQName(RMConstants.getTerminateSequenceOperationName());
-        pi.setElement(true);
-        // pi.setXmlSchema(null);
-        unwrappedInput = new MessageInfo(oi, mi.getName());
-        unwrapped = new UnwrappedOperationInfo(oi);
-        oi.setUnwrappedOperation(unwrapped);
-        unwrapped.setInput(oi.getInputName(), unwrappedInput);
-        
-    }
-
-    void buildBindingInfo(ServiceInfo si) {
-        // use same binding id as for application endpoint
-        if (null != reliableEndpoint) {
-            String bindingId = reliableEndpoint.getEndpoint().getEndpointInfo().getBinding().getBindingId();
-            SoapBindingInfo bi = new SoapBindingInfo(si, bindingId);
-            bi.setName(BINDING_NAME);
-            BindingOperationInfo boi = null;
-            SoapOperationInfo soi = null;
-
-            boi = bi.buildOperation(RMConstants.getCreateSequenceOperationName(), 
-                RMConstants.getCreateSequenceOperationName().getLocalPart(), null);
-            soi = new SoapOperationInfo();
-            soi.setAction(RMConstants.getCreateSequenceAction());
-            boi.addExtensor(soi);
-            bi.addOperation(boi);
-            
-            boi = bi.buildOperation(RMConstants.getCreateSequenceResponseOperationName(), 
-                RMConstants.getCreateSequenceResponseOperationName().getLocalPart(), null);
-            soi = new SoapOperationInfo();
-            soi.setAction(RMConstants.getCreateSequenceResponseAction());
-            boi.addExtensor(soi);
-            bi.addOperation(boi);
-
-            boi = bi.buildOperation(RMConstants.getTerminateSequenceOperationName(), 
-                RMConstants.getTerminateSequenceOperationName().getLocalPart(), null);
-            soi = new SoapOperationInfo();
-            soi.setAction(RMConstants.getTerminateSequenceAction());
-            boi.addExtensor(soi);
-            bi.addOperation(boi);
-
-            si.addBinding(bi);
-            bindingInfo = bi;
-        }
-    }
-
+       
     void invokeOneway(OperationInfo oi, Object[] params) {
         LOG.log(Level.INFO, "Invoking out-of-band RM protocol message {0}.", 
                 oi == null ? null : oi.getName());
@@ -289,9 +110,14 @@
         
         // assuming we are on the client side
         
+        RMManager manager = reliableEndpoint.getManager();
+        Bus bus = manager.getBus();
+        Endpoint endpoint = reliableEndpoint.getEndpoint();
+        BindingInfo bi = reliableEndpoint.getBindingInfo();
+        
                 
         Client client = new ClientImpl(bus, endpoint);
-        BindingOperationInfo boi = bindingInfo.getOperation(oi);
+        BindingOperationInfo boi = bi.getOperation(oi);
         try {
             client.invoke(boi, params, null);
         } catch (Exception ex) {
@@ -299,13 +125,5 @@
         }
     }
     
-    protected Identifier getOfferedIdentifier() {
-        return offeredIdentifier;    
-    }
-    
-    protected void setOfferedIdentifier(OfferType offer) { 
-        if (offer != null) {
-            offeredIdentifier = offer.getIdentifier();
-        }
-    }
+
 }

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java?view=diff&rev=470005&r1=470004&r2=470005
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java Wed Nov  1 09:36:06 2006
@@ -19,7 +19,11 @@
 
 package org.apache.cxf.ws.rm;
 
+import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.ws.addressing.AddressingProperties;
+import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
+import org.apache.cxf.ws.addressing.VersionTransformer;
 
 /**
  * Holder for utility methods relating to contexts.
@@ -27,10 +31,21 @@
 
 public final class RMContextUtils {
 
+    /** 
+     * Prevents instantiation.
+     */
     protected RMContextUtils() {
     }
     
     /**
+     * @return a generated UUID
+     */
+    public static String generateUUID() {
+        return org.apache.cxf.ws.addressing.ContextUtils.generateUUID();
+    }
+
+    
+    /**
      * Determine if message is outbound.
      * 
      * @param message the current Message
@@ -40,6 +55,28 @@
         return org.apache.cxf.ws.addressing.ContextUtils.isOutbound(message);
     }
     
+
+    /**
+     * Determine if current messaging role is that of requestor.
+     * 
+     * @param message the current Message
+     * @return true iff the current messaging role is that of requestor
+     */
+    public static boolean isRequestor(Message message) {
+        return org.apache.cxf.ws.addressing.ContextUtils.isRequestor(message);
+    }
+    
+    /**
+     * Determine if message is currently being processed on server side.
+     * 
+     * @param message the current Message
+     * @return true iff message is currently being processed on server side
+     */
+    public static boolean isServerSide(Message message) {
+        // TODO
+        return false;
+    }
+    
     /**
      * Retrieve the RM properties from the current message.
      * @param message the current message
@@ -61,11 +98,48 @@
         message.put(key, rmps);
     }
     
+    /**
+     * Retrieves the addressing properties from the current message.
+     * 
+     * @param message the current message
+     * @param isProviderContext true if the binding provider request context
+     *            available to the client application as opposed to the message
+     *            context visible to handlers
+     * @param isOutbound true iff the message is outbound
+     * @return the current addressing properties
+     */
+    public static AddressingPropertiesImpl retrieveMAPs(Message message, boolean isProviderContext,
+                                                    boolean isOutbound) {
+        return org.apache.cxf.ws.addressing.ContextUtils.retrieveMAPs(message, isProviderContext, isOutbound);
+    }
+
+    /**
+     * Ensures the appropriate version of WS-Addressing is used.
+     * 
+     * @param maps the addressing properties
+     */
+    public static void ensureExposedVersion(AddressingProperties maps) {
+        ((AddressingPropertiesImpl)maps).exposeAs(VersionTransformer.Names200408.WSA_NAMESPACE_NAME);
+    }
+
+    /**
+     * Returns the endpoint of this message, i.e. the client endpoint if the
+     * current messaging role is that of requestor, or the server endpoint
+     * otherwise.
+     * 
+     * @param message the current Message
+     * @return the endpoint
+     */
+    public static Endpoint getEndpoint(Message message) {
+        if (isRequestor(message)) {
+            return message.getExchange().get(Endpoint.class);
+        } else {
+            return message.getExchange().get(Endpoint.class);
+        }
+    }
+    
     private static String getRMPropertiesKey(boolean outbound) {
         return outbound ? RMMessageConstants.RM_PROPERTIES_OUTBOUND 
             : RMMessageConstants.RM_PROPERTIES_INBOUND;
     }
-    
-    
-    
 }

Copied: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java (from r469824, incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMEndpoint.java)
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java?view=diff&rev=470005&p1=incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMEndpoint.java&r1=469824&p2=incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java&r2=470005
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMEndpoint.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java Wed Nov  1 09:36:06 2006
@@ -17,89 +17,119 @@
  * under the License.
  */
 
-package org.apache.cxf.ws.rm.impl;
+package org.apache.cxf.ws.rm;
 
+import javax.xml.bind.JAXBException;
 import javax.xml.namespace.QName;
 
+import org.apache.cxf.binding.soap.model.SoapBindingInfo;
+import org.apache.cxf.binding.soap.model.SoapOperationInfo;
+import org.apache.cxf.databinding.DataBinding;
 import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.endpoint.EndpointException;
+import org.apache.cxf.endpoint.EndpointImpl;
+import org.apache.cxf.jaxb.JAXBDataBinding;
+import org.apache.cxf.service.Service;
+import org.apache.cxf.service.ServiceImpl;
+import org.apache.cxf.service.factory.ServiceConstructionException;
+import org.apache.cxf.service.model.BindingInfo;
+import org.apache.cxf.service.model.BindingOperationInfo;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.service.model.InterfaceInfo;
+import org.apache.cxf.service.model.MessageInfo;
+import org.apache.cxf.service.model.MessagePartInfo;
+import org.apache.cxf.service.model.OperationInfo;
+import org.apache.cxf.service.model.ServiceInfo;
+import org.apache.cxf.service.model.UnwrappedOperationInfo;
 
 public class RMEndpoint {
     
-    private final RMInterceptor interceptor;
-    private final Endpoint endpoint;
+    private static final QName SERVICE_NAME = 
+        new QName(RMConstants.WSRM_NAMESPACE_NAME, "SequenceAbstractService");
+    private static final QName INTERFACE_NAME = 
+         new QName(RMConstants.WSRM_NAMESPACE_NAME, "SequenceAbstractPortType");
+    private static final QName BINDING_NAME = 
+        new QName(RMConstants.WSRM_NAMESPACE_NAME, "SequenceAbstractSoapBinding");
+    private static final QName PORT_NAME = 
+        new QName(RMConstants.WSRM_NAMESPACE_NAME, "SequenceAbstractSoapPort");
+        
+    private final RMManager manager;
+    private final Endpoint applicationEndpoint;
     private Source source;
     private Destination destination;
+    private Service service;
+    private Endpoint endpoint;
+    // REVISIT assumption there is only a single outstanding offer
+    private Identifier offeredIdentifier;
     private Proxy proxy;
-    private Servant servant;
     
-    public RMEndpoint(RMInterceptor i, Endpoint e) {
-        interceptor = i;
-        endpoint = e;
+    
+    public RMEndpoint(RMManager m, Endpoint ae) {
+        manager = m;
+        applicationEndpoint = ae;
         source = new Source(this);
         destination = new Destination(this);
-        proxy = new Proxy(interceptor.getBus(), this);
-        proxy.initialise();
-        servant = new Servant(this);
+        proxy = new Proxy(this);
     }
     
     public QName getName() {
-        return endpoint.getEndpointInfo().getName();
+        return applicationEndpoint.getEndpointInfo().getName();
     }
-      
+    
     /**
-     * @return Returns the endpoint.
+     * @return Returns the bus.
      */
-    public Endpoint getEndpoint() {
-        return endpoint;
+    public RMManager getManager() {
+        return manager;
     }
-
+      
     /**
-     * @return Returns the interceptor.
+     * @return Returns the application endpoint.
      */
-    public RMInterceptor getInterceptor() {
-        return interceptor;
-    }
-
-    /** 
-     * @return Returns the destination.
-     */
-    public Destination getDestination() {
-        return destination;
+    public Endpoint getApplicationEndpoint() {
+        return applicationEndpoint;
     }
     
     /**
-     * @param destination The destination to set.
+     * @return Returns the RM protocol endpoint.
      */
-    public void setDestination(Destination destination) {
-        this.destination = destination;
+    public Endpoint getEndpoint() {
+        return endpoint;
     }
     
     /**
-     * @return Returns the proxy.
+     * @return Returns the RM protocol service.
      */
-    public Proxy getProxy() {
-        return proxy;
+    public Service getService() {
+        return service;
     }
     
     /**
-     * @param proxy The proxy to set.
+     * @return Returns the RM protocol binding info.
      */
-    public void setProxy(Proxy p) {
-        proxy = p;
+    public BindingInfo getBindingInfo() {
+        return service.getServiceInfo().getBinding(BINDING_NAME);
     }
     
     /**
-     * @return Returns the servant.
+     * @return Returns the proxy.
      */
-    public Servant getServant() {
-        return servant;
+    public Proxy getProxy() {
+        return proxy;
+    }
+
+    /** 
+     * @return Returns the destination.
+     */
+    public Destination getDestination() {
+        return destination;
     }
     
     /**
-     * @param servant The servant to set.
+     * @param destination The destination to set.
      */
-    public void setServant(Servant s) {
-        servant = s;
+    public void setDestination(Destination destination) {
+        this.destination = destination;
     }
     
     /** 
@@ -116,6 +146,139 @@
         this.source = source;
     } 
     
+    void initialise() {
+        createService();
+        createEndpoint();
+    }
+    
+    void createService() {
+        ServiceInfo si = new ServiceInfo();
+        si.setName(SERVICE_NAME);
+        buildInterfaceInfo(si);
+        buildBindingInfo(si);
+        service = new ServiceImpl(si);
+        
+        DataBinding dataBinding = null;
+        try {
+            dataBinding = new JAXBDataBinding(CreateSequenceType.class,
+                                              CreateSequenceResponseType.class,
+                                              TerminateSequenceType.class,
+                                              SequenceFaultType.class);
+        } catch (JAXBException e) {
+            throw new ServiceConstructionException(e);
+        }
+        service.setDataBinding(dataBinding);
+    }
+
+    void createEndpoint() {
+        ServiceInfo si = service.getServiceInfo();
+        buildBindingInfo(si);
+        String transportId = applicationEndpoint.getEndpointInfo().getTransportId();
+        EndpointInfo ei = new EndpointInfo(si, transportId);
+        ei.setAddress(applicationEndpoint.getEndpointInfo().getAddress());
+        ei.setName(PORT_NAME);
+        ei.setBinding(si.getBinding(BINDING_NAME));
+        si.addEndpoint(ei);
+    
+        try {
+            endpoint = new EndpointImpl(manager.getBus(), service, ei);
+        } catch (EndpointException ex) {
+            ex.printStackTrace();
+        }
+    }
+
+    void buildInterfaceInfo(ServiceInfo si) {
+        InterfaceInfo ii = new InterfaceInfo(si, INTERFACE_NAME);
+        buildOperationInfo(ii);
+    }
+
+    void buildOperationInfo(InterfaceInfo ii) {
+        OperationInfo oi = null;
+        MessagePartInfo pi = null;
+        OperationInfo unwrapped = null;
+        MessageInfo mi = null;
+        MessageInfo unwrappedInput = null;
+
+        oi = ii.addOperation(RMConstants.getCreateSequenceOperationName());
+        mi = oi.createMessage(RMConstants.getCreateSequenceOperationName());
+        oi.setInput(mi.getName().getLocalPart(), mi);
+        pi = mi.addMessagePart("create");
+        pi.setElementQName(RMConstants.getCreateSequenceOperationName());
+        pi.setElement(true);
+        // pi.setXmlSchema(null);
+        unwrappedInput = new MessageInfo(oi, mi.getName());
+        unwrapped = new UnwrappedOperationInfo(oi);
+        oi.setUnwrappedOperation(unwrapped);
+        unwrapped.setInput(oi.getInputName(), unwrappedInput);
+
+        oi = ii.addOperation(RMConstants.getCreateSequenceResponseOperationName());
+        mi = oi.createMessage(RMConstants.getCreateSequenceResponseOperationName());
+        oi.setInput(mi.getName().getLocalPart(), mi);
+        pi = mi.addMessagePart("createResponse");
+        pi.setElementQName(RMConstants.getCreateSequenceResponseOperationName());
+        pi.setElement(true);
+        // pi.setXmlSchema(null);
+        unwrappedInput = new MessageInfo(oi, mi.getName());
+        unwrapped = new UnwrappedOperationInfo(oi);
+        oi.setUnwrappedOperation(unwrapped);
+        unwrapped.setInput(oi.getInputName(), unwrappedInput);
+
+        oi = ii.addOperation(RMConstants.getTerminateSequenceOperationName());
+        mi = oi.createMessage(RMConstants.getTerminateSequenceOperationName());
+        oi.setInput(mi.getName().getLocalPart(), mi);
+        pi = mi.addMessagePart("createResponse");
+        pi.setElementQName(RMConstants.getTerminateSequenceOperationName());
+        pi.setElement(true);
+        // pi.setXmlSchema(null);
+        unwrappedInput = new MessageInfo(oi, mi.getName());
+        unwrapped = new UnwrappedOperationInfo(oi);
+        oi.setUnwrappedOperation(unwrapped);
+        unwrapped.setInput(oi.getInputName(), unwrappedInput);
+        
+    }
+
+    void buildBindingInfo(ServiceInfo si) {
+        // use same binding id as for application endpoint
+        if (null != applicationEndpoint) {
+            String bindingId = applicationEndpoint.getEndpointInfo().getBinding().getBindingId();
+            SoapBindingInfo bi = new SoapBindingInfo(si, bindingId);
+            bi.setName(BINDING_NAME);
+            BindingOperationInfo boi = null;
+            SoapOperationInfo soi = null;
+
+            boi = bi.buildOperation(RMConstants.getCreateSequenceOperationName(), 
+                RMConstants.getCreateSequenceOperationName().getLocalPart(), null);
+            soi = new SoapOperationInfo();
+            soi.setAction(RMConstants.getCreateSequenceAction());
+            boi.addExtensor(soi);
+            bi.addOperation(boi);
+            
+            boi = bi.buildOperation(RMConstants.getCreateSequenceResponseOperationName(), 
+                RMConstants.getCreateSequenceResponseOperationName().getLocalPart(), null);
+            soi = new SoapOperationInfo();
+            soi.setAction(RMConstants.getCreateSequenceResponseAction());
+            boi.addExtensor(soi);
+            bi.addOperation(boi);
+
+            boi = bi.buildOperation(RMConstants.getTerminateSequenceOperationName(), 
+                RMConstants.getTerminateSequenceOperationName().getLocalPart(), null);
+            soi = new SoapOperationInfo();
+            soi.setAction(RMConstants.getTerminateSequenceAction());
+            boi.addExtensor(soi);
+            bi.addOperation(boi);
+
+            si.addBinding(bi);
+        }
+    }
+    
+    Identifier getOfferedIdentifier() {
+        return offeredIdentifier;    
+    }
     
+    void setOfferedIdentifier(OfferType offer) { 
+        if (offer != null) {
+            offeredIdentifier = offer.getIdentifier();
+        }
+    }
     
 }

Added: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java?view=auto&rev=470005
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java (added)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java Wed Nov  1 09:36:06 2006
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.ws.addressing.AddressingProperties;
+import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
+import org.apache.cxf.ws.addressing.MAPAggregator;
+
+/**
+ * 
+ */
+public class RMInInterceptor extends AbstractRMInterceptor {
+    
+    private static final Logger LOG = LogUtils.getL7dLogger(RMInInterceptor.class);
+    private Set<String> before = Collections.singleton(MAPAggregator.class.getName());
+    
+    public Set<String> getBefore() {
+        return before;
+    }
+
+    public Set<String> getAfter() {
+        return CastUtils.cast(Collections.EMPTY_SET);
+    }
+
+    public String getId() {
+        return RMInInterceptor.class.getName();
+    }
+    
+    void handleMessage(Message message, boolean isFault) throws SequenceFault {
+        LOG.entering(getClass().getName(), "handleMessage");
+        
+        RMProperties rmps = RMContextUtils.retrieveRMProperties(message, false);
+        
+        final AddressingPropertiesImpl maps = RMContextUtils.retrieveMAPs(message, false, false);
+        assert null != maps;
+
+        String action = null;
+        if (null != maps.getAction()) {
+            action = maps.getAction().getValue();
+        }
+
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.fine("Action: " + action);
+        }
+        
+        // Destination destination = getManager().getDestination(message);
+        RMEndpoint rme = getManager().getReliableEndpoint(message);
+        Servant servant = new Servant(rme);
+        
+        if (RMConstants.getCreateSequenceResponseAction().equals(action)) {
+            servant.createSequenceResponse(message);
+            return;
+        } else if (RMConstants.getCreateSequenceAction().equals(action)) {
+            servant.createSequence(message);
+            /*
+            Runnable response = new Runnable() {
+                public void run() {
+                    try {
+                        getProxy().createSequenceResponse(maps, csr);
+                    } catch (IOException ex) {
+                        ex.printStackTrace();
+                    } catch (SequenceFault sf) {
+                        sf.printStackTrace();
+                    }
+                }
+            };
+            getBinding().getBus().getWorkQueueManager().getAutomaticWorkQueue().execute(response);
+            */    
+            return;
+        } else if (RMConstants.getTerminateSequenceAction().equals(action)) {
+            servant.terminateSequence(message);
+        }
+        
+        // for application AND out of band messages
+
+        if (null != rmps) {            
+            
+            processAcknowledgments(rmps);
+
+            processAcknowledgmentRequests(rmps);  
+            
+            processSequence(rmps, maps);
+            
+            processDeliveryAssurance(rmps);
+        }
+    }
+    
+    void processAcknowledgments(RMProperties rmps) {
+        
+    }
+
+    void processAcknowledgmentRequests(RMProperties rmps) {
+        
+    }
+    
+    void processSequence(RMProperties rmps, AddressingProperties maps) {
+        
+    }
+    
+    void processDeliveryAssurance(RMProperties rmps) {
+        
+    }
+}

Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?view=auto&rev=470005
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (added)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Wed Nov  1 09:36:06 2006
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.ws.addressing.AddressingProperties;
+import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
+import org.apache.cxf.ws.addressing.RelatesToType;
+import org.apache.cxf.ws.addressing.VersionTransformer;
+import org.apache.cxf.ws.addressing.v200408.EndpointReferenceType;
+import org.apache.cxf.ws.rm.manager.DeliveryAssuranceType;
+import org.apache.cxf.ws.rm.manager.DestinationPolicyType;
+import org.apache.cxf.ws.rm.manager.RMManagerConfigBean;
+import org.apache.cxf.ws.rm.manager.SourcePolicyType;
+import org.apache.cxf.ws.rm.persistence.RMStore;
+import org.apache.cxf.ws.rm.policy.RMAssertion;
+import org.apache.cxf.ws.rm.policy.RMAssertion.BaseRetransmissionInterval;
+
+/**
+ * 
+ */
+public class RMManager extends RMManagerConfigBean {
+
+    private Bus bus;
+    private RMStore store;
+    private RetransmissionQueue retransmissionQueue;
+    private Map<Endpoint, RMEndpoint> reliableEndpoints = new HashMap<Endpoint, RMEndpoint>();
+    private Timer timer = new Timer();
+
+    public Bus getBus() {
+        return bus;
+    }
+
+    @Resource
+    public void setBus(Bus b) {
+        bus = b;
+    }
+    
+    @PostConstruct
+    public void register() {
+        if (null != bus) {
+            bus.setExtension(this, RMManager.class);
+        }
+    }
+
+    public RMStore getStore() {
+        return store;
+    }
+
+    public void setStore(RMStore s) {
+        store = s;
+    }
+
+    public RetransmissionQueue getRetransmissionQueue() {
+        return retransmissionQueue;
+    }
+
+    public void setRetransmissionQueue(RetransmissionQueue rq) {
+        retransmissionQueue = rq;
+    }
+
+    public Timer getTimer() {
+        return timer;
+    }
+
+    public synchronized RMEndpoint getReliableEndpoint(Message message) {
+        Endpoint endpoint = RMContextUtils.getEndpoint(message);
+        RMEndpoint rme = reliableEndpoints.get(endpoint);
+        if (null == rme) {
+            rme = new RMEndpoint(this, endpoint);
+            rme.initialise();
+            reliableEndpoints.put(endpoint, rme);
+        }
+        return rme;
+    }
+
+    public Destination getDestination(Message message) {
+        RMEndpoint rme = getReliableEndpoint(message);
+        if (null != rme) {
+            return rme.getDestination();
+        }
+        return null;
+    }
+
+    public Source getSource(Message message) {
+        RMEndpoint rme = getReliableEndpoint(message);
+        if (null != rme) {
+            return rme.getSource();
+        }
+        return null;
+    }
+
+    public SourceSequence getSequence(Identifier inSeqId, Message message, AddressingProperties maps)
+        throws SequenceFault {
+
+        Source source = getSource(message);
+        SourceSequence seq = source.getCurrent(inSeqId);
+        if (null == seq) {
+            // TODO: better error handling
+            org.apache.cxf.ws.addressing.EndpointReferenceType to = null;
+            try {
+                EndpointReferenceType acksTo = null;
+                RelatesToType relatesTo = null;
+                if (RMContextUtils.isServerSide(message)) {
+
+                    AddressingPropertiesImpl inMaps = RMContextUtils.retrieveMAPs(message, false, false);
+                    inMaps.exposeAs(VersionTransformer.Names200408.WSA_NAMESPACE_NAME);
+                    acksTo = RMUtils.createReference2004(inMaps.getTo().getValue());
+                    to = inMaps.getReplyTo();
+                    // getServant().setUnattachedIdentifier(inSeqId);
+                    relatesTo = (new org.apache.cxf.ws.addressing.ObjectFactory()).createRelatesToType();
+                    Destination destination = getDestination(message);
+                    DestinationSequence inSeq = destination.getSequence(inSeqId);
+                    relatesTo.setValue(inSeq != null ? inSeq.getCorrelationID() : null);
+
+                } else {
+                    acksTo = VersionTransformer.convert(maps.getReplyTo());
+                    // for oneways
+                    if (RMConstants.WSA_NONE_ADDRESS.equals(acksTo.getAddress().getValue())) {
+                        acksTo = RMUtils.createAnonymousReference2004();
+                    }
+                }
+
+                Proxy proxy = source.getReliableEndpoint().getProxy();
+                proxy.createSequence(to, acksTo, relatesTo);
+            } catch (IOException ex) {
+                ex.printStackTrace();
+            }
+
+            seq = source.awaitCurrent(inSeqId);
+            seq.setTarget(to);
+        }
+
+        return seq;
+    }
+
+    @PostConstruct
+    void initialise() {
+        if (!isSetRMAssertion()) {
+            org.apache.cxf.ws.rm.policy.ObjectFactory factory = 
+                new org.apache.cxf.ws.rm.policy.ObjectFactory();
+            RMAssertion rma = factory.createRMAssertion();
+            BaseRetransmissionInterval bri = factory.createRMAssertionBaseRetransmissionInterval();
+            bri.setMilliseconds(new BigInteger("3000"));
+            rma.setBaseRetransmissionInterval(bri);
+            rma.setExponentialBackoff(factory.createRMAssertionExponentialBackoff());
+            setRMAssertion(rma);
+        }
+        org.apache.cxf.ws.rm.manager.ObjectFactory factory = new org.apache.cxf.ws.rm.manager.ObjectFactory();
+        if (!isSetDeliveryAssurance()) {
+            DeliveryAssuranceType da = factory.createDeliveryAssuranceType();
+            da.setAtLeastOnce(factory.createDeliveryAssuranceTypeAtLeastOnce());
+            setDeliveryAssurance(da);
+        }
+        if (!isSetSourcePolicy()) {
+            SourcePolicyType sp = factory.createSourcePolicyType();
+            sp.setSequenceTerminationPolicy(factory.createSequenceTerminationPolicyType());
+            setSourcePolicy(sp);
+        }
+        if (!isSetDestinationPolicy()) {
+            DestinationPolicyType dp = factory.createDestinationPolicyType();
+            dp.setAcksPolicy(factory.createAcksPolicyType());
+            setDestinationPolicy(dp);
+        }
+    }
+
+}

Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message