cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ff...@apache.org
Subject cxf git commit: [CXF-6784]WS-Notification subscription should support renew
Date Wed, 02 Mar 2016 06:17:15 GMT
Repository: cxf
Updated Branches:
  refs/heads/master a9c8de0d5 -> cb36642a8


[CXF-6784]WS-Notification subscription should support renew


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/cb36642a
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/cb36642a
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/cb36642a

Branch: refs/heads/master
Commit: cb36642a838a145c993ed7bf9b5bf1d60ee235a8
Parents: a9c8de0
Author: Freeman Fang <freeman.fang@gmail.com>
Authored: Wed Mar 2 14:16:49 2016 +0800
Committer: Freeman Fang <freeman.fang@gmail.com>
Committed: Wed Mar 2 14:16:49 2016 +0800

----------------------------------------------------------------------
 .../cxf/wsn/client/NotificationBroker.java      | 15 ++++--
 .../apache/cxf/wsn/AbstractSubscription.java    |  8 +--
 .../org/apache/cxf/wsn/jms/JmsSubscription.java | 53 ++++++++++++++++++--
 .../java/org/apache/cxf/wsn/WsnBrokerTest.java  | 33 ++++++++++++
 4 files changed, 97 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/cb36642a/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java
----------------------------------------------------------------------
diff --git a/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java b/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java
index 1ed7b36..a532f3a 100644
--- a/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java
+++ b/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java
@@ -68,6 +68,8 @@ public class NotificationBroker implements Referencable {
     public static final QName QNAME_TOPIC_EXPRESSION = new QName(WSN_URI, "TopicExpression");
 
     public static final QName QNAME_MESSAGE_CONTENT = new QName(WSN_URI, "MessageContent");
+    
+    public static final QName QNAME_INITIAL_TERMINATION_TIME = new QName(WSN_URI, "InitialTerminationTime");
 
     
     private org.oasis_open.docs.wsn.brw_2.NotificationBroker broker;
@@ -159,8 +161,10 @@ public class NotificationBroker implements Referencable {
         NotifyMessageNotSupportedFault, InvalidProducerPropertiesExpressionFault {
         //CHECKSTYLE:ON
         
-        return subscribe(consumer, topic, null, false);
+        return subscribe(consumer, topic, null, false, null);
     }
+    
+
 
     public Subscription subscribe(Referencable consumer, String topic, String xpath) 
         //CHECKSTYLE:OFF - WS-Notification spec throws a lot of faults
@@ -170,11 +174,11 @@ public class NotificationBroker implements Referencable {
         UnsupportedPolicyRequestFault, UnrecognizedPolicyRequestFault, NotifyMessageNotSupportedFault,

         InvalidProducerPropertiesExpressionFault {
         //CHECKSTYLE:ON
-        return subscribe(consumer, topic, xpath, false);
+        return subscribe(consumer, topic, xpath, false, null);
     }
 
     public Subscription subscribe(Referencable consumer, String topic,
-                                  String xpath, boolean raw)
+                                  String xpath, boolean raw, String initialTerminationTime)
         //CHECKSTYLE:OFF - WS-Notification spec throws a lot of faults
         throws TopicNotSupportedFault, InvalidFilterFault, TopicExpressionDialectUnknownFault,

         UnacceptableInitialTerminationTimeFault, SubscribeCreationFailedFault, 
@@ -184,6 +188,11 @@ public class NotificationBroker implements Referencable {
         //CHECKSTYLE:ON
 
         Subscribe subscribeRequest = new Subscribe();
+        if (initialTerminationTime != null) {
+            subscribeRequest.setInitialTerminationTime(
+                  new JAXBElement<String>(QNAME_INITIAL_TERMINATION_TIME,
+                  String.class, initialTerminationTime));
+        }
         subscribeRequest.setConsumerReference(consumer.getEpr());
         subscribeRequest.setFilter(new FilterType());
         if (topic != null) {

http://git-wip-us.apache.org/repos/asf/cxf/blob/cb36642a/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java
----------------------------------------------------------------------
diff --git a/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java b/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java
index 48bf45c..6036c86 100644
--- a/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java
+++ b/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java
@@ -126,6 +126,7 @@ public abstract class AbstractSubscription extends AbstractEndpoint implements P
             Renew renewRequest) throws ResourceUnknownFault, UnacceptableTerminationTimeFault {
 
         XMLGregorianCalendar time = validateTerminationTime(renewRequest.getTerminationTime());
+        this.setTerminationTime(time);
         renew(time);
         RenewResponse response = new RenewResponse();
         response.setTerminationTime(time);
@@ -401,12 +402,7 @@ public abstract class AbstractSubscription extends AbstractEndpoint implements P
             throw new InvalidMessageContentExpressionFault("Unsupported MessageContent dialect: '"
                     + contentFilter.getDialect() + "'", fault);
         }
-        if (terminationTime != null) {
-            UnacceptableInitialTerminationTimeFaultType fault 
-                = new UnacceptableInitialTerminationTimeFaultType();
-            throw new UnacceptableInitialTerminationTimeFault("InitialTerminationTime is not supported", 
-                                                              fault);
-        }
+        
     }
 
     public AbstractNotificationBroker getBroker() {

http://git-wip-us.apache.org/repos/asf/cxf/blob/cb36642a/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/jms/JmsSubscription.java
----------------------------------------------------------------------
diff --git a/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/jms/JmsSubscription.java b/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/jms/JmsSubscription.java
index 6420400..56f5ed2 100644
--- a/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/jms/JmsSubscription.java
+++ b/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/jms/JmsSubscription.java
@@ -34,6 +34,7 @@ import javax.jms.Topic;
 import javax.xml.XMLConstants;
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
+import javax.xml.datatype.DatatypeConstants;
 import javax.xml.datatype.XMLGregorianCalendar;
 import javax.xml.stream.XMLStreamReader;
 import javax.xml.xpath.XPath;
@@ -57,7 +58,6 @@ import org.oasis_open.docs.wsn.b_2.ResumeFailedFaultType;
 import org.oasis_open.docs.wsn.b_2.Subscribe;
 import org.oasis_open.docs.wsn.b_2.SubscribeCreationFailedFaultType;
 import org.oasis_open.docs.wsn.b_2.UnableToDestroySubscriptionFaultType;
-import org.oasis_open.docs.wsn.b_2.UnacceptableTerminationTimeFaultType;
 import org.oasis_open.docs.wsn.bw_2.InvalidFilterFault;
 import org.oasis_open.docs.wsn.bw_2.InvalidMessageContentExpressionFault;
 import org.oasis_open.docs.wsn.bw_2.InvalidProducerPropertiesExpressionFault;
@@ -86,6 +86,12 @@ public abstract class JmsSubscription extends AbstractSubscription implements Me
     private Topic jmsTopic;
 
     private JAXBContext jaxbContext;
+    
+    private boolean checkTermination = true;
+    
+    private boolean isSessionActive = true;
+    
+    private Thread terminationThread;
 
     public JmsSubscription(String name) {
         super(name);
@@ -102,6 +108,12 @@ public abstract class JmsSubscription extends AbstractSubscription implements Me
             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             MessageConsumer consumer = session.createConsumer(jmsTopic);
             consumer.setMessageListener(this);
+            checkTermination = true;
+            isSessionActive = true;
+            if (getTerminationTime() != null) {
+                terminationThread = new TerminationThread();
+                terminationThread.start();
+            }
         } catch (JMSException e) {
             SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType();
             throw new SubscribeCreationFailedFault("Error starting subscription", fault, e);
@@ -134,6 +146,7 @@ public abstract class JmsSubscription extends AbstractSubscription implements Me
         } else {
             try {
                 session.close();
+                isSessionActive = false;
             } catch (JMSException e) {
                 PauseFailedFaultType fault = new PauseFailedFaultType();
                 throw new PauseFailedFault("Error pausing subscription", fault, e);
@@ -153,6 +166,7 @@ public abstract class JmsSubscription extends AbstractSubscription implements Me
                 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                 MessageConsumer consumer = session.createConsumer(jmsTopic);
                 consumer.setMessageListener(this);
+                isSessionActive = true;
             } catch (JMSException e) {
                 ResumeFailedFaultType fault = new ResumeFailedFaultType();
                 throw new ResumeFailedFault("Error resuming subscription", fault, e);
@@ -162,8 +176,15 @@ public abstract class JmsSubscription extends AbstractSubscription implements Me
 
     @Override
     protected void renew(XMLGregorianCalendar terminationTime) throws UnacceptableTerminationTimeFault {
-        UnacceptableTerminationTimeFaultType fault = new UnacceptableTerminationTimeFaultType();
-        throw new UnacceptableTerminationTimeFault("TerminationTime is not supported", fault);
+        try {
+            this.resume();
+            if (this.terminationThread == null) {
+                terminationThread = new TerminationThread();
+                terminationThread.start();
+            }
+        } catch (ResumeFailedFault e) {
+            LOGGER.log(Level.WARNING, "renew failed", e);
+        }
     }
 
     @Override
@@ -172,6 +193,7 @@ public abstract class JmsSubscription extends AbstractSubscription implements Me
         if (session != null) {
             try {
                 session.close();
+                checkTermination = false;
             } catch (JMSException e) {
                 UnableToDestroySubscriptionFaultType fault = new UnableToDestroySubscriptionFaultType();
                 throw new UnableToDestroySubscriptionFault("Unable to unsubscribe", fault, e);
@@ -245,5 +267,30 @@ public abstract class JmsSubscription extends AbstractSubscription implements Me
     }
 
     protected abstract void doNotify(Notify notify);
+    
+    class TerminationThread extends Thread {
+        public void run() {
+            while (checkTermination) {
+                XMLGregorianCalendar tt = getTerminationTime();
+                if (tt != null && isSessionActive) {
+                    XMLGregorianCalendar ct = getCurrentTime();
+                    int c = tt.compare(ct);
+                    if (c == DatatypeConstants.LESSER || c == DatatypeConstants.EQUAL) {
+                        LOGGER.log(Level.INFO, "Need Pause this subscribe");
+                        try {
+                            pause();
+                        } catch (PauseFailedFault e) {
+                            LOGGER.log(Level.WARNING, "Pause failed", e);
+                        }
+                    }
+                }
+                try {
+                    Thread.sleep(10000); // check if should terminate every 10 sec
+                } catch (InterruptedException e) {
+                    LOGGER.log(Level.WARNING, "TerminationThread sleep interrupted", e);
+                }
+            }
+        }
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/cb36642a/services/wsn/wsn-core/src/test/java/org/apache/cxf/wsn/WsnBrokerTest.java
----------------------------------------------------------------------
diff --git a/services/wsn/wsn-core/src/test/java/org/apache/cxf/wsn/WsnBrokerTest.java b/services/wsn/wsn-core/src/test/java/org/apache/cxf/wsn/WsnBrokerTest.java
index e8f388d..b81e70c 100644
--- a/services/wsn/wsn-core/src/test/java/org/apache/cxf/wsn/WsnBrokerTest.java
+++ b/services/wsn/wsn-core/src/test/java/org/apache/cxf/wsn/WsnBrokerTest.java
@@ -130,6 +130,7 @@ public abstract class WsnBrokerTest extends Assert {
         Consumer consumer = new Consumer(callback, "http://localhost:" + port2 + "/test/consumer");
 
         Subscription subscription = notificationBroker.subscribe(consumer, "myTopic");
+        
 
         synchronized (callback.notifications) {
             notificationBroker.notify("myTopic", 
@@ -145,6 +146,38 @@ public abstract class WsnBrokerTest extends Assert {
         subscription.unsubscribe();
         consumer.stop();
     }
+    
+    @Test
+    public void testRenew() throws Exception {
+        TestConsumer callback = new TestConsumer();
+        Consumer consumer = new Consumer(callback, "http://localhost:" + port2 + "/test/consumer");
+
+        //create subscription with InitialTerminationTime 20 sec, so that the 
+        //subscription would be expired after 20 sec
+        Subscription subscription = notificationBroker.subscribe(consumer, "myTopic", null, false, "PT20S");
+        Thread.sleep(30000);
+        synchronized (callback.notifications) {
+            notificationBroker.notify("myTopic", 
+                                      new JAXBElement<String>(new QName("urn:test:org", "foo"),
+                                          String.class, "bar"));
+            callback.notifications.wait(10000);
+        }
+        assertEquals(0, callback.notifications.size()); //the subscription is expired so can't get the notification
+        subscription.renew("PT60S"); //renew another 60 sec to resend the notification
+        synchronized (callback.notifications) {
+            notificationBroker.notify("myTopic", 
+                                      new JAXBElement<String>(new QName("urn:test:org", "foo"),
+                                          String.class, "bar"));
+            callback.notifications.wait(10000);
+        }
+        assertEquals(1, callback.notifications.size()); //the subscription is expired so can't get the notification
+        NotificationMessageHolderType message = callback.notifications.get(0);
+        assertEquals(WSNHelper.getInstance().getWSAAddress(subscription.getEpr()), 
+                     WSNHelper.getInstance().getWSAAddress(message.getSubscriptionReference()));
+
+        subscription.unsubscribe();
+        consumer.stop();
+    }
 
     @Test
     public void testPullPoint() throws Exception {


Mime
View raw message