Repository: cxf Updated Branches: refs/heads/master a9c8de0d5 -> cb36642a8 [CXF-6784]WS-Notification subscription should support renew Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/cb36642a Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/cb36642a Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/cb36642a Branch: refs/heads/master Commit: cb36642a838a145c993ed7bf9b5bf1d60ee235a8 Parents: a9c8de0 Author: Freeman Fang Authored: Wed Mar 2 14:16:49 2016 +0800 Committer: Freeman Fang Committed: Wed Mar 2 14:16:49 2016 +0800 ---------------------------------------------------------------------- .../cxf/wsn/client/NotificationBroker.java | 15 ++++-- .../apache/cxf/wsn/AbstractSubscription.java | 8 +-- .../org/apache/cxf/wsn/jms/JmsSubscription.java | 53 ++++++++++++++++++-- .../java/org/apache/cxf/wsn/WsnBrokerTest.java | 33 ++++++++++++ 4 files changed, 97 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/cb36642a/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java ---------------------------------------------------------------------- diff --git a/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java b/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java index 1ed7b36..a532f3a 100644 --- a/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java +++ b/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java @@ -68,6 +68,8 @@ public class NotificationBroker implements Referencable { public static final QName QNAME_TOPIC_EXPRESSION = new QName(WSN_URI, "TopicExpression"); public static final QName QNAME_MESSAGE_CONTENT = new QName(WSN_URI, "MessageContent"); + + public static final QName QNAME_INITIAL_TERMINATION_TIME = new QName(WSN_URI, "InitialTerminationTime"); private org.oasis_open.docs.wsn.brw_2.NotificationBroker broker; @@ -159,8 +161,10 @@ public class NotificationBroker implements Referencable { NotifyMessageNotSupportedFault, InvalidProducerPropertiesExpressionFault { //CHECKSTYLE:ON - return subscribe(consumer, topic, null, false); + return subscribe(consumer, topic, null, false, null); } + + public Subscription subscribe(Referencable consumer, String topic, String xpath) //CHECKSTYLE:OFF - WS-Notification spec throws a lot of faults @@ -170,11 +174,11 @@ public class NotificationBroker implements Referencable { UnsupportedPolicyRequestFault, UnrecognizedPolicyRequestFault, NotifyMessageNotSupportedFault, InvalidProducerPropertiesExpressionFault { //CHECKSTYLE:ON - return subscribe(consumer, topic, xpath, false); + return subscribe(consumer, topic, xpath, false, null); } public Subscription subscribe(Referencable consumer, String topic, - String xpath, boolean raw) + String xpath, boolean raw, String initialTerminationTime) //CHECKSTYLE:OFF - WS-Notification spec throws a lot of faults throws TopicNotSupportedFault, InvalidFilterFault, TopicExpressionDialectUnknownFault, UnacceptableInitialTerminationTimeFault, SubscribeCreationFailedFault, @@ -184,6 +188,11 @@ public class NotificationBroker implements Referencable { //CHECKSTYLE:ON Subscribe subscribeRequest = new Subscribe(); + if (initialTerminationTime != null) { + subscribeRequest.setInitialTerminationTime( + new JAXBElement(QNAME_INITIAL_TERMINATION_TIME, + String.class, initialTerminationTime)); + } subscribeRequest.setConsumerReference(consumer.getEpr()); subscribeRequest.setFilter(new FilterType()); if (topic != null) { http://git-wip-us.apache.org/repos/asf/cxf/blob/cb36642a/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java ---------------------------------------------------------------------- diff --git a/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java b/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java index 48bf45c..6036c86 100644 --- a/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java +++ b/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java @@ -126,6 +126,7 @@ public abstract class AbstractSubscription extends AbstractEndpoint implements P Renew renewRequest) throws ResourceUnknownFault, UnacceptableTerminationTimeFault { XMLGregorianCalendar time = validateTerminationTime(renewRequest.getTerminationTime()); + this.setTerminationTime(time); renew(time); RenewResponse response = new RenewResponse(); response.setTerminationTime(time); @@ -401,12 +402,7 @@ public abstract class AbstractSubscription extends AbstractEndpoint implements P throw new InvalidMessageContentExpressionFault("Unsupported MessageContent dialect: '" + contentFilter.getDialect() + "'", fault); } - if (terminationTime != null) { - UnacceptableInitialTerminationTimeFaultType fault - = new UnacceptableInitialTerminationTimeFaultType(); - throw new UnacceptableInitialTerminationTimeFault("InitialTerminationTime is not supported", - fault); - } + } public AbstractNotificationBroker getBroker() { http://git-wip-us.apache.org/repos/asf/cxf/blob/cb36642a/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/jms/JmsSubscription.java ---------------------------------------------------------------------- diff --git a/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/jms/JmsSubscription.java b/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/jms/JmsSubscription.java index 6420400..56f5ed2 100644 --- a/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/jms/JmsSubscription.java +++ b/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/jms/JmsSubscription.java @@ -34,6 +34,7 @@ import javax.jms.Topic; import javax.xml.XMLConstants; import javax.xml.bind.JAXBContext; import javax.xml.bind.JAXBException; +import javax.xml.datatype.DatatypeConstants; import javax.xml.datatype.XMLGregorianCalendar; import javax.xml.stream.XMLStreamReader; import javax.xml.xpath.XPath; @@ -57,7 +58,6 @@ import org.oasis_open.docs.wsn.b_2.ResumeFailedFaultType; import org.oasis_open.docs.wsn.b_2.Subscribe; import org.oasis_open.docs.wsn.b_2.SubscribeCreationFailedFaultType; import org.oasis_open.docs.wsn.b_2.UnableToDestroySubscriptionFaultType; -import org.oasis_open.docs.wsn.b_2.UnacceptableTerminationTimeFaultType; import org.oasis_open.docs.wsn.bw_2.InvalidFilterFault; import org.oasis_open.docs.wsn.bw_2.InvalidMessageContentExpressionFault; import org.oasis_open.docs.wsn.bw_2.InvalidProducerPropertiesExpressionFault; @@ -86,6 +86,12 @@ public abstract class JmsSubscription extends AbstractSubscription implements Me private Topic jmsTopic; private JAXBContext jaxbContext; + + private boolean checkTermination = true; + + private boolean isSessionActive = true; + + private Thread terminationThread; public JmsSubscription(String name) { super(name); @@ -102,6 +108,12 @@ public abstract class JmsSubscription extends AbstractSubscription implements Me session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(jmsTopic); consumer.setMessageListener(this); + checkTermination = true; + isSessionActive = true; + if (getTerminationTime() != null) { + terminationThread = new TerminationThread(); + terminationThread.start(); + } } catch (JMSException e) { SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType(); throw new SubscribeCreationFailedFault("Error starting subscription", fault, e); @@ -134,6 +146,7 @@ public abstract class JmsSubscription extends AbstractSubscription implements Me } else { try { session.close(); + isSessionActive = false; } catch (JMSException e) { PauseFailedFaultType fault = new PauseFailedFaultType(); throw new PauseFailedFault("Error pausing subscription", fault, e); @@ -153,6 +166,7 @@ public abstract class JmsSubscription extends AbstractSubscription implements Me session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(jmsTopic); consumer.setMessageListener(this); + isSessionActive = true; } catch (JMSException e) { ResumeFailedFaultType fault = new ResumeFailedFaultType(); throw new ResumeFailedFault("Error resuming subscription", fault, e); @@ -162,8 +176,15 @@ public abstract class JmsSubscription extends AbstractSubscription implements Me @Override protected void renew(XMLGregorianCalendar terminationTime) throws UnacceptableTerminationTimeFault { - UnacceptableTerminationTimeFaultType fault = new UnacceptableTerminationTimeFaultType(); - throw new UnacceptableTerminationTimeFault("TerminationTime is not supported", fault); + try { + this.resume(); + if (this.terminationThread == null) { + terminationThread = new TerminationThread(); + terminationThread.start(); + } + } catch (ResumeFailedFault e) { + LOGGER.log(Level.WARNING, "renew failed", e); + } } @Override @@ -172,6 +193,7 @@ public abstract class JmsSubscription extends AbstractSubscription implements Me if (session != null) { try { session.close(); + checkTermination = false; } catch (JMSException e) { UnableToDestroySubscriptionFaultType fault = new UnableToDestroySubscriptionFaultType(); throw new UnableToDestroySubscriptionFault("Unable to unsubscribe", fault, e); @@ -245,5 +267,30 @@ public abstract class JmsSubscription extends AbstractSubscription implements Me } protected abstract void doNotify(Notify notify); + + class TerminationThread extends Thread { + public void run() { + while (checkTermination) { + XMLGregorianCalendar tt = getTerminationTime(); + if (tt != null && isSessionActive) { + XMLGregorianCalendar ct = getCurrentTime(); + int c = tt.compare(ct); + if (c == DatatypeConstants.LESSER || c == DatatypeConstants.EQUAL) { + LOGGER.log(Level.INFO, "Need Pause this subscribe"); + try { + pause(); + } catch (PauseFailedFault e) { + LOGGER.log(Level.WARNING, "Pause failed", e); + } + } + } + try { + Thread.sleep(10000); // check if should terminate every 10 sec + } catch (InterruptedException e) { + LOGGER.log(Level.WARNING, "TerminationThread sleep interrupted", e); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/cxf/blob/cb36642a/services/wsn/wsn-core/src/test/java/org/apache/cxf/wsn/WsnBrokerTest.java ---------------------------------------------------------------------- diff --git a/services/wsn/wsn-core/src/test/java/org/apache/cxf/wsn/WsnBrokerTest.java b/services/wsn/wsn-core/src/test/java/org/apache/cxf/wsn/WsnBrokerTest.java index e8f388d..b81e70c 100644 --- a/services/wsn/wsn-core/src/test/java/org/apache/cxf/wsn/WsnBrokerTest.java +++ b/services/wsn/wsn-core/src/test/java/org/apache/cxf/wsn/WsnBrokerTest.java @@ -130,6 +130,7 @@ public abstract class WsnBrokerTest extends Assert { Consumer consumer = new Consumer(callback, "http://localhost:" + port2 + "/test/consumer"); Subscription subscription = notificationBroker.subscribe(consumer, "myTopic"); + synchronized (callback.notifications) { notificationBroker.notify("myTopic", @@ -145,6 +146,38 @@ public abstract class WsnBrokerTest extends Assert { subscription.unsubscribe(); consumer.stop(); } + + @Test + public void testRenew() throws Exception { + TestConsumer callback = new TestConsumer(); + Consumer consumer = new Consumer(callback, "http://localhost:" + port2 + "/test/consumer"); + + //create subscription with InitialTerminationTime 20 sec, so that the + //subscription would be expired after 20 sec + Subscription subscription = notificationBroker.subscribe(consumer, "myTopic", null, false, "PT20S"); + Thread.sleep(30000); + synchronized (callback.notifications) { + notificationBroker.notify("myTopic", + new JAXBElement(new QName("urn:test:org", "foo"), + String.class, "bar")); + callback.notifications.wait(10000); + } + assertEquals(0, callback.notifications.size()); //the subscription is expired so can't get the notification + subscription.renew("PT60S"); //renew another 60 sec to resend the notification + synchronized (callback.notifications) { + notificationBroker.notify("myTopic", + new JAXBElement(new QName("urn:test:org", "foo"), + String.class, "bar")); + callback.notifications.wait(10000); + } + assertEquals(1, callback.notifications.size()); //the subscription is expired so can't get the notification + NotificationMessageHolderType message = callback.notifications.get(0); + assertEquals(WSNHelper.getInstance().getWSAAddress(subscription.getEpr()), + WSNHelper.getInstance().getWSAAddress(message.getSubscriptionReference())); + + subscription.unsubscribe(); + consumer.stop(); + } @Test public void testPullPoint() throws Exception {