Repository: cxf
Updated Branches:
refs/heads/3.1.x-fixes 8b496fceb -> 15e4db968
[CXF-6784] WS-Notification subscription should support renew
(cherry picked from commit cb36642a838a145c993ed7bf9b5bf1d60ee235a8)
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/15e4db96
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/15e4db96
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/15e4db96
Branch: refs/heads/3.1.x-fixes
Commit: 15e4db9688617af6764f82c234e5a9582692a6e2
Parents: 8b496fc
Author: Freeman Fang <freeman.fang@gmail.com>
Authored: Wed Mar 2 14:16:49 2016 +0800
Committer: Freeman Fang <freeman.fang@gmail.com>
Committed: Wed Mar 2 14:17:37 2016 +0800
----------------------------------------------------------------------
.../cxf/wsn/client/NotificationBroker.java | 15 ++++--
.../apache/cxf/wsn/AbstractSubscription.java | 8 +--
.../org/apache/cxf/wsn/jms/JmsSubscription.java | 53 ++++++++++++++++++--
.../java/org/apache/cxf/wsn/WsnBrokerTest.java | 33 ++++++++++++
4 files changed, 97 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/15e4db96/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java
----------------------------------------------------------------------
diff --git a/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java b/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java
index 1ed7b36..a532f3a 100644
--- a/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java
+++ b/services/wsn/wsn-api/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java
@@ -68,6 +68,8 @@ public class NotificationBroker implements Referencable {
public static final QName QNAME_TOPIC_EXPRESSION = new QName(WSN_URI, "TopicExpression");
public static final QName QNAME_MESSAGE_CONTENT = new QName(WSN_URI, "MessageContent");
+
+ public static final QName QNAME_INITIAL_TERMINATION_TIME = new QName(WSN_URI, "InitialTerminationTime");
private org.oasis_open.docs.wsn.brw_2.NotificationBroker broker;
@@ -159,8 +161,10 @@ public class NotificationBroker implements Referencable {
NotifyMessageNotSupportedFault, InvalidProducerPropertiesExpressionFault {
//CHECKSTYLE:ON
- return subscribe(consumer, topic, null, false);
+ return subscribe(consumer, topic, null, false, null);
}
+
+
public Subscription subscribe(Referencable consumer, String topic, String xpath)
//CHECKSTYLE:OFF - WS-Notification spec throws a lot of faults
@@ -170,11 +174,11 @@ public class NotificationBroker implements Referencable {
UnsupportedPolicyRequestFault, UnrecognizedPolicyRequestFault, NotifyMessageNotSupportedFault,
InvalidProducerPropertiesExpressionFault {
//CHECKSTYLE:ON
- return subscribe(consumer, topic, xpath, false);
+ return subscribe(consumer, topic, xpath, false, null);
}
public Subscription subscribe(Referencable consumer, String topic,
- String xpath, boolean raw)
+ String xpath, boolean raw, String initialTerminationTime)
//CHECKSTYLE:OFF - WS-Notification spec throws a lot of faults
throws TopicNotSupportedFault, InvalidFilterFault, TopicExpressionDialectUnknownFault,
UnacceptableInitialTerminationTimeFault, SubscribeCreationFailedFault,
@@ -184,6 +188,11 @@ public class NotificationBroker implements Referencable {
//CHECKSTYLE:ON
Subscribe subscribeRequest = new Subscribe();
+ if (initialTerminationTime != null) {
+ subscribeRequest.setInitialTerminationTime(
+ new JAXBElement<String>(QNAME_INITIAL_TERMINATION_TIME,
+ String.class, initialTerminationTime));
+ }
subscribeRequest.setConsumerReference(consumer.getEpr());
subscribeRequest.setFilter(new FilterType());
if (topic != null) {
http://git-wip-us.apache.org/repos/asf/cxf/blob/15e4db96/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java
----------------------------------------------------------------------
diff --git a/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java b/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java
index 48bf45c..6036c86 100644
--- a/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java
+++ b/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java
@@ -126,6 +126,7 @@ public abstract class AbstractSubscription extends AbstractEndpoint implements P
Renew renewRequest) throws ResourceUnknownFault, UnacceptableTerminationTimeFault {
XMLGregorianCalendar time = validateTerminationTime(renewRequest.getTerminationTime());
+ this.setTerminationTime(time);
renew(time);
RenewResponse response = new RenewResponse();
response.setTerminationTime(time);
@@ -401,12 +402,7 @@ public abstract class AbstractSubscription extends AbstractEndpoint implements P
throw new InvalidMessageContentExpressionFault("Unsupported MessageContent dialect: '"
+ contentFilter.getDialect() + "'", fault);
}
- if (terminationTime != null) {
- UnacceptableInitialTerminationTimeFaultType fault
- = new UnacceptableInitialTerminationTimeFaultType();
- throw new UnacceptableInitialTerminationTimeFault("InitialTerminationTime is not supported",
- fault);
- }
+
}
public AbstractNotificationBroker getBroker() {
http://git-wip-us.apache.org/repos/asf/cxf/blob/15e4db96/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/jms/JmsSubscription.java
----------------------------------------------------------------------
diff --git a/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/jms/JmsSubscription.java b/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/jms/JmsSubscription.java
index 6420400..56f5ed2 100644
--- a/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/jms/JmsSubscription.java
+++ b/services/wsn/wsn-core/src/main/java/org/apache/cxf/wsn/jms/JmsSubscription.java
@@ -34,6 +34,7 @@ import javax.jms.Topic;
import javax.xml.XMLConstants;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
+import javax.xml.datatype.DatatypeConstants;
import javax.xml.datatype.XMLGregorianCalendar;
import javax.xml.stream.XMLStreamReader;
import javax.xml.xpath.XPath;
@@ -57,7 +58,6 @@ import org.oasis_open.docs.wsn.b_2.ResumeFailedFaultType;
import org.oasis_open.docs.wsn.b_2.Subscribe;
import org.oasis_open.docs.wsn.b_2.SubscribeCreationFailedFaultType;
import org.oasis_open.docs.wsn.b_2.UnableToDestroySubscriptionFaultType;
-import org.oasis_open.docs.wsn.b_2.UnacceptableTerminationTimeFaultType;
import org.oasis_open.docs.wsn.bw_2.InvalidFilterFault;
import org.oasis_open.docs.wsn.bw_2.InvalidMessageContentExpressionFault;
import org.oasis_open.docs.wsn.bw_2.InvalidProducerPropertiesExpressionFault;
@@ -86,6 +86,12 @@ public abstract class JmsSubscription extends AbstractSubscription implements Me
private Topic jmsTopic;
private JAXBContext jaxbContext;
+
+ private boolean checkTermination = true;
+
+ private boolean isSessionActive = true;
+
+ private Thread terminationThread;
public JmsSubscription(String name) {
super(name);
@@ -102,6 +108,12 @@ public abstract class JmsSubscription extends AbstractSubscription implements Me
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(jmsTopic);
consumer.setMessageListener(this);
+ checkTermination = true;
+ isSessionActive = true;
+ if (getTerminationTime() != null) {
+ terminationThread = new TerminationThread();
+ terminationThread.start();
+ }
} catch (JMSException e) {
SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType();
throw new SubscribeCreationFailedFault("Error starting subscription", fault, e);
@@ -134,6 +146,7 @@ public abstract class JmsSubscription extends AbstractSubscription implements Me
} else {
try {
session.close();
+ isSessionActive = false;
} catch (JMSException e) {
PauseFailedFaultType fault = new PauseFailedFaultType();
throw new PauseFailedFault("Error pausing subscription", fault, e);
@@ -153,6 +166,7 @@ public abstract class JmsSubscription extends AbstractSubscription implements Me
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(jmsTopic);
consumer.setMessageListener(this);
+ isSessionActive = true;
} catch (JMSException e) {
ResumeFailedFaultType fault = new ResumeFailedFaultType();
throw new ResumeFailedFault("Error resuming subscription", fault, e);
@@ -162,8 +176,15 @@ public abstract class JmsSubscription extends AbstractSubscription implements Me
@Override
protected void renew(XMLGregorianCalendar terminationTime) throws UnacceptableTerminationTimeFault {
- UnacceptableTerminationTimeFaultType fault = new UnacceptableTerminationTimeFaultType();
- throw new UnacceptableTerminationTimeFault("TerminationTime is not supported", fault);
+ try {
+ this.resume();
+ if (this.terminationThread == null) {
+ terminationThread = new TerminationThread();
+ terminationThread.start();
+ }
+ } catch (ResumeFailedFault e) {
+ LOGGER.log(Level.WARNING, "renew failed", e);
+ }
}
@Override
@@ -172,6 +193,7 @@ public abstract class JmsSubscription extends AbstractSubscription implements Me
if (session != null) {
try {
session.close();
+ checkTermination = false;
} catch (JMSException e) {
UnableToDestroySubscriptionFaultType fault = new UnableToDestroySubscriptionFaultType();
throw new UnableToDestroySubscriptionFault("Unable to unsubscribe", fault, e);
@@ -245,5 +267,30 @@ public abstract class JmsSubscription extends AbstractSubscription implements Me
}
protected abstract void doNotify(Notify notify);
+
+ class TerminationThread extends Thread {
+ public void run() {
+ while (checkTermination) {
+ XMLGregorianCalendar tt = getTerminationTime();
+ if (tt != null && isSessionActive) {
+ XMLGregorianCalendar ct = getCurrentTime();
+ int c = tt.compare(ct);
+ if (c == DatatypeConstants.LESSER || c == DatatypeConstants.EQUAL) {
+ LOGGER.log(Level.INFO, "Need Pause this subscribe");
+ try {
+ pause();
+ } catch (PauseFailedFault e) {
+ LOGGER.log(Level.WARNING, "Pause failed", e);
+ }
+ }
+ }
+ try {
+ Thread.sleep(10000); // check if should terminate every 10 sec
+ } catch (InterruptedException e) {
+ LOGGER.log(Level.WARNING, "TerminationThread sleep interrupted", e);
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/15e4db96/services/wsn/wsn-core/src/test/java/org/apache/cxf/wsn/WsnBrokerTest.java
----------------------------------------------------------------------
diff --git a/services/wsn/wsn-core/src/test/java/org/apache/cxf/wsn/WsnBrokerTest.java b/services/wsn/wsn-core/src/test/java/org/apache/cxf/wsn/WsnBrokerTest.java
index e8f388d..b81e70c 100644
--- a/services/wsn/wsn-core/src/test/java/org/apache/cxf/wsn/WsnBrokerTest.java
+++ b/services/wsn/wsn-core/src/test/java/org/apache/cxf/wsn/WsnBrokerTest.java
@@ -130,6 +130,7 @@ public abstract class WsnBrokerTest extends Assert {
Consumer consumer = new Consumer(callback, "http://localhost:" + port2 + "/test/consumer");
Subscription subscription = notificationBroker.subscribe(consumer, "myTopic");
+
synchronized (callback.notifications) {
notificationBroker.notify("myTopic",
@@ -145,6 +146,38 @@ public abstract class WsnBrokerTest extends Assert {
subscription.unsubscribe();
consumer.stop();
}
+
+ @Test
+ public void testRenew() throws Exception {
+ TestConsumer callback = new TestConsumer();
+ Consumer consumer = new Consumer(callback, "http://localhost:" + port2 + "/test/consumer");
+
+ //create subscription with InitialTerminationTime 20 sec, so that the
+ //subscription would be expired after 20 sec
+ Subscription subscription = notificationBroker.subscribe(consumer, "myTopic", null, false, "PT20S");
+ Thread.sleep(30000);
+ synchronized (callback.notifications) {
+ notificationBroker.notify("myTopic",
+ new JAXBElement<String>(new QName("urn:test:org", "foo"),
+ String.class, "bar"));
+ callback.notifications.wait(10000);
+ }
+ assertEquals(0, callback.notifications.size()); //the subscription is expired so can't get the notification
+ subscription.renew("PT60S"); //renew another 60 sec to resend the notification
+ synchronized (callback.notifications) {
+ notificationBroker.notify("myTopic",
+ new JAXBElement<String>(new QName("urn:test:org", "foo"),
+ String.class, "bar"));
+ callback.notifications.wait(10000);
+ }
+ assertEquals(1, callback.notifications.size()); //the subscription is expired so can't get the notification
+ NotificationMessageHolderType message = callback.notifications.get(0);
+ assertEquals(WSNHelper.getInstance().getWSAAddress(subscription.getEpr()),
+ WSNHelper.getInstance().getWSAAddress(message.getSubscriptionReference()));
+
+ subscription.unsubscribe();
+ consumer.stop();
+ }
@Test
public void testPullPoint() throws Exception {
|