cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andreasm...@apache.org
Subject svn commit: r463282 [1/2] - in /incubator/cxf/trunk: common/common/src/main/java/org/apache/cxf/jaxb/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence...
Date Thu, 12 Oct 2006 15:34:08 GMT
Author: andreasmyth
Date: Thu Oct 12 08:34:05 2006
New Revision: 463282

URL: http://svn.apache.org/viewvc?view=rev&rev=463282
Log:
[JIRA CXF-138] Abstractions for reliable messaging endpoints and sequences.

Added:
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java   (with props)
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java
      - copied, changed from r462824, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMContextUtils.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
      - copied, changed from r462804, incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/JAXWSRMConstants.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
      - copied, changed from r454719, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RetransmissionQueue.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SequenceFault.java   (with props)
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java   (with props)
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractEndpoint.java   (with props)
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Destination.java
      - copied, changed from r454703, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMDestination.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/DestinationSequenceImpl.java
      - copied, changed from r454697, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/DestinationSequence.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Proxy.java   (with props)
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMPropertiesImpl.java
      - copied, changed from r462820, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMPropertiesImpl.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SequenceFaultFactory.java   (with props)
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Source.java
      - copied, changed from r454703, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMSource.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SourceSequenceImpl.java
      - copied, changed from r454697, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/SourceSequence.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/impl/DestinationSequenceImplTest.java
      - copied, changed from r462829, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/test/java/org/objectweb/celtix/ws/rm/DestinationSequenceTest.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/impl/SourceSequenceImplTest.java
      - copied, changed from r462829, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/test/java/org/objectweb/celtix/ws/rm/SourceSequenceTest.java
Removed:
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/JAXWSRMConstants.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMEndpoint.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMDestinationSequence.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMSourceSequence.java
Modified:
    incubator/cxf/trunk/common/common/src/main/java/org/apache/cxf/jaxb/DatatypeFactory.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractSequenceImpl.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/ContextUtils.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMInterceptor.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java

Modified: incubator/cxf/trunk/common/common/src/main/java/org/apache/cxf/jaxb/DatatypeFactory.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/common/common/src/main/java/org/apache/cxf/jaxb/DatatypeFactory.java?view=diff&rev=463282&r1=463281&r2=463282
==============================================================================
--- incubator/cxf/trunk/common/common/src/main/java/org/apache/cxf/jaxb/DatatypeFactory.java (original)
+++ incubator/cxf/trunk/common/common/src/main/java/org/apache/cxf/jaxb/DatatypeFactory.java Thu Oct 12 08:34:05 2006
@@ -33,8 +33,14 @@
  */
 public final class DatatypeFactory {
     
+    public static final Duration PT0S;
     private static final Logger LOG = LogUtils.getL7dLogger(DatatypeFactory.class);
-    
+   
+
+    static {
+        PT0S = createDuration("PT0S");
+    }
+
     /**
      * prevents instantiation
      *

Added: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java?view=auto&rev=463282
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java (added)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java Thu Oct 12 08:34:05 2006
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import java.io.InputStream;
+import java.math.BigInteger;
+
+import org.apache.cxf.ws.addressing.v200408.EndpointReferenceType;
+
+public interface DestinationSequence {
+    
+    /**
+     * @return the sequence identifier
+     */
+    Identifier getIdentifier();
+    
+    /**
+     * @return the acksTo address for the sequence
+     */
+    EndpointReferenceType getAcksTo();
+    
+    /**
+     * @return the message number of the last message or null if the last message had not been received.
+     */
+    BigInteger getLastMessageNumber();
+    
+    /**
+     * @return the sequence acknowledgement presenting the sequences thus far received by a destination 
+     */
+    SequenceAcknowledgement getAcknowledgment();
+    
+    /**
+     * @return the sequence acknowledgement presenting the sequences thus far received by a destination
+     * as an input stream 
+     */
+    InputStream getAcknowledgmentAsStream();
+    
+    /**
+     * @return the identifier of the rm destination
+     */
+    String getEndpointIdentifier(); 
+    
+}

Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java (from r462824, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMContextUtils.java)
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java?view=diff&rev=463282&p1=incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMContextUtils.java&r1=462824&p2=incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java&r2=463282
==============================================================================
--- incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMContextUtils.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java Thu Oct 12 08:34:05 2006
@@ -1,6 +1,25 @@
-package org.objectweb.celtix.ws.rm;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
 
-import javax.xml.ws.handler.MessageContext;
+import org.apache.cxf.message.Message;
 
 /**
  * Holder for utility methods relating to contexts.
@@ -11,18 +30,18 @@
     protected RMContextUtils() {
     }
     
-    public static RMProperties retrieveRMProperties(MessageContext context, boolean outbound) {
-        return (RMProperties)context.get(getRMPropertiesKey(outbound));
+    public static RMProperties retrieveRMProperties(Message message, boolean outbound) {
+        return (RMProperties)message.get(getRMPropertiesKey(outbound));
     }
     
-    public static void storeRMProperties(MessageContext context, RMProperties rmps, boolean outbound) {
+    public static void storeRMProperties(Message message, RMProperties rmps, boolean outbound) {
         String key = getRMPropertiesKey(outbound);
-        context.put(key, rmps);
-        context.setScope(key, MessageContext.Scope.HANDLER);
+        message.put(key, rmps);
     }
     
     private static String getRMPropertiesKey(boolean outbound) {
-        return outbound ? JAXWSRMConstants.RM_PROPERTIES_OUTBOUND : JAXWSRMConstants.RM_PROPERTIES_INBOUND;
+        return outbound ? RMMessageConstants.RM_PROPERTIES_OUTBOUND 
+            : RMMessageConstants.RM_PROPERTIES_INBOUND;
     }
     
 }

Copied: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java (from r462804, incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/JAXWSRMConstants.java)
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java?view=diff&rev=463282&p1=incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/JAXWSRMConstants.java&r1=462804&p2=incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java&r2=463282
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/JAXWSRMConstants.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java Thu Oct 12 08:34:05 2006
@@ -20,25 +20,25 @@
 package org.apache.cxf.ws.rm;
 
 /**
- * A container for WS-RM constants.
+ * A container for WS-RM message constants.
  */
-public final class JAXWSRMConstants {
+public final class RMMessageConstants {
     
     /**
-     * Used to cache outbound RM properties in context.
+     * Used to cache outbound RM properties in message.
      */
     public static final String RM_PROPERTIES_OUTBOUND = 
-        "org.objectweb.celtix.ws.rm.context.outbound";
+        "org.apache.cxf.ws.rm.outbound";
     
     /**
-     * Used to cache inbound RM properties in context.
+     * Used to cache inbound RM properties in message.
      */
     public static final String RM_PROPERTIES_INBOUND = 
-        "org.objectweb.celtix.ws.rm.context.inbound";
+        "org.apache.cxf.ws.rm.inbound";
     
     /**
      * Prevents instantiation. 
      */
-    private JAXWSRMConstants() {
+    private RMMessageConstants() {
     }
 }

Copied: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java (from r454719, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RetransmissionQueue.java)
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java?view=diff&rev=463282&p1=incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RetransmissionQueue.java&r1=454719&p2=incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java&r2=463282
==============================================================================
--- incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RetransmissionQueue.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java Thu Oct 12 08:34:05 2006
@@ -1,9 +1,27 @@
-package org.objectweb.celtix.ws.rm;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
 
 import java.util.Collection;
 
-import org.objectweb.celtix.context.ObjectMessageContext;
-import org.objectweb.celtix.workqueue.WorkQueue;
+import org.apache.cxf.message.Message;
 
 public interface RetransmissionQueue {
     
@@ -27,7 +45,7 @@
      * 
      * @param ctx the message context.
      */
-    void addUnacknowledged(ObjectMessageContext context);
+    void addUnacknowledged(Message message);
     
     /**
      * Purge all candidates for the given sequence that have been acknowledged.
@@ -38,10 +56,9 @@
     
     /**
      * Initiate resends.
-     * 
-     * @param queue the work queue providing async execution
+     *
      */
-    void start(WorkQueue wq);
+    void start();
     
     /**
      * Stops retransmission queue.

Added: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SequenceFault.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SequenceFault.java?view=auto&rev=463282
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SequenceFault.java (added)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SequenceFault.java Thu Oct 12 08:34:05 2006
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+/**
+ * 
+ */
+
+public class SequenceFault extends Exception {
+    
+    private SequenceFaultType sequenceFault;
+
+    public SequenceFault(String message) {
+        super(message);
+    }
+
+    public SequenceFault(String message, SequenceFaultType sequenceFault) {
+        super(message);
+        this.sequenceFault = sequenceFault;
+    }
+
+    public SequenceFault(String message, SequenceFaultType sequenceFault, Throwable cause) {
+        super(message, cause);
+        this.sequenceFault = sequenceFault;
+    }
+
+    public SequenceFaultType getFaultInfo() {
+        return this.sequenceFault;
+    }
+}

Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SequenceFault.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SequenceFault.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java?view=auto&rev=463282
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java (added)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java Thu Oct 12 08:34:05 2006
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import java.math.BigInteger;
+import java.util.Date;
+
+public interface SourceSequence {
+    
+    /**
+     * @return the sequence identifier
+     */
+    Identifier getIdentifier();
+    
+    /**
+     * @return the message number assigned to the most recent outgoing application message.
+     */
+    BigInteger getCurrentMessageNr();
+    
+    /**
+     * @return true if the last message had been sent for this sequence. 
+     */
+    boolean isLastMessage();
+    
+    /**
+     * @return the identifier of the sequence that was created on behalf of the CreateSequence request 
+     * that included this sequence as an offer
+     */
+    Identifier getOfferingSequenceIdentifier();
+    
+    /**
+     * @return the identifier of the rm source
+     */
+    String getEndpointIdentifier(); 
+    
+    /**
+     * @return the expiry data of this sequence
+     */
+    Date getExpiry();
+    
+    /**
+     * Returns true if a last message had been sent for this sequence and if all
+     * messages for this sequence have been acknowledged.
+     * 
+     * @return true if all messages have been acknowledged.
+     */
+    // boolean allAcknowledged();
+
+    /**
+     * Used by the RM source to cache received acknowledgements for this
+     * sequence.
+     * 
+     * @param acknowledgement an acknowledgement for this sequence
+     */
+    // void setAcknowledged(SequenceAcknowledgement acknowledgment);
+}

Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractEndpoint.java?view=auto&rev=463282
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractEndpoint.java (added)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractEndpoint.java Thu Oct 12 08:34:05 2006
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm.impl;
+
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.ws.rm.Identifier;
+
+public class AbstractEndpoint {
+    
+    private final RMInterceptor interceptor;
+    private final Endpoint endpoint;
+    
+    protected AbstractEndpoint(RMInterceptor h, Endpoint e) {
+        interceptor = h;
+        endpoint = e;
+    }
+    
+    /**
+     * @return Returns the interceptor.
+     */
+    public RMInterceptor getInterceptor() {
+        return interceptor;
+    }
+    
+    /**
+     * @return Returns the endpoint.
+     */
+    public Endpoint getEndpoint() {
+        return endpoint;
+    }
+
+    /**
+     * Generates and returns a new sequence identifier.
+     * 
+     * @return the sequence identifier.
+     */
+    public Identifier generateSequenceIdentifier() {
+        String sequenceID = ContextUtils.generateUUID();
+        Identifier sid = RMUtils.getWSRMFactory().createIdentifier();
+        sid.setValue(sequenceID);        
+        return sid;
+    }
+}

Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractEndpoint.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractEndpoint.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractSequenceImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractSequenceImpl.java?view=diff&rev=463282&r1=463281&r2=463282
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractSequenceImpl.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/AbstractSequenceImpl.java Thu Oct 12 08:34:05 2006
@@ -29,7 +29,7 @@
 public abstract class AbstractSequenceImpl {
     
     protected final Identifier id;
-    protected SequenceAcknowledgement acked;
+    protected SequenceAcknowledgement acknowledgement;
     
     protected AbstractSequenceImpl(Identifier i) {
         id = i;
@@ -70,7 +70,7 @@
     }
     
     public synchronized boolean isAcknowledged(BigInteger m) {
-        for (AcknowledgementRange r : acked.getAcknowledgementRange()) {
+        for (AcknowledgementRange r : acknowledgement.getAcknowledgementRange()) {
             if (m.subtract(r.getLower()).signum() >= 0 && r.getUpper().subtract(m).signum() >= 0) {
                 return true;
             }

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/ContextUtils.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/ContextUtils.java?view=diff&rev=463282&r1=463281&r2=463282
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/ContextUtils.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/ContextUtils.java Thu Oct 12 08:34:05 2006
@@ -19,10 +19,12 @@
 
 package org.apache.cxf.ws.rm.impl;
 
-import java.util.UUID;
 
-import org.apache.cxf.message.Exchange;
+import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.ws.addressing.AddressingProperties;
+import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
+import org.apache.cxf.ws.addressing.VersionTransformer;
 
 /**
  * Holder for utility methods relating to contexts.
@@ -30,11 +32,6 @@
 public final class ContextUtils {
 
     /**
-     * Used to fabricate a Uniform Resource Name from a UUID string
-     */
-    private static final String URN_UUID = "urn:uuid:";
-
-    /**
      * Prevents instantiation.
      */
     private ContextUtils() {
@@ -44,7 +41,7 @@
      * @return a generated UUID
      */
     static String generateUUID() {
-        return URN_UUID + UUID.randomUUID();
+        return org.apache.cxf.ws.addressing.ContextUtils.generateUUID();
     }
 
     /**
@@ -54,7 +51,62 @@
      * @return true iff the message direction is outbound
      */
     static boolean isOutbound(Message message) {
-        Exchange exchange = message.getExchange();
-        return message != null && exchange != null && message == exchange.getOutMessage();
+        return org.apache.cxf.ws.addressing.ContextUtils.isOutbound(message);
+    }
+
+    /**
+    * Determine if current messaging role is that of requestor.
+    *
+    * @param message the current Message
+    * @return true iff the current messaging role is that of requestor
+    */
+    public static boolean isRequestor(Message message) {
+        return org.apache.cxf.ws.addressing.ContextUtils.isRequestor(message);
+    }
+    
+    /**
+     * Retrieves the addressing properties from the current message.
+     * 
+     * @param message the current message
+     * @param isProviderContext true if the binding provider request context
+     * available to the client application as opposed to the message context
+     * visible to handlers
+     * @param isOutbound true iff the message is outbound
+     * @return the current addressing properties
+     */
+    public static AddressingProperties retrieveMAPs(
+                                                        Message message, 
+                                                        boolean isProviderContext,
+                                                        boolean isOutbound) {
+        return org.apache.cxf.ws.addressing.ContextUtils
+            .retrieveMAPs(message, isProviderContext, isOutbound);
+    }
+    
+    /**
+     * Ensures the appropriate version of WS-Addressing is used.
+     * 
+     * @param maps the addressing properties
+     */
+    public static void ensureExposedVersion(AddressingProperties maps) {
+        ((AddressingPropertiesImpl)maps)
+            .exposeAs(VersionTransformer.Names200408.WSA_NAMESPACE_NAME);
+    }
+    
+    /**
+     * Returns the endpoint of this message, i.e. the client endpoint
+     * if the current messaging role is that of requestor, or the server
+     * endpoint otherwise.
+     * 
+     * @param message the current Message
+     * @return the endpoint
+     */
+    public static Endpoint getEndpoint(Message message) {
+        if (isRequestor(message)) {
+            return message.getExchange().get(Endpoint.class);
+        } else {
+            return null;
+        }
     }
+    
+    
 }

Copied: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Destination.java (from r454703, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMDestination.java)
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Destination.java?view=diff&rev=463282&p1=incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMDestination.java&r1=454703&p2=incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Destination.java&r2=463282
==============================================================================
--- incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMDestination.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Destination.java Thu Oct 12 08:34:05 2006
@@ -1,4 +1,23 @@
-package org.objectweb.celtix.ws.rm;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm.impl;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -6,36 +25,43 @@
 import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
-import org.objectweb.celtix.common.i18n.Message;
-import org.objectweb.celtix.common.logging.LogUtils;
-import org.objectweb.celtix.ws.rm.persistence.RMStore;
-import org.objectweb.celtix.ws.rm.wsdl.SequenceFault;
 
-public class RMDestination extends RMEndpoint {
+import org.apache.cxf.common.i18n.Message;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.ws.rm.DestinationSequence;
+import org.apache.cxf.ws.rm.Identifier;
+import org.apache.cxf.ws.rm.SequenceFault;
+import org.apache.cxf.ws.rm.SequenceType;
+import org.apache.cxf.ws.rm.persistence.RMStore;
+
 
-    private static final Logger LOG = LogUtils.getL7dLogger(RMDestination.class);
+public class Destination extends AbstractEndpoint {
+
+    private static final Logger LOG = LogUtils.getL7dLogger(Destination.class);
     
   
-    private Map<String, DestinationSequence> map;
+    private Map<String, DestinationSequenceImpl> map;
     
-    RMDestination(RMHandler h) {
-        super(h);
-        map = new HashMap<String, DestinationSequence>();    
+    Destination(RMInterceptor interceptor, Endpoint endpoint) {
+        super(interceptor, endpoint);
+        map = new HashMap<String, DestinationSequenceImpl>();    
     }
     
     public DestinationSequence getSequence(Identifier id) {        
         return map.get(id.getValue());
     }
     
-    public void addSequence(DestinationSequence seq) {
+    public void addSequence(DestinationSequenceImpl seq) {
         addSequence(seq, true);
     }
     
-    public void addSequence(DestinationSequence seq, boolean persist) {  
-        seq.setDestination(this);
+    public void addSequence(DestinationSequenceImpl seq, boolean persist) {  
+        // seq.setDestination(this);
         map.put(seq.getIdentifier().getValue(), seq);
         if (persist) {
-            RMStore store = getHandler().getStore();
+            RMStore store = getInterceptor().getStore();
             if (null != store) {
                 store.createDestinationSequence(seq);
             }
@@ -44,18 +70,16 @@
     
     public void removeSequence(DestinationSequence seq) {        
         map.remove(seq.getIdentifier().getValue());
-        RMStore store = getHandler().getStore();
+        RMStore store = getInterceptor().getStore();
         if (null != store) {
             store.removeDestinationSequence(seq.getIdentifier());
         }
     }
     
-    public Collection<DestinationSequence> getAllSequences() {        
-        return map.values();
+    public Collection<DestinationSequence> getAllSequences() {  
+        return CastUtils.cast(map.values());
     }
-
   
-    
    /**
     * Acknowledges receipt of a message. If the message is the last in the sequence, 
     * sends an out-of-band SequenceAcknowledgement unless there a response will be sent
@@ -68,7 +92,7 @@
     */
     public void acknowledge(SequenceType sequenceType, String replyToAddress) 
         throws SequenceFault {
-        DestinationSequence seq = getSequence(sequenceType.getIdentifier());
+        DestinationSequenceImpl seq = getSequenceImpl(sequenceType.getIdentifier());
         if (null != seq) {
             seq.acknowledge(sequenceType.getMessageNumber());
             
@@ -84,7 +108,7 @@
                 if (!(seq.getAcksTo().getAddress().getValue().equals(replyToAddress)
                     || seq.canPiggybackAckOnPartialResponse())) {
                     try {
-                        getHandler().getProxy().acknowledge(seq);
+                        getInterceptor().getProxy().acknowledge(seq);
                     } catch (IOException ex) {
                         Message msg = new Message("SEQ_ACK_SEND_EXC", LOG, seq);
                         LOG.log(Level.SEVERE, msg.toString(), ex);
@@ -92,7 +116,12 @@
                 }
             }
         } else {
-            throw DestinationSequence.createUnknownSequenceFault(sequenceType.getIdentifier());
+            SequenceFaultFactory sff = new SequenceFaultFactory();
+            throw sff.createUnknownSequenceFault(sequenceType.getIdentifier());
         }
+    }
+    
+    DestinationSequenceImpl getSequenceImpl(Identifier sid) {
+        return map.get(sid.getValue());
     }
 }

Copied: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/DestinationSequenceImpl.java (from r454697, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/DestinationSequence.java)
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/DestinationSequenceImpl.java?view=diff&rev=463282&p1=incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/DestinationSequence.java&r1=454697&p2=incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/DestinationSequenceImpl.java&r2=463282
==============================================================================
--- incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/DestinationSequence.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/DestinationSequenceImpl.java Thu Oct 12 08:34:05 2006
@@ -1,4 +1,23 @@
-package org.objectweb.celtix.ws.rm;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm.impl;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -10,22 +29,25 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.objectweb.celtix.bus.configuration.wsrm.AcksPolicyType;
-import org.objectweb.celtix.bus.configuration.wsrm.DeliveryAssuranceType;
-import org.objectweb.celtix.common.i18n.Message;
-import org.objectweb.celtix.common.logging.LogUtils;
-import org.objectweb.celtix.ws.addressing.v200408.EndpointReferenceType;
-import org.objectweb.celtix.ws.rm.SequenceAcknowledgement.AcknowledgementRange;
-import org.objectweb.celtix.ws.rm.persistence.RMDestinationSequence;
-import org.objectweb.celtix.ws.rm.persistence.RMStore;
-import org.objectweb.celtix.ws.rm.policy.RMAssertionType;
-import org.objectweb.celtix.ws.rm.wsdl.SequenceFault;
+import org.apache.cxf.common.i18n.Message;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.ws.addressing.v200408.EndpointReferenceType;
+import org.apache.cxf.ws.rm.DestinationSequence;
+import org.apache.cxf.ws.rm.Identifier;
+import org.apache.cxf.ws.rm.SequenceAcknowledgement;
+import org.apache.cxf.ws.rm.SequenceAcknowledgement.AcknowledgementRange;
+import org.apache.cxf.ws.rm.SequenceFault;
+import org.apache.cxf.ws.rm.SequenceFaultType;
+import org.apache.cxf.ws.rm.interceptor.AcksPolicyType;
+import org.apache.cxf.ws.rm.interceptor.DeliveryAssuranceType;
+import org.apache.cxf.ws.rm.persistence.RMStore;
+import org.apache.cxf.ws.rm.policy.RMAssertion;
 
-public class DestinationSequence extends AbstractSequenceImpl implements RMDestinationSequence {
+public class DestinationSequenceImpl extends AbstractSequenceImpl implements DestinationSequence {
 
-    private static final Logger LOG = LogUtils.getL7dLogger(DestinationSequence.class);
+    private static final Logger LOG = LogUtils.getL7dLogger(DestinationSequenceImpl.class);
 
-    private RMDestination destination;
+    private Destination destination;
     private EndpointReferenceType acksTo;
     private BigInteger lastMessageNumber;
     private SequenceMonitor monitor;
@@ -33,20 +55,20 @@
     private List<DeferredAcknowledgment> deferredAcknowledgments;
     private String correlationID;
     
-    public DestinationSequence(Identifier i, EndpointReferenceType a, RMDestination d) {
+    public DestinationSequenceImpl(Identifier i, EndpointReferenceType a, Destination d) {
         this(i, a, null, null);
         setDestination(d);
     }
     
-    public DestinationSequence(Identifier i, EndpointReferenceType a,
+    public DestinationSequenceImpl(Identifier i, EndpointReferenceType a,
                               BigInteger lmn, SequenceAcknowledgement ac) {
         super(i);
         acksTo = a;
         lastMessageNumber = lmn;
-        acked = ac;
-        if (null == acked) {
-            acked = RMUtils.getWSRMFactory().createSequenceAcknowledgement();
-            acked.setIdentifier(id);
+        acknowledgement = ac;
+        if (null == acknowledgement) {
+            acknowledgement = RMUtils.getWSRMFactory().createSequenceAcknowledgement();
+            acknowledgement.setIdentifier(id);
         }
         monitor = new SequenceMonitor();
     }
@@ -54,79 +76,10 @@
     
     // RMDestinationSequence interface
     
-    
-    /**
-     * @return the acksTo address for the sequence
+    /* (non-Javadoc)
+     * @see org.apache.cxf.ws.rm.DestinationSequence#acknowledge(java.math.BigInteger)
      */
-    public EndpointReferenceType getAcksTo() {
-        return acksTo;
-    }
-    
-    /**
-     * @return the message number of the last message or null if the last message had not been received.
-     */
-    public BigInteger getLastMessageNr() {
-        return lastMessageNumber;
-    }
-    
-    /**
-     * @return the sequence acknowledgement presenting the sequences thus far received by a destination 
-     */
-    public SequenceAcknowledgement getAcknowledgment() {
-        return acked;
-    }
-    
-    /**
-     * @return the sequence acknowledgement presenting the sequences thus far received by a destination
-     * as an input stream 
-     */
-    public InputStream getAcknowledgmentAsStream() {
-        return RMUtils.getPersistenceUtils().getAcknowledgementAsInputStream(acked);
-    }
-    
-    /**
-     * @return the identifier of the rm destination
-     */
-    public String getEndpointIdentifier() {
-        if (null != destination) {
-            return destination.getHandler().getConfigurationHelper().getEndpointId();
-        }
-        return null;
-    }
-    
-    // end RMDestinationSequence interface
-    
-    final void setDestination(RMDestination d) {
-        destination = d;
-    }
-    
-    RMDestination getDestination() {
-        return destination;
-    }
-    
-    void setLastMessageNumber(BigInteger lmn) {
-        lastMessageNumber = lmn;
-    }    
-    
-    /**
-     * Returns the monitor for this sequence.
-     * 
-     * @return the sequence monitor.
-     */
-    SequenceMonitor getMonitor() {
-        return monitor;
-    }
-    
-
-    /**
-     * Called by the RM destination upon receipt of a message with the given
-     * message number for this sequence.
-     * 
-     * @param messageNumber the number of the received message
-     * @param lastMessage true if this is to be the last message in the sequence
-     */
-    void acknowledge(BigInteger messageNumber) throws SequenceFault {
-        
+    public void acknowledge(BigInteger messageNumber) throws SequenceFault {
         if (null != lastMessageNumber && messageNumber.compareTo(lastMessageNumber) > 0) {
             SequenceFaultType sf = RMUtils.getWSRMFactory().createSequenceFaultType();
             sf.setFaultCode(RMUtils.getRMConstants().getLastMessageNumberExceededFaultCode());
@@ -139,8 +92,8 @@
         synchronized (this) {
             boolean done = false;
             int i = 0;
-            for (; i < acked.getAcknowledgementRange().size(); i++) {
-                AcknowledgementRange r = acked.getAcknowledgementRange().get(i);
+            for (; i < acknowledgement.getAcknowledgementRange().size(); i++) {
+                AcknowledgementRange r = acknowledgement.getAcknowledgementRange().get(i);
                 if (r.getLower().compareTo(messageNumber) <= 0 
                     && r.getUpper().compareTo(messageNumber) >= 0) {
                     done = true;
@@ -166,7 +119,7 @@
                     .createSequenceAcknowledgementAcknowledgementRange();
                 range.setLower(messageNumber);
                 range.setUpper(messageNumber);
-                acked.getAcknowledgementRange().add(i, range);
+                acknowledgement.getAcknowledgementRange().add(i, range);
             }
             
             notifyAll();
@@ -175,6 +128,78 @@
         purgeAcknowledged(messageNumber);
         
         scheduleAcknowledgement();
+        
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.cxf.ws.rm.DestinationSequence#getAcknowledgment()
+     */
+    public SequenceAcknowledgement getAcknowledgment() {
+        return acknowledgement;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.cxf.ws.rm.DestinationSequence#getAcknowledgmentAsStream()
+     */
+    public InputStream getAcknowledgmentAsStream() {
+        // return RMUtils.getPersistenceUtils().getAcknowledgementAsInputStream(acked);
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.cxf.ws.rm.DestinationSequence#getAcksTo()
+     */
+    public EndpointReferenceType getAcksTo() {
+        return acksTo;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.cxf.ws.rm.DestinationSequence#getEndpointIdentifier()
+     */
+    public String getEndpointIdentifier() {
+        // TODO
+        /*
+        if (null != destination) {
+            return destination.getHandler().getConfigurationHelper().getEndpointId();
+        }
+        */
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.cxf.ws.rm.DestinationSequence#getLastMessageNr()
+     */
+    public BigInteger getLastMessageNumber() {
+        return lastMessageNumber;
+    }
+    
+    //  end RMDestinationSequence interface
+
+    void setLastMessageNumber(BigInteger lmn) {
+        lastMessageNumber = lmn;
+    }
+      
+    boolean canPiggybackAckOnPartialResponse() {
+        // TODO: should also check if we allow breaking the WI Profile rule by which no headers
+        // can be included in a HTTP response
+        return getAcksTo().getAddress().getValue().equals(Names.WSA_ANONYMOUS_ADDRESS);
+    }
+     
+    final void setDestination(Destination d) {
+        destination = d;
+    }
+    
+    Destination getDestination() {
+        return destination;
+    }
+    
+    /**
+     * Returns the monitor for this sequence.
+     * 
+     * @return the sequence monitor.
+     */
+    SequenceMonitor getMonitor() {
+        return monitor;
     }
     
     /**
@@ -188,7 +213,7 @@
      * @param s the SequenceType object including identifier and message number
      */
     boolean applyDeliveryAssurance(BigInteger mn) {
-        DeliveryAssuranceType da = destination.getHandler().getConfigurationHelper().getDeliveryAssurance();
+        DeliveryAssuranceType da = destination.getInterceptor().getDeliveryAssurance();
         if (da.isSetAtMostOnce() && isAcknowledged(mn)) {
             Message msg = new Message("MESSAGE_ALREADY_DELIVERED", LOG, mn, getIdentifier().getValue());
             LOG.log(Level.SEVERE, msg.toString());
@@ -211,13 +236,13 @@
     }
     
     synchronized boolean allPredecessorsAcknowledged(BigInteger mn) {
-        return acked.getAcknowledgementRange().size() == 1
-            && acked.getAcknowledgementRange().get(0).getLower().equals(BigInteger.ONE)
-            && acked.getAcknowledgementRange().get(0).getUpper().subtract(mn).signum() >= 0;
+        return acknowledgement.getAcknowledgementRange().size() == 1
+            && acknowledgement.getAcknowledgementRange().get(0).getLower().equals(BigInteger.ONE)
+            && acknowledgement.getAcknowledgementRange().get(0).getUpper().subtract(mn).signum() >= 0;
     }
     
     void purgeAcknowledged(BigInteger messageNr) {
-        RMStore store = destination.getHandler().getStore();
+        RMStore store = destination.getInterceptor().getStore();
         if (null == store) {
             return;
         }
@@ -250,26 +275,14 @@
         return correlationID;
     }
 
-    boolean canPiggybackAckOnPartialResponse() {
-        // TODO: should also check if we allow breaking the WI Profile rule by which no headers
-        // can be included in a HTTP response
-        return getAcksTo().getAddress().getValue().equals(Names.WSA_ANONYMOUS_ADDRESS);
-    }
-    
-    static SequenceFault createUnknownSequenceFault(Identifier sid) {
-        SequenceFaultType sf = RMUtils.getWSRMFactory().createSequenceFaultType();
-        sf.setFaultCode(RMUtils.getRMConstants().getUnknownSequenceFaultCode());
-        Message msg = new Message("UNKNOWN_SEQUENCE_EXC", LOG, sid.getValue());
-        return new SequenceFault(msg.toString(), sf);
-    }
-   
     private void scheduleAcknowledgement() {          
-        RMAssertionType rma = destination.getHandler().getConfigurationHelper().getRMAssertion();
+        RMAssertion rma = destination.getInterceptor().getRMAssertion();
         int delay = 0;
         if (null != rma.getAcknowledgementInterval()) {
             delay = rma.getAcknowledgementInterval().getMilliseconds().intValue();
         }
-        AcksPolicyType ap = destination.getHandler().getConfigurationHelper().getAcksPolicy();
+        AcksPolicyType ap = destination.getInterceptor().getDestinationPolicy().getAcksPolicy();
+ 
         if (delay > 0 && getMonitor().getMPM() >= ap.getIntraMessageThreshold()) {
             scheduleDeferredAcknowledgement(delay);
         } else {
@@ -282,7 +295,7 @@
         acknowledgeOnNextOccasion = true;
     }
 
-    private synchronized void scheduleDeferredAcknowledgement(int delay) {
+    synchronized void scheduleDeferredAcknowledgement(int delay) {
         if (null == deferredAcknowledgments) {
             deferredAcknowledgments = new ArrayList<DeferredAcknowledgment>();
         }
@@ -295,17 +308,17 @@
         }
         DeferredAcknowledgment da = new DeferredAcknowledgment();
         deferredAcknowledgments.add(da);
-        destination.getHandler().getTimer().schedule(da, delay);
+        destination.getInterceptor().getTimer().schedule(da, delay);
     }
 
     final class DeferredAcknowledgment extends TimerTask {
 
         public void run() {
-            DestinationSequence.this.scheduleImmediateAcknowledgement();
+            DestinationSequenceImpl.this.scheduleImmediateAcknowledgement();
             try {
-                destination.getHandler().getProxy().acknowledge(DestinationSequence.this);
+                destination.getInterceptor().getProxy().acknowledge(DestinationSequenceImpl.this);
             } catch (IOException ex) {
-                Message msg = new Message("SEQ_ACK_SEND_EXC", LOG, DestinationSequence.this);
+                Message msg = new Message("SEQ_ACK_SEND_EXC", LOG, DestinationSequenceImpl.this);
                 LOG.log(Level.SEVERE, msg.toString(), ex);
             }
         }

Added: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Proxy.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Proxy.java?view=auto&rev=463282
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Proxy.java (added)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Proxy.java Thu Oct 12 08:34:05 2006
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm.impl;
+
+import java.io.IOException;
+import org.apache.cxf.ws.rm.DestinationSequence;
+
+/**
+ * 
+ */
+public interface Proxy {
+
+    void acknowledge(DestinationSequence ds) throws IOException;
+}

Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Proxy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Proxy.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMInterceptor.java?view=diff&rev=463282&r1=463281&r2=463282
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMInterceptor.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMInterceptor.java Thu Oct 12 08:34:05 2006
@@ -21,18 +21,26 @@
 
 import java.math.BigInteger;
 import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
+import java.util.Timer;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.annotation.PostConstruct;
 
 import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.helpers.CastUtils;
 import org.apache.cxf.interceptor.Fault;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.phase.Phase;
 import org.apache.cxf.phase.PhaseInterceptor;
+import org.apache.cxf.ws.addressing.AddressingProperties;
 import org.apache.cxf.ws.addressing.MAPAggregator;
+import org.apache.cxf.ws.rm.Identifier;
+import org.apache.cxf.ws.rm.RMContextUtils;
+import org.apache.cxf.ws.rm.RetransmissionQueue;
 import org.apache.cxf.ws.rm.interceptor.DeliveryAssuranceType;
 import org.apache.cxf.ws.rm.interceptor.DestinationPolicyType;
 import org.apache.cxf.ws.rm.interceptor.RMInterceptorConfigBean;
@@ -53,7 +61,12 @@
     private static final Logger LOG = LogUtils.getL7dLogger(RMInterceptor.class);
       
     private RMStore store;
+    private RetransmissionQueue retransmissionQueue;
+    private Timer timer;
+    private Proxy proxy;
     private Set<String> after = Collections.singleton(MAPAggregator.class.getName());
+    private Map<Endpoint, Source> sources;
+    private Map<Endpoint, Destination> destinations;
     
     public RMStore getStore() {
         return store;
@@ -63,8 +76,25 @@
         this.store = store;
     }
     
-    // PhaseInterceptor interface
+    public RetransmissionQueue getRetransmissionQueue() {
+        return retransmissionQueue;
+    }
+
+    public void setRetransmissionQueue(RetransmissionQueue retransmissionQueue) {
+        this.retransmissionQueue = retransmissionQueue;
+    }
     
+    public Timer getTimer() {
+        return timer;
+    }
+    
+    public Proxy getProxy() {
+        return proxy;
+    }
+   
+    
+    // PhaseInterceptor interface
+
     public Set<String> getAfter() {
         return after;
     }
@@ -81,7 +111,6 @@
         return Phase.PRE_LOGICAL;
     }
 
-    
     public void handleMessage(Message msg) throws Fault {
         if (ContextUtils.isOutbound(msg)) {
             handleOutbound(msg, false);        
@@ -98,8 +127,53 @@
         }
     }
     
+    // rm logic
+    
     void handleOutbound(Message message, boolean isFault) {
         LOG.entering(getClass().getName(), "handleOutbound");
+       
+        AddressingProperties maps =
+            ContextUtils.retrieveMAPs(message, false, true);
+        ContextUtils.ensureExposedVersion(maps);
+
+        String action = null;
+        if (maps != null && null != maps.getAction()) {
+            action = maps.getAction().getValue();
+        }
+        
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.fine("Action: " + action);
+        }
+
+        boolean isApplicationMessage = isAplicationMessage(action);
+        
+        RMPropertiesImpl rmpsOut = (RMPropertiesImpl)RMContextUtils.retrieveRMProperties(message, true);
+        if (null == rmpsOut) {
+            rmpsOut = new RMPropertiesImpl();
+            RMContextUtils.storeRMProperties(message, rmpsOut, true);
+        }
+        
+        RMPropertiesImpl rmpsIn = null;
+        Identifier inSeqId = null;
+        BigInteger inMessageNumber = null;
+        
+        if (isApplicationMessage) {
+                        
+            rmpsIn = (RMPropertiesImpl)RMContextUtils.retrieveRMProperties(message, false);
+            
+            if (null != rmpsIn && null != rmpsIn.getSequence()) {
+                inSeqId = rmpsIn.getSequence().getIdentifier();
+                inMessageNumber = rmpsIn.getSequence().getMessageNumber();
+            }
+            if (LOG.isLoggable(Level.FINE)) {
+                LOG.fine("inbound sequence: " + (null == inSeqId ? "null" : inSeqId.getValue()));
+            }
+        }
+        
+        if (1 < 0) {
+            System.out.println(inMessageNumber);
+        }
+        
     }
     
     void handleInbound(Message message, boolean isFault) {
@@ -135,6 +209,44 @@
             dp.setAcksPolicy(factory.createAcksPolicyType());
             setDestinationPolicy(dp);
         }
+    }
+    
+    
+    synchronized Source getSource(Message message) {
+        Endpoint endpoint = ContextUtils.getEndpoint(message);
+        Source source = sources.get(endpoint);
+        if (null == source) {
+            source = new Source(this, endpoint);
+            sources.put(endpoint, source);
+        }
+        return source;
+    }
+    
+    synchronized Destination  getDestination(Message message) {
+        Endpoint endpoint = ContextUtils.getEndpoint(message);
+        Destination destination = destinations.get(endpoint);
+        if (null == destination) {
+            destination = new Destination(this, endpoint);
+            destinations.put(endpoint, destination);
+        }
+        return destination;
+    }
+    
+    synchronized Destination getDestination(Source source) {
+        return destinations.get(source.getEndpoint());
+    }
+   
+    
+    boolean isAplicationMessage(String action) {
+        if (RMUtils.getRMConstants().getCreateSequenceAction().equals(action)
+            || RMUtils.getRMConstants().getCreateSequenceResponseAction().equals(action)
+            || RMUtils.getRMConstants().getTerminateSequenceAction().equals(action)
+            || RMUtils.getRMConstants().getLastMessageAction().equals(action)
+            || RMUtils.getRMConstants().getSequenceAcknowledgmentAction().equals(action)
+            || RMUtils.getRMConstants().getSequenceInfoAction().equals(action)) {
+            return false;
+        }
+        return true;
     }
 
 

Copied: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMPropertiesImpl.java (from r462820, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMPropertiesImpl.java)
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMPropertiesImpl.java?view=diff&rev=463282&p1=incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMPropertiesImpl.java&r1=462820&p2=incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMPropertiesImpl.java&r2=463282
==============================================================================
--- incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMPropertiesImpl.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/RMPropertiesImpl.java Thu Oct 12 08:34:05 2006
@@ -1,8 +1,34 @@
-package org.objectweb.celtix.ws.rm;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm.impl;
 
 import java.util.ArrayList;
 import java.util.Collection;
 
+import org.apache.cxf.ws.rm.AckRequestedType;
+import org.apache.cxf.ws.rm.DestinationSequence;
+import org.apache.cxf.ws.rm.RMProperties;
+import org.apache.cxf.ws.rm.SequenceAcknowledgement;
+import org.apache.cxf.ws.rm.SequenceType;
+import org.apache.cxf.ws.rm.SourceSequence;
+
 public class RMPropertiesImpl implements RMProperties {
     private SequenceType sequence;
     private Collection<SequenceAcknowledgement> acks;
@@ -48,7 +74,8 @@
         }
         SequenceAcknowledgement ack = seq.getAcknowledgment();
         acks.add(ack);
-        seq.acknowledgmentSent();
+        // TODO: move to caller
+        // seq.acknowledgmentSent();
     }
   
 }

Added: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SequenceFaultFactory.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SequenceFaultFactory.java?view=auto&rev=463282
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SequenceFaultFactory.java (added)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SequenceFaultFactory.java Thu Oct 12 08:34:05 2006
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm.impl;
+
+import java.util.logging.Logger;
+
+import org.apache.cxf.common.i18n.Message;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.ws.rm.Identifier;
+import org.apache.cxf.ws.rm.SequenceFault;
+import org.apache.cxf.ws.rm.SequenceFaultType;
+
+/**
+ * 
+ */
+
+public class SequenceFaultFactory { 
+
+    private static final Logger LOG = LogUtils.getL7dLogger(SequenceFaultFactory.class);
+
+    SequenceFault createUnknownSequenceFault(Identifier sid) {
+        SequenceFaultType sf = RMUtils.getWSRMFactory().createSequenceFaultType();
+        sf.setFaultCode(RMUtils.getRMConstants().getUnknownSequenceFaultCode());
+        Message msg = new Message("UNKNOWN_SEQUENCE_EXC", LOG, sid.getValue());
+        return new SequenceFault(msg.toString(), sf);
+    }    
+}

Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SequenceFaultFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SequenceFaultFactory.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Source.java (from r454703, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMSource.java)
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Source.java?view=diff&rev=463282&p1=incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMSource.java&r1=454703&p2=incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Source.java&r2=463282
==============================================================================
--- incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/RMSource.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/Source.java Thu Oct 12 08:34:05 2006
@@ -1,6 +1,24 @@
-package org.objectweb.celtix.ws.rm;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm.impl;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -8,29 +26,28 @@
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
-import org.objectweb.celtix.common.i18n.Message;
-import org.objectweb.celtix.common.logging.LogUtils;
-import org.objectweb.celtix.ws.rm.persistence.RMStore;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.ws.rm.Identifier;
+import org.apache.cxf.ws.rm.SequenceAcknowledgement;
+import org.apache.cxf.ws.rm.SourceSequence;
+import org.apache.cxf.ws.rm.persistence.RMStore;
 
-public class RMSource extends RMEndpoint {
+public class Source extends AbstractEndpoint {
 
-    private static final Logger LOG = LogUtils.getL7dLogger(RMSource.class);
     private static final String REQUESTOR_SEQUENCE_ID = "";
     
-    private Map<String, SourceSequence> map;
-    private Map<String, SourceSequence> current;     
+    private Map<String, SourceSequenceImpl> map;
+    private Map<String, SourceSequenceImpl> current;     
     private Lock sequenceCreationLock;
     private Condition sequenceCreationCondition;
     private boolean sequenceCreationNotified;
 
-
-    RMSource(RMHandler h) {
-        super(h);
-        map = new HashMap<String, SourceSequence>();
-        current = new HashMap<String, SourceSequence>();
+    Source(RMInterceptor interceptor, Endpoint endpoint) {
+        super(interceptor, endpoint);
+        map = new HashMap<String, SourceSequenceImpl>();
+        current = new HashMap<String, SourceSequenceImpl>();
              
         sequenceCreationLock = new ReentrantLock();
         sequenceCreationCondition = sequenceCreationLock.newCondition();
@@ -40,15 +57,15 @@
         return map.get(id.getValue());
     }
     
-    public void addSequence(SourceSequence seq) { 
+    public void addSequence(SourceSequenceImpl seq) { 
         addSequence(seq, true);
     }
     
-    public void addSequence(SourceSequence seq, boolean persist) {
+    public void addSequence(SourceSequenceImpl seq, boolean persist) {
         seq.setSource(this);
         map.put(seq.getIdentifier().getValue(), seq);
         if (persist) {
-            RMStore store = getHandler().getStore();
+            RMStore store = getInterceptor().getStore();
             if (null != store) {
                 store.createSourceSequence(seq);
             }
@@ -57,18 +74,60 @@
     
     public void removeSequence(SourceSequence seq) {        
         map.remove(seq.getIdentifier().getValue());
-        RMStore store = getHandler().getStore();
+        RMStore store = getInterceptor().getStore();
         if (null != store) {
             store.removeSourceSequence(seq.getIdentifier());
         }
     }
     
-    public Collection<SourceSequence> getAllSequences() {         
-        return map.values();
-    }
+    public Collection<SourceSequence> getAllSequences() {                 
+        return CastUtils.cast(map.values());
+    } 
     
-
+    /**
+     * Stores the received acknowledgment in the Sequence object identified in
+     * the <code>SequenceAcknowldgement</code> parameter. Then purges any
+     * acknowledged messages from the retransmission queue and requests sequence
+     * termination if necessary.
+     * 
+     * @param acknowledgment
+     */
+    public void setAcknowledged(SequenceAcknowledgement acknowledgment) {
+        Identifier sid = acknowledgment.getIdentifier();
+        SourceSequenceImpl seq = getSequenceImpl(sid);        
+        if (null != seq) {
+            seq.setAcknowledged(acknowledgment);
+            getInterceptor().getRetransmissionQueue().purgeAcknowledged(seq);
+            if (seq.allAcknowledged()) {
+                // TODO
+                /*
+                try {
+                    // 
+                    getHandler().getProxy().terminateSequence(seq); 
+                } catch (IOException ex) {
+                    Message msg = new Message("SEQ_TERMINATION_FAILURE", LOG, seq.getIdentifier());
+                    LOG.log(Level.SEVERE, msg.toString(), ex);
+                }
+                */
+            }
+        }
+    }
     
+    /**
+     * Returns a collection of all sequences for which have not yet been
+     * completely acknowledged.
+     * 
+     * @return the collection of unacknowledged sequences.
+     */
+    public Collection<SourceSequence> getAllUnacknowledgedSequences() {
+        Collection<SourceSequence> seqs = new ArrayList<SourceSequence>();
+        for (SourceSequenceImpl seq : map.values()) {
+            if (!seq.allAcknowledged()) {
+                seqs.add(seq);
+            }
+        }
+        return seqs;        
+    }
 
     /**
      * Returns the current sequence used by a client side source.
@@ -83,7 +142,7 @@
      * Sets the current sequence used by a client side source.
      * @param s the current sequence.
      */
-    void setCurrent(SourceSequence s) {
+    void setCurrent(SourceSequenceImpl s) {
         setCurrent(null, s);
     }
     
@@ -144,7 +203,7 @@
      * sent as part of the inbound sequence with the specified identifier.
      * @param s the current sequence.
      */
-    void setCurrent(Identifier i, SourceSequence s) {        
+    void setCurrent(Identifier i, SourceSequenceImpl s) {        
         sequenceCreationLock.lock();
         try {
             current.put(i == null ? REQUESTOR_SEQUENCE_ID : i.getValue(), s);
@@ -154,45 +213,8 @@
             sequenceCreationLock.unlock();
         }
     }
-
-    /**
-     * Stores the received acknowledgment in the Sequence object identified in
-     * the <code>SequenceAcknowldgement</code> parameter. Then purges any
-     * acknowledged messages from the retransmission queue and requests sequence
-     * termination if necessary.
-     * 
-     * @param acknowledgment
-     */
-    public void setAcknowledged(SequenceAcknowledgement acknowledgment) {
-        Identifier sid = acknowledgment.getIdentifier();
-        SourceSequence seq = getSequence(sid);        
-        if (null != seq) {
-            seq.setAcknowledged(acknowledgment);
-            getHandler().getPersistenceManager().getQueue().purgeAcknowledged(seq);
-            if (seq.allAcknowledged()) {
-                try {
-                    getHandler().getProxy().terminateSequence(seq); 
-                } catch (IOException ex) {
-                    Message msg = new Message("SEQ_TERMINATION_FAILURE", LOG, seq.getIdentifier());
-                    LOG.log(Level.SEVERE, msg.toString(), ex);
-                }
-            }
-        }
-    }
     
-    /**
-     * Returns a collection of all sequences for which have not yet been
-     * completely acknowledged.
-     * 
-     * @return the collection of unacknowledged sequences.
-     */
-    public Collection<SourceSequence> getAllUnacknowledgedSequences() {
-        Collection<SourceSequence> seqs = new ArrayList<SourceSequence>();
-        for (SourceSequence seq : map.values()) {
-            if (!seq.allAcknowledged()) {
-                seqs.add(seq);
-            }
-        }
-        return seqs;        
+    SourceSequenceImpl getSequenceImpl(Identifier id) {        
+        return map.get(id.getValue());
     }
 }

Copied: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SourceSequenceImpl.java (from r454697, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/SourceSequence.java)
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SourceSequenceImpl.java?view=diff&rev=463282&p1=incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/SourceSequence.java&r1=454697&p2=incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SourceSequenceImpl.java&r2=463282
==============================================================================
--- incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/main/java/org/objectweb/celtix/ws/rm/SourceSequence.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/impl/SourceSequenceImpl.java Thu Oct 12 08:34:05 2006
@@ -1,53 +1,61 @@
-package org.objectweb.celtix.ws.rm;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm.impl;
 
 import java.math.BigInteger;
 import java.util.Date;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import javax.xml.datatype.DatatypeConfigurationException;
-import javax.xml.datatype.DatatypeFactory;
 import javax.xml.datatype.Duration;
 
-import org.objectweb.celtix.bus.configuration.wsrm.SequenceTerminationPolicyType;
-import org.objectweb.celtix.common.logging.LogUtils;
-import org.objectweb.celtix.ws.addressing.ContextUtils;
-import org.objectweb.celtix.ws.rm.SequenceAcknowledgement.AcknowledgementRange;
-import org.objectweb.celtix.ws.rm.persistence.RMSourceSequence;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.jaxb.DatatypeFactory;
+import org.apache.cxf.ws.addressing.ContextUtils;
+import org.apache.cxf.ws.rm.Expires;
+import org.apache.cxf.ws.rm.Identifier;
+import org.apache.cxf.ws.rm.SequenceAcknowledgement;
+import org.apache.cxf.ws.rm.SequenceAcknowledgement.AcknowledgementRange;
+import org.apache.cxf.ws.rm.SourceSequence;
+import org.apache.cxf.ws.rm.interceptor.SequenceTerminationPolicyType;
 
-public class SourceSequence extends AbstractSequenceImpl implements RMSourceSequence {
+public class SourceSequenceImpl extends AbstractSequenceImpl implements SourceSequence {
 
-    public static final Duration PT0S;
-    private static final Logger LOG = LogUtils.getL7dLogger(SourceSequence.class);
+    private static final Logger LOG = LogUtils.getL7dLogger(SourceSequenceImpl.class);
     
     private Date expires;
-    private RMSource source;
+    private Source source;
     private BigInteger currentMessageNumber;
     private boolean lastMessage;
     private Identifier offeringId;
-    private org.objectweb.celtix.ws.addressing.EndpointReferenceType target;
+    private org.apache.cxf.ws.addressing.EndpointReferenceType target;
     
-    static {
-        Duration pt0s = null;
-        try {
-            DatatypeFactory df = DatatypeFactory.newInstance();
-            pt0s = df.newDuration("PT0S");
-        } catch (DatatypeConfigurationException ex) {
-            LOG.log(Level.INFO, "Could not create Duration object.", ex);
-        }
-        PT0S = pt0s;
-    }
-    
-    public SourceSequence(Identifier i) {
+    public SourceSequenceImpl(Identifier i) {
         this(i, null, null);
     }
     
-    public SourceSequence(Identifier i, Date e, Identifier oi) {
+    public SourceSequenceImpl(Identifier i, Date e, Identifier oi) {
         this(i, e, oi, BigInteger.ZERO, false);
     }
-   
     
-    public SourceSequence(Identifier i, Date e, Identifier oi, BigInteger cmn, boolean lm) {
+    public SourceSequenceImpl(Identifier i, Date e, Identifier oi, BigInteger cmn, boolean lm) {
         super(i);
         expires = e;
 
@@ -55,41 +63,85 @@
 
         currentMessageNumber = cmn;
         lastMessage = lm;
-        acked = RMUtils.getWSRMFactory().createSequenceAcknowledgement();
-        acked.setIdentifier(id);
+        acknowledgement = RMUtils.getWSRMFactory().createSequenceAcknowledgement();
+        acknowledgement.setIdentifier(id);
     }
     
     // begin RMSourceSequence interface
-    
+      
+    /* (non-Javadoc)
+     * @see org.apache.cxf.ws.rm.SourceSequence#getCurrentMessageNr()
+     */
     public BigInteger getCurrentMessageNr() {
         return currentMessageNumber;
     }
-    
-    /**
-     * @return the identifier of the rm source
+
+    /* (non-Javadoc)
+     * @see org.apache.cxf.ws.rm.SourceSequence#getEndpointIdentifier()
      */
     public String getEndpointIdentifier() {
+        // TODO
+        /*
         if (null != source) {
             return source.getHandler().getConfigurationHelper().getEndpointId();
         }
+        */
         return null;
     }
-    
-    public Date getExpiry() {
-        return expires;
+
+    /* (non-Javadoc)
+     * @see org.apache.cxf.ws.rm.SourceSequence#getOfferingSequenceIdentifier()
+     */
+    public Identifier getOfferingSequenceIdentifier() {
+        return offeringId;
     }
 
+    /* (non-Javadoc)
+     * @see org.apache.cxf.ws.rm.SourceSequence#isLastMessage()
+     */
     public boolean isLastMessage() {
         return lastMessage;
     }
 
-    public Identifier getOfferingSequenceIdentifier() {
-        return offeringId;
+    /* (non-Javadoc)
+     * @see org.apache.cxf.ws.rm.SourceSequence#getExpiry()
+     */
+    public Date getExpiry() {
+        // TODO Auto-generated method stub
+        return null;
     }
     
     // end RMSourceSequence interface
     
-    void setSource(RMSource s) {
+    /**
+     * Returns true if a last message had been sent for this sequence and if all
+     * messages for this sequence have been acknowledged.
+     * 
+     * @return true if all messages have been acknowledged.
+     */
+    public boolean allAcknowledged() {
+        if (!lastMessage) {
+            return false;
+        }
+
+        if (acknowledgement.getAcknowledgementRange().size() == 1) {         
+            AcknowledgementRange r = acknowledgement.getAcknowledgementRange().get(0);
+            return r.getLower().equals(BigInteger.ONE) && r.getUpper().equals(currentMessageNumber);
+        }
+        return false;
+    }
+
+    /**
+     * Used by the RM source to cache received acknowledgements for this
+     * sequence.
+     * 
+     * @param acknowledgement an acknowledgement for this sequence
+     */
+    public void setAcknowledged(SequenceAcknowledgement a) {
+        acknowledgement = a;    
+    }
+     
+    void setSource(Source s) {
         source = s;
     }
     
@@ -117,15 +169,17 @@
 
     boolean isExpired() {
         return expires == null ? false : new Date().after(expires);
+        
     }
     
     void setExpires(Expires ex) {
         Duration d = null;
+        expires = null;
         if (null != ex) {
             d = ex.getValue();
         }
 
-        if (null != d && (null == PT0S || !PT0S.equals(d))) {
+        if (null != d && !d.equals(DatatypeFactory.PT0S)) {
             Date now = new Date();
             expires = new Date(now.getTime() + ex.getValue().getTimeInMillis(now));
         }
@@ -171,39 +225,15 @@
         }
     }
     
-    /**
-     * Used by the RM source to cache received acknowledgements for this
-     * sequence.
-     * 
-     * @param acknowledgement an acknowledgement for this sequence
-     */
-    void setAcknowledged(SequenceAcknowledgement acknowledgement) {        
-        acked = acknowledgement;      
-    }
-    
+
+   
     
     SequenceAcknowledgement getAcknowledgement() {
-        return acked;
+        return acknowledgement;
     }
     
    
-    /**
-     * Returns true if a last message had been sent for this sequence and if all
-     * messages for this sequence have been acknowledged.
-     * 
-     * @return true if all messages have been acknowledged.
-     */
-    boolean allAcknowledged() {
-        if (!lastMessage) {
-            return false;
-        }
-
-        if (acked.getAcknowledgementRange().size() == 1) {         
-            AcknowledgementRange r = acked.getAcknowledgementRange().get(0);
-            return r.getLower().equals(BigInteger.ONE) && r.getUpper().equals(currentMessageNumber);
-        }
-        return false;
-    }
+    
     
     /**
      * The target for the sequence is the first non-anonymous address that
@@ -215,13 +245,13 @@
      * 
      * @param to
      */
-    synchronized void setTarget(org.objectweb.celtix.ws.addressing.EndpointReferenceType to) {
+    synchronized void setTarget(org.apache.cxf.ws.addressing.EndpointReferenceType to) {
         if (target == null && !ContextUtils.isGenericAddress(to)) {
             target = to;
         }
     }
     
-    synchronized org.objectweb.celtix.ws.addressing.EndpointReferenceType getTarget() {
+    synchronized org.apache.cxf.ws.addressing.EndpointReferenceType getTarget() {
         return target;
     } 
    
@@ -230,29 +260,38 @@
      * and if so sets the lastMessageNumber property.
      */
     private void checkLastMessage(Identifier inSeqId, BigInteger inMsgNumber) { 
-
-        assert null != source;
         
         // check if this is a response to a message that was is the last message in the sequence
         // that included this sequence as an offer 
-        
+
         if (null != inSeqId && null != inMsgNumber) {
-            DestinationSequence inSeq = source.getHandler().getDestination().getSequence(inSeqId);
-            if (null != inSeq && offeredBy(inSeqId) && inMsgNumber.equals(inSeq.getLastMessageNr())) {
-                lastMessage = true;     
+            RMInterceptor interceptor = source.getInterceptor();
+            Destination destination = interceptor.getDestination(source);
+            DestinationSequenceImpl inSeq = null;
+            if (null != destination) {
+                inSeq = destination.getSequenceImpl(inSeqId);
+            }
+             
+            if (null != inSeq && offeredBy(inSeqId)
+                && inMsgNumber.equals(inSeq.getLastMessageNumber())) {
+                lastMessage = true;
             }
         } 
+
         
         if (!lastMessage) {
-            SequenceTerminationPolicyType stp = source.getHandler().getConfigurationHelper()
-                .getSequenceTerminationPolicy();
+            SequenceTerminationPolicyType stp = source.getInterceptor().getSourcePolicy()
+               .getSequenceTerminationPolicy();
+
             assert null != stp;
 
             if ((!stp.getMaxLength().equals(BigInteger.ZERO) && stp.getMaxLength()
                 .compareTo(currentMessageNumber) <= 0)
-                || (stp.getMaxRanges() > 0 && acked.getAcknowledgementRange().size() >= stp.getMaxRanges())
-                || (stp.getMaxUnacknowledged() > 0 && source.getHandler().getPersistenceManager().getQueue()
-                    .countUnacknowledged(this) >= stp.getMaxUnacknowledged())) {
+                || (stp.getMaxRanges() > 0
+                    && acknowledgement.getAcknowledgementRange().size() >= stp.getMaxRanges())
+                || (stp.getMaxUnacknowledged() > 0
+                    && source.getInterceptor().getRetransmissionQueue()
+                        .countUnacknowledged(this) >= stp.getMaxUnacknowledged())) {
                 lastMessage = true;
             }
         }
@@ -261,5 +300,4 @@
             LOG.fine(currentMessageNumber + " should be the last message in this sequence.");
         }
     }
-
 }

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java?view=diff&rev=463282&r1=463281&r2=463282
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMStore.java Thu Oct 12 08:34:05 2006
@@ -23,7 +23,9 @@
 import java.util.Collection;
 import java.util.Map;
 
+import org.apache.cxf.ws.rm.DestinationSequence;
 import org.apache.cxf.ws.rm.Identifier;
+import org.apache.cxf.ws.rm.SourceSequence;
 
 
 public interface RMStore {
@@ -39,14 +41,14 @@
      * <code>RMSourceSequence</code> object.
      * @param seq the sequence
      */
-    void createSourceSequence(RMSourceSequence seq);
+    void createSourceSequence(SourceSequence seq);
     
     /**
      * Create a destination sequence in the persistent store, with the sequence attributes as specified in the
      * <code>RMSDestinationSequence</code> object.
      * @param seq the sequence
      */
-    void createDestinationSequence(RMDestinationSequence seq);
+    void createDestinationSequence(DestinationSequence seq);
     
     /**
      * Remove the source sequence with the specified identifier from persistent store. 
@@ -67,7 +69,7 @@
      * @param endpointIdentifier the identifier for the source
      * @return the collection of sequences
      */    
-    Collection<RMSourceSequence> getSourceSequences(String endpointIdentifier);
+    Collection<SourceSequence> getSourceSequences(String endpointIdentifier);
     
     /**
      * Retrieves all sequences managed by the identified RM destination endpoint 
@@ -76,7 +78,7 @@
      * @param endpointIdentifier the identifier for the destination
      * @return the collection of sequences
      */    
-    Collection<RMDestinationSequence> getDestinationSequences(String endpointIdentifier);
+    Collection<DestinationSequence> getDestinationSequences(String endpointIdentifier);
     
     /**
      * Retrieves the outbound/inbound messages stored for the source/destination sequence with 
@@ -95,7 +97,7 @@
      * @param seq the source sequence 
      * @param msg the outgoing message
      */
-    void persistOutgoing(RMSourceSequence seq, RMMessage msg);
+    void persistOutgoing(SourceSequence seq, RMMessage msg);
     
    /**
     * Called by an RM source upon processing an outbound message. The <code>RMMessage</code>
@@ -104,7 +106,7 @@
     * @param seq the destination sequence
     * @param msg the incoming message
     */
-    void persistIncoming(RMDestinationSequence seq, RMMessage msg);
+    void persistIncoming(DestinationSequence seq, RMMessage msg);
   
     /**
      * Removes the messages with the given message numbers and identifiers from the store of



Mime
View raw message