cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gno...@apache.org
Subject svn commit: r1179981 [2/7] - in /cxf/trunk/services: ./ wsn/ wsn/src/ wsn/src/main/ wsn/src/main/java/ wsn/src/main/java/org/ wsn/src/main/java/org/apache/ wsn/src/main/java/org/apache/cxf/ wsn/src/main/java/org/apache/cxf/wsn/ wsn/src/main/java/org/ap...
Date Fri, 07 Oct 2011 09:41:14 GMT
Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Publisher.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Publisher.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Publisher.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Publisher.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn.client;
+
+import java.util.HashMap;
+import java.util.Map;
+import javax.jws.WebParam;
+import javax.jws.WebService;
+import javax.xml.bind.JAXBElement;
+import javax.xml.ws.Endpoint;
+import javax.xml.ws.wsaddressing.W3CEndpointReference;
+
+import org.apache.cxf.wsn.AbstractSubscription;
+import org.apache.cxf.wsn.util.IdGenerator;
+import org.oasis_open.docs.wsn.b_2.GetCurrentMessage;
+import org.oasis_open.docs.wsn.b_2.GetCurrentMessageResponse;
+import org.oasis_open.docs.wsn.b_2.InvalidFilterFaultType;
+import org.oasis_open.docs.wsn.b_2.InvalidTopicExpressionFaultType;
+import org.oasis_open.docs.wsn.b_2.NoCurrentMessageOnTopicFaultType;
+import org.oasis_open.docs.wsn.b_2.Renew;
+import org.oasis_open.docs.wsn.b_2.RenewResponse;
+import org.oasis_open.docs.wsn.b_2.Subscribe;
+import org.oasis_open.docs.wsn.b_2.SubscribeResponse;
+import org.oasis_open.docs.wsn.b_2.TopicExpressionType;
+import org.oasis_open.docs.wsn.b_2.Unsubscribe;
+import org.oasis_open.docs.wsn.b_2.UnsubscribeResponse;
+import org.oasis_open.docs.wsn.bw_2.InvalidFilterFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidMessageContentExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidProducerPropertiesExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidTopicExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.MultipleTopicsSpecifiedFault;
+import org.oasis_open.docs.wsn.bw_2.NoCurrentMessageOnTopicFault;
+import org.oasis_open.docs.wsn.bw_2.NotificationProducer;
+import org.oasis_open.docs.wsn.bw_2.NotifyMessageNotSupportedFault;
+import org.oasis_open.docs.wsn.bw_2.SubscribeCreationFailedFault;
+import org.oasis_open.docs.wsn.bw_2.SubscriptionManager;
+import org.oasis_open.docs.wsn.bw_2.TopicExpressionDialectUnknownFault;
+import org.oasis_open.docs.wsn.bw_2.TopicNotSupportedFault;
+import org.oasis_open.docs.wsn.bw_2.UnableToDestroySubscriptionFault;
+import org.oasis_open.docs.wsn.bw_2.UnacceptableInitialTerminationTimeFault;
+import org.oasis_open.docs.wsn.bw_2.UnacceptableTerminationTimeFault;
+import org.oasis_open.docs.wsn.bw_2.UnrecognizedPolicyRequestFault;
+import org.oasis_open.docs.wsn.bw_2.UnsupportedPolicyRequestFault;
+import org.oasis_open.docs.wsrf.rw_2.ResourceUnknownFault;
+
+/**
+ * Demand-based publisher.
+ *
+ */
+@WebService(endpointInterface = "org.oasis_open.docs.wsn.bw_2.NotificationProducer")
+public class Publisher implements NotificationProducer, Referencable {
+
+    public interface Callback {
+        void subscribe(TopicExpressionType topic);
+        void unsubscribe(TopicExpressionType topic);
+    }
+
+    private final Callback callback;
+    private final String address;
+    private final Endpoint endpoint;
+    private final IdGenerator idGenerator = new IdGenerator();
+    private final Map<String, PublisherSubscription> subscriptions = new HashMap<String, PublisherSubscription>();
+
+    public Publisher(Callback callback, String address) {
+        this.callback = callback;
+        this.address = address;
+        this.endpoint = Endpoint.create(this);
+        this.endpoint.publish(address);
+    }
+
+    public void stop() {
+        this.endpoint.stop();
+    }
+
+    public W3CEndpointReference getEpr() {
+        return this.endpoint.getEndpointReference(W3CEndpointReference.class);
+    }
+
+    public SubscribeResponse subscribe(@WebParam(partName = "SubscribeRequest", name = "Subscribe", targetNamespace = "http://docs.oasis-open.org/wsn/b-2") Subscribe subscribeRequest) throws InvalidTopicExpressionFault, ResourceUnknownFault, InvalidProducerPropertiesExpressionFault, UnrecognizedPolicyRequestFault, TopicExpressionDialectUnknownFault, NotifyMessageNotSupportedFault, InvalidFilterFault, UnsupportedPolicyRequestFault, InvalidMessageContentExpressionFault, SubscribeCreationFailedFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
+        TopicExpressionType topic = null;
+        if (subscribeRequest.getFilter() != null) {
+            for (Object f : subscribeRequest.getFilter().getAny()) {
+                JAXBElement e = null;
+                if (f instanceof JAXBElement) {
+                    e = (JAXBElement) f;
+                    f = e.getValue();
+                }
+                if (f instanceof TopicExpressionType) {
+                    if (!e.getName().equals(AbstractSubscription.QNAME_TOPIC_EXPRESSION)) {
+                        InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType();
+                        throw new InvalidTopicExpressionFault("Unrecognized TopicExpression: " + e, fault);
+                    }
+                    topic = (TopicExpressionType) f;
+                }
+            }
+        }
+        if (topic == null) {
+            InvalidFilterFaultType fault = new InvalidFilterFaultType();
+            throw new InvalidFilterFault("Must specify a topic to subscribe on", fault);
+        }
+        PublisherSubscription pub = new PublisherSubscription(topic);
+        SubscribeResponse response = new SubscribeResponse();
+        response.setSubscriptionReference(pub.getEpr());
+        callback.subscribe(topic);
+        return response;
+    }
+
+    protected void unsubscribe(TopicExpressionType topic) {
+        callback.unsubscribe(topic);
+    }
+
+    public GetCurrentMessageResponse getCurrentMessage(@WebParam(partName = "GetCurrentMessageRequest", name = "GetCurrentMessage", targetNamespace = "http://docs.oasis-open.org/wsn/b-2") GetCurrentMessage getCurrentMessageRequest) throws InvalidTopicExpressionFault, ResourceUnknownFault, TopicExpressionDialectUnknownFault, MultipleTopicsSpecifiedFault, NoCurrentMessageOnTopicFault, TopicNotSupportedFault {
+        NoCurrentMessageOnTopicFaultType fault = new NoCurrentMessageOnTopicFaultType();
+        throw new NoCurrentMessageOnTopicFault("There is no current message on this topic.", fault);
+    }
+
+    @WebService(endpointInterface = "org.oasis_open.docs.wsn.bw_2.SubscriptionManager")
+    protected class PublisherSubscription implements SubscriptionManager {
+
+        private final String id;
+        private final TopicExpressionType topic;
+        private final Endpoint endpoint;
+
+        public PublisherSubscription(TopicExpressionType topic) {
+            this.topic = topic;
+            this.id = idGenerator.generateSanitizedId();
+            this.endpoint = Endpoint.create(this);
+            this.endpoint.publish(address + "/subscriptions/" + this.id);
+        }
+
+        public W3CEndpointReference getEpr() {
+            return endpoint.getEndpointReference(W3CEndpointReference.class);
+        }
+
+        public UnsubscribeResponse unsubscribe(@WebParam(partName = "UnsubscribeRequest", name = "Unsubscribe", targetNamespace = "http://docs.oasis-open.org/wsn/b-2") Unsubscribe unsubscribeRequest) throws ResourceUnknownFault, UnableToDestroySubscriptionFault {
+            Publisher.this.unsubscribe(topic);
+            return new UnsubscribeResponse();
+        }
+
+        public RenewResponse renew(@WebParam(partName = "RenewRequest", name = "Renew", targetNamespace = "http://docs.oasis-open.org/wsn/b-2") Renew renewRequest) throws ResourceUnknownFault, UnacceptableTerminationTimeFault {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/PullPoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/PullPoint.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/PullPoint.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/PullPoint.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn.client;
+
+import java.math.BigInteger;
+import java.util.List;
+import javax.xml.ws.wsaddressing.W3CEndpointReference;
+
+import org.apache.cxf.wsn.util.WSNHelper;
+import org.oasis_open.docs.wsn.b_2.DestroyPullPoint;
+import org.oasis_open.docs.wsn.b_2.GetMessages;
+import org.oasis_open.docs.wsn.b_2.GetMessagesResponse;
+import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
+import org.oasis_open.docs.wsn.bw_2.UnableToDestroyPullPointFault;
+import org.oasis_open.docs.wsn.bw_2.UnableToGetMessagesFault;
+import org.oasis_open.docs.wsrf.rw_2.ResourceUnknownFault;
+
+public class PullPoint implements Referencable {
+
+    private final org.oasis_open.docs.wsn.bw_2.PullPoint pullPoint;
+    private final W3CEndpointReference epr;
+
+    public PullPoint(String address) {
+        this(WSNHelper.createWSA(address));
+    }
+
+    public PullPoint(W3CEndpointReference epr) {
+        this.pullPoint = WSNHelper.getPort(epr, org.oasis_open.docs.wsn.bw_2.PullPoint.class);
+        this.epr = epr;
+    }
+
+    public org.oasis_open.docs.wsn.bw_2.PullPoint getPullPoint() {
+        return pullPoint;
+    }
+
+    public W3CEndpointReference getEpr() {
+        return this.epr;
+    }
+
+    public List<NotificationMessageHolderType> getMessages(long max) throws UnableToGetMessagesFault, ResourceUnknownFault {
+        GetMessages getMessages = new GetMessages();
+        getMessages.setMaximumNumber(BigInteger.valueOf(max));
+        GetMessagesResponse response = pullPoint.getMessages(getMessages);
+        return response.getNotificationMessage();
+    }
+
+    public void destroy() throws UnableToDestroyPullPointFault, ResourceUnknownFault {
+        this.pullPoint.destroyPullPoint(new DestroyPullPoint());
+    }
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Referencable.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Referencable.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Referencable.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Referencable.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn.client;
+
+import javax.xml.ws.wsaddressing.W3CEndpointReference;
+
+public interface Referencable {
+
+    W3CEndpointReference getEpr();
+
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Registration.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Registration.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Registration.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Registration.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn.client;
+
+import javax.xml.ws.wsaddressing.W3CEndpointReference;
+
+import org.apache.cxf.wsn.util.WSNHelper;
+import org.oasis_open.docs.wsn.br_2.DestroyRegistration;
+import org.oasis_open.docs.wsn.brw_2.PublisherRegistrationManager;
+import org.oasis_open.docs.wsn.brw_2.ResourceNotDestroyedFault;
+import org.oasis_open.docs.wsrf.rw_2.ResourceUnknownFault;
+
+public class Registration implements Referencable {
+
+    private final PublisherRegistrationManager registration;
+    private final W3CEndpointReference epr;
+
+    public Registration(String address) {
+        this(WSNHelper.createWSA(address));
+    }
+
+    public Registration(W3CEndpointReference epr) {
+        this.registration = WSNHelper.getPort(epr, PublisherRegistrationManager.class);
+        this.epr = epr;
+    }
+
+    public PublisherRegistrationManager getRegistration() {
+        return registration;
+    }
+
+    public W3CEndpointReference getEpr() {
+        return epr;
+    }
+
+    public void destroy() throws ResourceUnknownFault, ResourceNotDestroyedFault {
+        registration.destroyRegistration(new DestroyRegistration());
+    }
+
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Subscription.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Subscription.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Subscription.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Subscription.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn.client;
+
+import javax.xml.ws.wsaddressing.W3CEndpointReference;
+
+import org.apache.cxf.wsn.util.WSNHelper;
+import org.oasis_open.docs.wsn.b_2.PauseSubscription;
+import org.oasis_open.docs.wsn.b_2.Renew;
+import org.oasis_open.docs.wsn.b_2.ResumeSubscription;
+import org.oasis_open.docs.wsn.b_2.Unsubscribe;
+import org.oasis_open.docs.wsn.bw_2.PausableSubscriptionManager;
+import org.oasis_open.docs.wsn.bw_2.PauseFailedFault;
+import org.oasis_open.docs.wsn.bw_2.ResumeFailedFault;
+import org.oasis_open.docs.wsn.bw_2.UnableToDestroySubscriptionFault;
+import org.oasis_open.docs.wsn.bw_2.UnacceptableTerminationTimeFault;
+import org.oasis_open.docs.wsrf.rw_2.ResourceUnknownFault;
+
+public class Subscription implements Referencable {
+
+    private final PausableSubscriptionManager subscription;
+    private final W3CEndpointReference epr;
+
+    public Subscription(String address) {
+        this(WSNHelper.createWSA(address));
+    }
+
+    public Subscription(W3CEndpointReference epr) {
+        this.subscription = WSNHelper.getPort(epr, PausableSubscriptionManager.class);
+        this.epr = epr;
+    }
+
+    public PausableSubscriptionManager getSubscription() {
+        return subscription;
+    }
+
+    public W3CEndpointReference getEpr() {
+        return epr;
+    }
+
+    public void pause() throws PauseFailedFault, ResourceUnknownFault {
+        subscription.pauseSubscription(new PauseSubscription());
+    }
+
+    public void resume() throws ResourceUnknownFault, ResumeFailedFault {
+        subscription.resumeSubscription(new ResumeSubscription());
+    }
+
+    public void renew(String terminationTime) throws ResourceUnknownFault, UnacceptableTerminationTimeFault {
+        Renew renew = new Renew();
+        renew.setTerminationTime(terminationTime);
+        subscription.renew(renew);
+    }
+
+    public void unsubscribe() throws UnableToDestroySubscriptionFault, ResourceUnknownFault {
+        subscription.unsubscribe(new Unsubscribe());
+    }
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsCreatePullPoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsCreatePullPoint.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsCreatePullPoint.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsCreatePullPoint.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn.jaxws;
+
+import java.net.URI;
+import javax.jms.ConnectionFactory;
+import javax.jws.WebService;
+
+import org.apache.cxf.wsn.AbstractPullPoint;
+import org.apache.cxf.wsn.jms.JmsCreatePullPoint;
+
+@WebService(endpointInterface = "org.oasis_open.docs.wsn.bw_2.CreatePullPoint")
+public class JaxwsCreatePullPoint extends JmsCreatePullPoint {
+
+    public JaxwsCreatePullPoint(String name) {
+        super(name);
+        manager = new JaxwsEndpointManager();
+    }
+
+    public JaxwsCreatePullPoint(String name, ConnectionFactory connectionFactory) {
+        super(name, connectionFactory);
+        manager = new JaxwsEndpointManager();
+    }
+
+    @Override
+    protected AbstractPullPoint createPullPoint(String name) {
+        JaxwsPullPoint pullPoint = new JaxwsPullPoint(name);
+        pullPoint.setManager(getManager());
+        pullPoint.setConnection(connection);
+        pullPoint.setAddress(URI.create(getAddress()).resolve("pullpoints/" + name).toString());
+        return pullPoint;
+    }
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsEndpointManager.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsEndpointManager.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsEndpointManager.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsEndpointManager.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn.jaxws;
+
+import javax.xml.ws.Endpoint;
+import javax.xml.ws.wsaddressing.W3CEndpointReference;
+
+import org.apache.cxf.wsn.EndpointManager;
+import org.apache.cxf.wsn.EndpointRegistrationException;
+
+public class JaxwsEndpointManager implements EndpointManager {
+
+    public Object register(String address, Object service) throws EndpointRegistrationException {
+        Endpoint endpoint = Endpoint.create(service);
+        endpoint.publish(address);
+        return endpoint;
+    }
+
+    public void unregister(Object endpoint) throws EndpointRegistrationException {
+        ((Endpoint) endpoint).stop();
+    }
+
+    public W3CEndpointReference getEpr(Object endpoint) {
+        return ((Endpoint) endpoint).getEndpointReference(W3CEndpointReference.class);
+    }
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsNotificationBroker.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsNotificationBroker.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsNotificationBroker.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsNotificationBroker.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn.jaxws;
+
+import javax.jms.ConnectionFactory;
+import javax.jws.WebService;
+
+import org.apache.cxf.wsn.jms.JmsNotificationBroker;
+import org.apache.cxf.wsn.jms.JmsPublisher;
+import org.apache.cxf.wsn.jms.JmsSubscription;
+
+@WebService(endpointInterface = "org.oasis_open.docs.wsn.brw_2.NotificationBroker")
+public class JaxwsNotificationBroker extends JmsNotificationBroker {
+
+    public JaxwsNotificationBroker(String name) {
+        super(name);
+        manager = new JaxwsEndpointManager();
+    }
+
+    public JaxwsNotificationBroker(String name, ConnectionFactory connectionFactory) {
+        super(name, connectionFactory);
+        manager = new JaxwsEndpointManager();
+    }
+
+    @Override
+    protected JmsSubscription createJmsSubscription(String name) {
+        return new JaxwsSubscription(name);
+    }
+
+    @Override
+    protected JmsPublisher createJmsPublisher(String name) {
+        return new JaxwsPublisher(name, this);
+    }
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsPublisher.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsPublisher.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsPublisher.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsPublisher.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn.jaxws;
+
+import javax.jws.WebService;
+import javax.xml.bind.JAXBElement;
+
+import org.apache.cxf.wsn.AbstractSubscription;
+import org.apache.cxf.wsn.jms.JmsPublisher;
+import org.apache.cxf.wsn.util.WSNHelper;
+import org.oasis_open.docs.wsn.b_2.FilterType;
+import org.oasis_open.docs.wsn.b_2.Subscribe;
+import org.oasis_open.docs.wsn.b_2.SubscribeResponse;
+import org.oasis_open.docs.wsn.b_2.TopicExpressionType;
+import org.oasis_open.docs.wsn.b_2.Unsubscribe;
+import org.oasis_open.docs.wsn.brw_2.PublisherRegistrationFailedFault;
+import org.oasis_open.docs.wsn.bw_2.NotificationProducer;
+import org.oasis_open.docs.wsn.bw_2.SubscriptionManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@WebService(endpointInterface = "org.oasis_open.docs.wsn.brw_2.PublisherRegistrationManager")
+public class JaxwsPublisher extends JmsPublisher {
+
+    private final Logger logger = LoggerFactory.getLogger(JaxwsPublisher.class);
+
+    protected JaxwsNotificationBroker notificationBroker;
+    private NotificationProducer notificationProducer;
+
+    public JaxwsPublisher(String name, JaxwsNotificationBroker notificationBroker) {
+        super(name);
+        this.notificationBroker = notificationBroker;
+    }
+
+    @Override
+    protected void start() throws PublisherRegistrationFailedFault {
+        super.start();
+        if (demand) {
+            notificationProducer = WSNHelper.getPort(publisherReference, NotificationProducer.class);
+        }
+    }
+
+    @Override
+    protected Object startSubscription(TopicExpressionType topic) {
+        try {
+            Subscribe subscribeRequest = new Subscribe();
+            subscribeRequest.setConsumerReference(notificationBroker.getEpr());
+            subscribeRequest.setFilter(new FilterType());
+            subscribeRequest.getFilter().getAny().add(
+                    new JAXBElement<TopicExpressionType>(AbstractSubscription.QNAME_TOPIC_EXPRESSION,
+                            TopicExpressionType.class, topic));
+            SubscribeResponse response = notificationProducer.subscribe(subscribeRequest);
+            return WSNHelper.getPort(response.getSubscriptionReference(), SubscriptionManager.class);
+        } catch (Exception e) {
+            logger.info("Error while subscribing on-demand publisher", e);
+            return null;
+        }
+    }
+
+    @Override
+    protected void stopSubscription(Object sub) {
+        try {
+            ((SubscriptionManager) sub).unsubscribe(new Unsubscribe());
+        } catch (Exception e) {
+            logger.info("Error while unsubscribing on-demand publisher", e);
+        }
+    }
+
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsPullPoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsPullPoint.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsPullPoint.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsPullPoint.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn.jaxws;
+
+import javax.jws.WebService;
+
+import org.apache.cxf.wsn.jms.JmsPullPoint;
+
+@WebService(endpointInterface = "org.oasis_open.docs.wsn.bw_2.PullPoint")
+public class JaxwsPullPoint extends JmsPullPoint {
+
+    public JaxwsPullPoint(String name) {
+        super(name);
+    }
+
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsSubscription.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsSubscription.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsSubscription.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsSubscription.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn.jaxws;
+
+import javax.jws.WebService;
+
+import org.apache.cxf.wsn.jms.JmsSubscription;
+import org.apache.cxf.wsn.util.WSNHelper;
+import org.oasis_open.docs.wsn.b_2.Notify;
+import org.oasis_open.docs.wsn.b_2.Subscribe;
+import org.oasis_open.docs.wsn.b_2.SubscribeCreationFailedFaultType;
+import org.oasis_open.docs.wsn.bw_2.InvalidFilterFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidMessageContentExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidProducerPropertiesExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidTopicExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.NotificationConsumer;
+import org.oasis_open.docs.wsn.bw_2.SubscribeCreationFailedFault;
+import org.oasis_open.docs.wsn.bw_2.TopicExpressionDialectUnknownFault;
+import org.oasis_open.docs.wsn.bw_2.TopicNotSupportedFault;
+import org.oasis_open.docs.wsn.bw_2.UnacceptableInitialTerminationTimeFault;
+import org.oasis_open.docs.wsn.bw_2.UnrecognizedPolicyRequestFault;
+import org.oasis_open.docs.wsn.bw_2.UnsupportedPolicyRequestFault;
+
+@WebService(endpointInterface = "org.oasis_open.docs.wsn.bw_2.PausableSubscriptionManager")
+public class JaxwsSubscription extends JmsSubscription {
+
+    private NotificationConsumer consumer;
+
+    public JaxwsSubscription(String name) {
+        super(name);
+    }
+
+    @Override
+    protected void validateSubscription(Subscribe subscribeRequest) throws InvalidFilterFault,
+            InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault,
+            InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault,
+            TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault,
+            UnsupportedPolicyRequestFault, UnrecognizedPolicyRequestFault {
+        super.validateSubscription(subscribeRequest);
+        // TODO: implement raw notifications
+        if (useRaw) {
+            SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType();
+            throw new SubscribeCreationFailedFault("Raw notifications are not supported", fault);
+        }
+        try {
+            consumer = WSNHelper.getPort(subscribeRequest.getConsumerReference(), NotificationConsumer.class);
+        } catch (Exception e) {
+            SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType();
+            throw new SubscribeCreationFailedFault("Unable to resolve consumer reference endpoint", fault, e);
+        }
+    }
+
+    @Override
+    protected void doNotify(Notify notify) {
+        // TODO: implement raw notifications
+        consumer.notify(notify);
+    }
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/InvalidTopicException.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/InvalidTopicException.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/InvalidTopicException.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/InvalidTopicException.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn.jms;
+
+public class InvalidTopicException extends Exception {
+
+    private static final long serialVersionUID = -3708397351142080702L;
+
+    public InvalidTopicException() {
+        super();
+    }
+
+    public InvalidTopicException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public InvalidTopicException(String message) {
+        super(message);
+    }
+
+    public InvalidTopicException(Throwable cause) {
+        super(cause);
+    }
+
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsCreatePullPoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsCreatePullPoint.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsCreatePullPoint.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsCreatePullPoint.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn.jms;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+
+import org.apache.cxf.wsn.AbstractCreatePullPoint;
+import org.apache.cxf.wsn.AbstractPullPoint;
+import org.oasis_open.docs.wsn.b_2.CreatePullPoint;
+
+public class JmsCreatePullPoint extends AbstractCreatePullPoint {
+
+    protected ConnectionFactory connectionFactory;
+
+    protected Connection connection;
+
+    public JmsCreatePullPoint(String name) {
+        super(name);
+    }
+
+    public JmsCreatePullPoint(String name, ConnectionFactory connectionFactory) {
+        super(name);
+        this.connectionFactory = connectionFactory;
+    }
+
+    public void init() throws Exception {
+        if (connection == null) {
+            connection = connectionFactory.createConnection();
+            connection.start();
+        }
+        super.init();
+    }
+
+    public void destroy() throws Exception {
+        if (connection != null) {
+            connection.close();
+        }
+        super.destroy();
+    }
+
+    @Override
+    protected String createPullPointName(CreatePullPoint createPullPointRequest) {
+        // For JMS, avoid using dashes in the pullpoint name (which is also the queue name,
+        // as it will lead to problems with some JMS providers
+        String name = super.createPullPointName(createPullPointRequest);
+        name = name.replace("-", "");
+        return name;
+    }
+
+    @Override
+    protected AbstractPullPoint createPullPoint(String name) {
+        JmsPullPoint pullPoint = new JmsPullPoint(name);
+        pullPoint.setManager(getManager());
+        pullPoint.setConnection(connection);
+        return pullPoint;
+    }
+
+    public ConnectionFactory getConnectionFactory() {
+        return connectionFactory;
+    }
+
+    public void setConnectionFactory(ConnectionFactory connectionFactory) {
+        this.connectionFactory = connectionFactory;
+    }
+
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsNotificationBroker.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsNotificationBroker.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsNotificationBroker.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsNotificationBroker.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn.jms;
+
+import java.net.URI;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.xml.bind.JAXBElement;
+import javax.xml.namespace.QName;
+
+import org.apache.cxf.wsn.AbstractNotificationBroker;
+import org.apache.cxf.wsn.AbstractPublisher;
+import org.apache.cxf.wsn.AbstractSubscription;
+import org.oasis_open.docs.wsrf.rp_2.GetResourcePropertyResponse;
+import org.oasis_open.docs.wsrf.rpw_2.InvalidResourcePropertyQNameFault;
+import org.oasis_open.docs.wsrf.rw_2.ResourceUnavailableFault;
+import org.oasis_open.docs.wsrf.rw_2.ResourceUnknownFault;
+
+public abstract class JmsNotificationBroker extends AbstractNotificationBroker {
+
+    private ConnectionFactory connectionFactory;
+
+    private Connection connection;
+
+    public JmsNotificationBroker(String name) {
+        super(name);
+    }
+
+    public JmsNotificationBroker(String name, ConnectionFactory connectionFactory) {
+        super(name);
+        this.connectionFactory = connectionFactory;
+    }
+
+    public void init() throws Exception {
+        if (connection == null) {
+            connection = connectionFactory.createConnection();
+            connection.start();
+        }
+        super.init();
+    }
+
+    public void destroy() throws Exception {
+        if (connection != null) {
+            connection.close();
+        }
+        super.destroy();
+    }
+
+    @Override
+    protected AbstractPublisher createPublisher(String name) {
+        JmsPublisher publisher = createJmsPublisher(name);
+        publisher.setAddress(URI.create(getAddress()).resolve("registrations/" + name).toString());
+        publisher.setManager(getManager());
+        publisher.setConnection(connection);
+        return publisher;
+    }
+
+    @Override
+    protected AbstractSubscription createSubscription(String name) {
+        JmsSubscription subscription = createJmsSubscription(name);
+        subscription.setAddress(URI.create(getAddress()).resolve("subscriptions/" + name).toString());
+        subscription.setManager(getManager());
+        subscription.setConnection(connection);
+        return subscription;
+    }
+
+    protected abstract JmsSubscription createJmsSubscription(String name);
+
+    protected abstract JmsPublisher createJmsPublisher(String name);
+
+    public ConnectionFactory getConnectionFactory() {
+        return connectionFactory;
+    }
+
+    public void setConnectionFactory(ConnectionFactory connectionFactory) {
+        this.connectionFactory = connectionFactory;
+    }
+
+    protected GetResourcePropertyResponse handleGetResourceProperty(QName property)
+            throws ResourceUnavailableFault, ResourceUnknownFault, InvalidResourcePropertyQNameFault {
+        if (TOPIC_EXPRESSION_QNAME.equals(property)) {
+            // TODO
+        } else if (FIXED_TOPIC_SET_QNAME.equals(property)) {
+            // TODO
+        } else if (TOPIC_EXPRESSION_DIALECT_QNAME.equals(property)) {
+            GetResourcePropertyResponse r = new GetResourcePropertyResponse();
+            r.getAny().add(new JAXBElement(TOPIC_EXPRESSION_DIALECT_QNAME, URI.class, JmsTopicExpressionConverter.SIMPLE_DIALECT));
+            return r;
+        } else if (TOPIC_SET_QNAME.equals(property)) {
+            // TODO
+        }
+        return super.handleGetResourceProperty(property);
+    }
+
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsPublisher.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsPublisher.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsPublisher.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsPublisher.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn.jms;
+
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+
+import org.apache.activemq.advisory.ConsumerEvent;
+import org.apache.activemq.advisory.ConsumerEventSource;
+import org.apache.activemq.advisory.ConsumerListener;
+import org.apache.cxf.wsn.AbstractPublisher;
+import org.oasis_open.docs.wsn.b_2.InvalidTopicExpressionFaultType;
+import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
+import org.oasis_open.docs.wsn.b_2.Notify;
+import org.oasis_open.docs.wsn.b_2.TopicExpressionType;
+import org.oasis_open.docs.wsn.br_2.PublisherRegistrationFailedFaultType;
+import org.oasis_open.docs.wsn.br_2.RegisterPublisher;
+import org.oasis_open.docs.wsn.br_2.ResourceNotDestroyedFaultType;
+import org.oasis_open.docs.wsn.brw_2.PublisherRegistrationFailedFault;
+import org.oasis_open.docs.wsn.brw_2.PublisherRegistrationRejectedFault;
+import org.oasis_open.docs.wsn.brw_2.ResourceNotDestroyedFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidTopicExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.TopicNotSupportedFault;
+import org.oasis_open.docs.wsrf.rw_2.ResourceUnknownFault;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class JmsPublisher extends AbstractPublisher implements ConsumerListener {
+
+    private final Logger logger = LoggerFactory.getLogger(JmsPublisher.class);
+
+    private Connection connection;
+
+    private JmsTopicExpressionConverter topicConverter;
+
+    private JAXBContext jaxbContext;
+
+    private Topic jmsTopic;
+
+    private List<ConsumerEventSource> advisories;
+
+    private Map<Destination, Object> producers;
+
+    public JmsPublisher(String name) {
+        super(name);
+        topicConverter = new JmsTopicExpressionConverter();
+        try {
+            jaxbContext = JAXBContext.newInstance(Notify.class);
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to create JAXB context", e);
+        }
+    }
+
+    public Connection getConnection() {
+        return connection;
+    }
+
+    public void setConnection(Connection connection) {
+        this.connection = connection;
+    }
+
+    @Override
+    public void notify(NotificationMessageHolderType messageHolder) {
+        Session session = null;
+        try {
+            Topic topic = topicConverter.toActiveMQTopic(messageHolder.getTopic());
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(topic);
+            Notify notify = new Notify();
+            notify.getNotificationMessage().add(messageHolder);
+            StringWriter writer = new StringWriter();
+            jaxbContext.createMarshaller().marshal(notify, writer);
+            Message message = session.createTextMessage(writer.toString());
+            producer.send(message);
+        } catch (JMSException e) {
+            logger.warn("Error dispatching message", e);
+        } catch (JAXBException e) {
+            logger.warn("Error dispatching message", e);
+        } catch (InvalidTopicException e) {
+            logger.warn("Error dispatching message", e);
+        } finally {
+            if (session != null) {
+                try {
+                    session.close();
+                } catch (JMSException e) {
+                    logger.debug("Error closing session", e);
+                }
+            }
+        }
+    }
+
+    @Override
+    protected void validatePublisher(RegisterPublisher registerPublisherRequest) throws InvalidTopicExpressionFault,
+            PublisherRegistrationFailedFault, PublisherRegistrationRejectedFault, ResourceUnknownFault,
+            TopicNotSupportedFault {
+        super.validatePublisher(registerPublisherRequest);
+        try {
+            jmsTopic = topicConverter.toActiveMQTopic(topic);
+        } catch (InvalidTopicException e) {
+            InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType();
+            throw new InvalidTopicExpressionFault(e.getMessage(), fault);
+        }
+    }
+
+    @Override
+    protected void start() throws PublisherRegistrationFailedFault {
+        if (demand) {
+            try {
+                producers = new HashMap<Destination, Object>();
+                advisories = new ArrayList<ConsumerEventSource>();
+                for (TopicExpressionType topic : this.topic) {
+                    ConsumerEventSource advisory = new ConsumerEventSource(connection, topicConverter.toActiveMQTopic(topic));
+                    advisory.setConsumerListener(this);
+                    advisory.start();
+                    advisories.add(advisory);
+                }
+            } catch (Exception e) {
+                PublisherRegistrationFailedFaultType fault = new PublisherRegistrationFailedFaultType();
+                throw new PublisherRegistrationFailedFault("Error starting demand-based publisher", fault, e);
+            }
+        }
+    }
+
+    protected void destroy() throws ResourceNotDestroyedFault {
+        try {
+            if (advisories != null) {
+                for (ConsumerEventSource advisory : advisories) {
+                    advisory.stop();
+                }
+            }
+        } catch (Exception e) {
+            ResourceNotDestroyedFaultType fault = new ResourceNotDestroyedFaultType();
+            throw new ResourceNotDestroyedFault("Error destroying publisher", fault, e);
+        } finally {
+            super.destroy();
+        }
+    }
+
+    public synchronized void onConsumerEvent(ConsumerEvent event) {
+        Object producer = producers.get(event.getDestination());
+        if (event.getConsumerCount() > 0) {
+            if (producer == null) {
+                // start subscription
+                producer = startSubscription(topicConverter.toTopicExpression((Topic) event.getDestination()));
+                producers.put(event.getDestination(), producer);
+            }
+        } else {
+            if (producer != null) {
+                Object sub = producers.remove(event.getDestination());
+                // destroy subscription
+                stopSubscription(sub);
+            }
+        }
+    }
+
+    protected abstract Object startSubscription(TopicExpressionType topic);
+
+    protected abstract void stopSubscription(Object sub);
+
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsPullPoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsPullPoint.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsPullPoint.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsPullPoint.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn.jms;
+
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+
+import org.apache.cxf.wsn.AbstractPullPoint;
+import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
+import org.oasis_open.docs.wsn.b_2.Notify;
+import org.oasis_open.docs.wsn.b_2.UnableToGetMessagesFaultType;
+import org.oasis_open.docs.wsn.bw_2.UnableToGetMessagesFault;
+import org.oasis_open.docs.wsrf.rw_2.ResourceUnknownFault;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JmsPullPoint extends AbstractPullPoint {
+
+    private final Logger logger = LoggerFactory.getLogger(JmsPullPoint.class);
+
+    private JAXBContext jaxbContext;
+
+    private Connection connection;
+
+    private Session session;
+
+    private Queue queue;
+
+    private MessageProducer producer;
+
+    private MessageConsumer consumer;
+
+    public JmsPullPoint(String name) {
+        super(name);
+        try {
+            jaxbContext = JAXBContext.newInstance(Notify.class);
+        } catch (JAXBException e) {
+            throw new RuntimeException("Could not create PullEndpoint", e);
+        }
+    }
+
+    protected void initSession() throws JMSException {
+        if (session == null) {
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            queue = session.createQueue(getName());
+            producer = session.createProducer(queue);
+            consumer = session.createConsumer(queue);
+        }
+    }
+
+    @Override
+    protected synchronized void store(NotificationMessageHolderType messageHolder) {
+        try {
+            initSession();
+            Notify notify = new Notify();
+            notify.getNotificationMessage().add(messageHolder);
+            StringWriter writer = new StringWriter();
+            jaxbContext.createMarshaller().marshal(notify, writer);
+            Message message = session.createTextMessage(writer.toString());
+            producer.send(message);
+        } catch (JMSException e) {
+            logger.warn("Error storing message", e);
+            if (session != null) {
+                try {
+                    session.close();
+                } catch (JMSException inner) {
+                    logger.debug("Error closing session", inner);
+                } finally {
+                    session = null;
+                }
+            }
+        } catch (JAXBException e) {
+            logger.warn("Error storing message", e);
+        }
+    }
+
+    @Override
+    protected synchronized List<NotificationMessageHolderType> getMessages(int max) throws ResourceUnknownFault,
+            UnableToGetMessagesFault {
+        try {
+            if (max == 0) {
+                max = 256;
+            }
+            initSession();
+            List<NotificationMessageHolderType> messages = new ArrayList<NotificationMessageHolderType>();
+            for (int i = 0; i < max; i++) {
+                Message msg = consumer.receiveNoWait();
+                if (msg == null) {
+                    break;
+                }
+                TextMessage txtMsg = (TextMessage) msg;
+                StringReader reader = new StringReader(txtMsg.getText());
+                Notify notify = (Notify) jaxbContext.createUnmarshaller().unmarshal(reader);
+                messages.addAll(notify.getNotificationMessage());
+            }
+            return messages;
+        } catch (JMSException e) {
+            logger.info("Error retrieving messages", e);
+            if (session != null) {
+                try {
+                    session.close();
+                } catch (JMSException inner) {
+                    logger.debug("Error closing session", inner);
+                } finally {
+                    session = null;
+                }
+            }
+            UnableToGetMessagesFaultType fault = new UnableToGetMessagesFaultType();
+            throw new UnableToGetMessagesFault("Unable to retrieve messages", fault, e);
+        } catch (JAXBException e) {
+            logger.info("Error retrieving messages", e);
+            UnableToGetMessagesFaultType fault = new UnableToGetMessagesFaultType();
+            throw new UnableToGetMessagesFault("Unable to retrieve messages", fault, e);
+        }
+    }
+
+    public Connection getConnection() {
+        return connection;
+    }
+
+    public void setConnection(Connection connection) {
+        this.connection = connection;
+    }
+
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsSubscription.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsSubscription.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsSubscription.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsSubscription.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn.jms;
+
+import java.io.StringReader;
+import java.util.Iterator;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.datatype.XMLGregorianCalendar;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpression;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+
+import org.apache.cxf.wsn.AbstractSubscription;
+import org.oasis_open.docs.wsn.b_2.InvalidTopicExpressionFaultType;
+import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
+import org.oasis_open.docs.wsn.b_2.Notify;
+import org.oasis_open.docs.wsn.b_2.PauseFailedFaultType;
+import org.oasis_open.docs.wsn.b_2.ResumeFailedFaultType;
+import org.oasis_open.docs.wsn.b_2.Subscribe;
+import org.oasis_open.docs.wsn.b_2.SubscribeCreationFailedFaultType;
+import org.oasis_open.docs.wsn.b_2.UnableToDestroySubscriptionFaultType;
+import org.oasis_open.docs.wsn.b_2.UnacceptableTerminationTimeFaultType;
+import org.oasis_open.docs.wsn.bw_2.InvalidFilterFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidMessageContentExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidProducerPropertiesExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidTopicExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.PauseFailedFault;
+import org.oasis_open.docs.wsn.bw_2.ResumeFailedFault;
+import org.oasis_open.docs.wsn.bw_2.SubscribeCreationFailedFault;
+import org.oasis_open.docs.wsn.bw_2.TopicExpressionDialectUnknownFault;
+import org.oasis_open.docs.wsn.bw_2.TopicNotSupportedFault;
+import org.oasis_open.docs.wsn.bw_2.UnableToDestroySubscriptionFault;
+import org.oasis_open.docs.wsn.bw_2.UnacceptableInitialTerminationTimeFault;
+import org.oasis_open.docs.wsn.bw_2.UnacceptableTerminationTimeFault;
+import org.oasis_open.docs.wsn.bw_2.UnrecognizedPolicyRequestFault;
+import org.oasis_open.docs.wsn.bw_2.UnsupportedPolicyRequestFault;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+public abstract class JmsSubscription extends AbstractSubscription implements MessageListener {
+
+    private final Logger logger = LoggerFactory.getLogger(JmsSubscription.class);
+
+    private Connection connection;
+
+    private Session session;
+
+    private JmsTopicExpressionConverter topicConverter;
+
+    private Topic jmsTopic;
+
+    private JAXBContext jaxbContext;
+
+    public JmsSubscription(String name) {
+        super(name);
+        topicConverter = new JmsTopicExpressionConverter();
+        try {
+            jaxbContext = JAXBContext.newInstance(Notify.class);
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to create JAXB context", e);
+        }
+    }
+
+    protected void start() throws SubscribeCreationFailedFault {
+        try {
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(jmsTopic);
+            consumer.setMessageListener(this);
+        } catch (JMSException e) {
+            SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType();
+            throw new SubscribeCreationFailedFault("Error starting subscription", fault, e);
+        }
+    }
+
+    @Override
+    protected void validateSubscription(Subscribe subscribeRequest) throws InvalidFilterFault,
+            InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault,
+            InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault,
+            TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault,
+            UnsupportedPolicyRequestFault, UnrecognizedPolicyRequestFault {
+        super.validateSubscription(subscribeRequest);
+        try {
+            jmsTopic = topicConverter.toActiveMQTopic(topic);
+        } catch (InvalidTopicException e) {
+            InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType();
+            throw new InvalidTopicExpressionFault(e.getMessage(), fault);
+        }
+    }
+
+    @Override
+    protected void pause() throws PauseFailedFault {
+        if (session == null) {
+            PauseFailedFaultType fault = new PauseFailedFaultType();
+            throw new PauseFailedFault("Subscription is already paused", fault);
+        } else {
+            try {
+                session.close();
+            } catch (JMSException e) {
+                PauseFailedFaultType fault = new PauseFailedFaultType();
+                throw new PauseFailedFault("Error pausing subscription", fault, e);
+            } finally {
+                session = null;
+            }
+        }
+    }
+
+    @Override
+    protected void resume() throws ResumeFailedFault {
+        if (session != null) {
+            ResumeFailedFaultType fault = new ResumeFailedFaultType();
+            throw new ResumeFailedFault("Subscription is already running", fault);
+        } else {
+            try {
+                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageConsumer consumer = session.createConsumer(jmsTopic);
+                consumer.setMessageListener(this);
+            } catch (JMSException e) {
+                ResumeFailedFaultType fault = new ResumeFailedFaultType();
+                throw new ResumeFailedFault("Error resuming subscription", fault, e);
+            }
+        }
+    }
+
+    @Override
+    protected void renew(XMLGregorianCalendar terminationTime) throws UnacceptableTerminationTimeFault {
+        UnacceptableTerminationTimeFaultType fault = new UnacceptableTerminationTimeFaultType();
+        throw new UnacceptableTerminationTimeFault("TerminationTime is not supported", fault);
+    }
+
+    @Override
+    protected void unsubscribe() throws UnableToDestroySubscriptionFault {
+        super.unsubscribe();
+        if (session != null) {
+            try {
+                session.close();
+            } catch (JMSException e) {
+                UnableToDestroySubscriptionFaultType fault = new UnableToDestroySubscriptionFaultType();
+                throw new UnableToDestroySubscriptionFault("Unable to unsubscribe", fault, e);
+            } finally {
+                session = null;
+            }
+        }
+    }
+
+    public Connection getConnection() {
+        return connection;
+    }
+
+    public void setConnection(Connection connection) {
+        this.connection = connection;
+    }
+
+    public void onMessage(Message jmsMessage) {
+        try {
+            TextMessage text = (TextMessage) jmsMessage;
+            Notify notify = (Notify) jaxbContext.createUnmarshaller().unmarshal(new StringReader(text.getText()));
+            for (Iterator<NotificationMessageHolderType> ith = notify.getNotificationMessage().iterator(); ith.hasNext();) {
+                NotificationMessageHolderType h = ith.next();
+                Object content = h.getMessage().getAny();
+                if (!(content instanceof Element)) {
+                    DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+                    factory.setNamespaceAware(true);
+                    Document doc = factory.newDocumentBuilder().newDocument();
+                    jaxbContext.createMarshaller().marshal(content, doc);
+                    content = doc.getDocumentElement();
+                }
+                if (!doFilter((Element) content)) {
+                    ith.remove();
+                } else {
+                    h.setTopic(topic);
+                    h.setSubscriptionReference(getEpr());
+                }
+            }
+            if (!notify.getNotificationMessage().isEmpty()) {
+                doNotify(notify);
+            }
+        } catch (Exception e) {
+            logger.warn("Error notifying consumer", e);
+        }
+    }
+
+    protected boolean doFilter(Element content) {
+        if (contentFilter != null) {
+            if (!contentFilter.getDialect().equals(XPATH1_URI)) {
+                throw new IllegalStateException("Unsupported dialect: " + contentFilter.getDialect());
+            }
+            try {
+                XPathFactory xpfactory = XPathFactory.newInstance();
+                XPath xpath = xpfactory.newXPath();
+                XPathExpression exp = xpath.compile(contentFilter.getContent().get(0).toString());
+                Boolean ret = (Boolean) exp.evaluate(content, XPathConstants.BOOLEAN);
+                return ret.booleanValue();
+            } catch (XPathExpressionException e) {
+                logger.warn("Could not filter notification", e);
+            }
+            return false;
+        }
+        return true;
+    }
+
+    protected abstract void doNotify(Notify notify);
+
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsTopicExpressionConverter.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsTopicExpressionConverter.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsTopicExpressionConverter.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsTopicExpressionConverter.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn.jms;
+
+import java.util.Iterator;
+import java.util.List;
+
+import javax.jms.Topic;
+import javax.xml.namespace.QName;
+
+import org.apache.activemq.command.ActiveMQTopic;
+import org.oasis_open.docs.wsn.b_2.TopicExpressionType;
+
+public class JmsTopicExpressionConverter {
+
+    public static final String SIMPLE_DIALECT = "http://docs.oasis-open.org/wsn/t-1/TopicExpression/Simple";
+
+    public TopicExpressionType toTopicExpression(Topic topic) {
+        return toTopicExpression(topic.toString());
+    }
+
+    public TopicExpressionType toTopicExpression(ActiveMQTopic topic) {
+        return toTopicExpression(topic.getPhysicalName());
+    }
+
+    public TopicExpressionType toTopicExpression(String name) {
+        TopicExpressionType answer = new TopicExpressionType();
+        answer.getContent().add(name);
+        answer.setDialect(SIMPLE_DIALECT);
+        return answer;
+    }
+
+    public ActiveMQTopic toActiveMQTopic(List<TopicExpressionType> topics) throws InvalidTopicException {
+        if (topics == null || topics.size() == 0) {
+            return null;
+        }
+        int size = topics.size();
+        ActiveMQTopic childrenDestinations[] = new ActiveMQTopic[size];
+        for (int i = 0; i < size; i++) {
+            childrenDestinations[i] = toActiveMQTopic(topics.get(i));
+        }
+
+        ActiveMQTopic topic = new ActiveMQTopic();
+        topic.setCompositeDestinations(childrenDestinations);
+        return topic;
+    }
+
+    public ActiveMQTopic toActiveMQTopic(TopicExpressionType topic) throws InvalidTopicException {
+        String dialect = topic.getDialect();
+        if (dialect == null || SIMPLE_DIALECT.equals(dialect)) {
+            for (Iterator iter = topic.getContent().iterator(); iter.hasNext();) {
+                ActiveMQTopic answer = createActiveMQTopicFromContent(iter.next());
+                if (answer != null) {
+                    return answer;
+                }
+            }
+            throw new InvalidTopicException("No topic name available topic: " + topic);
+        } else {
+            throw new InvalidTopicException("Topic dialect: " + dialect + " not supported");
+        }
+    }
+
+    // Implementation methods
+    // -------------------------------------------------------------------------
+    protected ActiveMQTopic createActiveMQTopicFromContent(Object contentItem) {
+        if (contentItem instanceof String) {
+            return new ActiveMQTopic(((String) contentItem).trim());
+        }
+        if (contentItem instanceof QName) {
+            return createActiveMQTopicFromQName((QName) contentItem);
+        }
+        return null;
+    }
+
+    protected ActiveMQTopic createActiveMQTopicFromQName(QName qName) {
+        return new ActiveMQTopic(qName.toString());
+    }
+
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/util/DOMUtil.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/util/DOMUtil.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/util/DOMUtil.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/util/DOMUtil.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn.util;
+
+import java.io.StringWriter;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import javax.xml.namespace.QName;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Attr;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NamedNodeMap;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+/**
+ * <p>
+ * A collection of W3C DOM helper methods.
+ * </p>
+ *
+ * @version $Revision: 564607 $
+ */
+public final class DOMUtil {
+
+    private final static Logger logger = LoggerFactory.getLogger(DOMUtil.class);
+
+    private static DocumentBuilderFactory dbf;
+    private static Queue builders = new ConcurrentLinkedQueue();
+
+
+    private DOMUtil() {
+    }
+
+    /**
+     * <p>
+     * Returns the text of the element.
+     * </p>
+     *
+     * @param element the element.
+     * @return the element text value.
+     */
+    public static String getElementText(Element element) {
+        StringBuffer buffer = new StringBuffer();
+        NodeList nodeList = element.getChildNodes();
+        for (int i = 0; i < nodeList.getLength(); i++) {
+            Node node = nodeList.item(i);
+            if (node.getNodeType() == Node.TEXT_NODE || node.getNodeType() == Node.CDATA_SECTION_NODE) {
+                buffer.append(node.getNodeValue());
+            }
+        }
+        return buffer.toString();
+    }
+
+    /**
+     * <p>
+     * Moves the content of the given element to the given element.
+     * </p>
+     *
+     * @param from the source element.
+     * @param to the destination element.
+     */
+    public static void moveContent(Element from, Element to) {
+        // lets move the child nodes across
+        NodeList childNodes = from.getChildNodes();
+        while (childNodes.getLength() > 0) {
+            Node node = childNodes.item(0);
+            from.removeChild(node);
+            to.appendChild(node);
+        }
+    }
+
+    /**
+     * <p>
+     * Copy the attribues on one element to the other.
+     * </p>
+     *
+     * @param from the source element.
+     * @param to the destination element.
+     */
+    public static void copyAttributes(Element from, Element to) {
+        // lets copy across all the remainingattributes
+        NamedNodeMap attributes = from.getAttributes();
+        for (int i = 0; i < attributes.getLength(); i++) {
+            Attr node = (Attr) attributes.item(i);
+            to.setAttributeNS(node.getNamespaceURI(), node.getName(), node.getValue());
+        }
+    }
+
+    /**
+     * <p>
+     * A helper method useful for debugging and logging which will convert the given DOM node into XML text.
+     * </p>
+     *
+     * @param node the node.
+     * @return a raw XML string representing the node.
+     */
+    public static String asXML(Node node) throws TransformerException {
+        Transformer transformer = TransformerFactory.newInstance().newTransformer();
+        StringWriter buffer = new StringWriter();
+        transformer.transform(new DOMSource(node), new StreamResult(buffer));
+        return buffer.toString();
+    }
+
+    /**
+     * <p>
+     * A helper method useful for debugging and logging which will convert the given DOM node into XML text.
+     * </p>
+     *
+     * @param node the node.
+     * @return a indented XML string representing the node.
+     */
+    public static String asIndentedXML(Node node) throws TransformerException {
+        Transformer transformer = TransformerFactory.newInstance().newTransformer();
+        transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+        StringWriter buffer = new StringWriter();
+        transformer.transform(new DOMSource(node), new StreamResult(buffer));
+        return buffer.toString();
+    }
+
+    /**
+     * <p>
+     * Adds the child element with the given text.
+     * </p>
+     *
+     * @param element the element where to add child.
+     * @param name the child elenemt name.
+     * @param textValue the child element text value.
+     */
+    public static void addChildElement(Element element, String name, Object textValue) {
+        Document document = element.getOwnerDocument();
+        Element child = document.createElement(name);
+        element.appendChild(child);
+        if (textValue != null) {
+            String text = textValue.toString();
+            child.appendChild(document.createTextNode(text));
+        }
+    }
+
+    /**
+     * <p>
+     * Creates a QName instance from the given namespace context for the given qualifiedName.
+     * </p>
+     *
+     * @param element the element to use as the namespace context.
+     * @param qualifiedName the fully qualified name.
+     * @return the QName which matches the qualifiedName.
+     */
+    public static QName createQName(Element element, String qualifiedName) {
+        int index = qualifiedName.indexOf(':');
+        if (index >= 0) {
+            String prefix = qualifiedName.substring(0, index);
+            String localName = qualifiedName.substring(index + 1);
+            String uri = recursiveGetAttributeValue(element, "xmlns:" + prefix);
+            return new QName(uri, localName, prefix);
+        } else {
+            String uri = recursiveGetAttributeValue(element, "xmlns");
+            if (uri != null) {
+                return new QName(uri, qualifiedName);
+            }
+            return new QName(qualifiedName);
+        }
+    }
+
+    /**
+     * <p>
+     * Recursive method to find a given attribute value.
+     * </p>
+     *
+     * @param element the element where to looking for attribute.
+     * @param attributeName the attribute name to look for.
+     * @return the value of the given attribute.
+     */
+    public static String recursiveGetAttributeValue(Element element, String attributeName) {
+        String answer = null;
+        try {
+            answer = element.getAttribute(attributeName);
+        } catch (Exception e) {
+            if (logger.isTraceEnabled()) {
+                logger.trace("Caught exception looking up attribute: " + attributeName + " on element: " + element + ". Cause: " + e, e);
+            }
+        }
+        if (answer == null || answer.length() == 0) {
+            Node parentNode = element.getParentNode();
+            if (parentNode instanceof Element) {
+                return recursiveGetAttributeValue((Element) parentNode, attributeName);
+            }
+        }
+        return answer;
+    }
+
+    /**
+     * <p>
+     * Gets the first child element.
+     * </p>
+     *
+     * @param parent the parent node.
+     * @return the first child element.
+     */
+    public static Element getFirstChildElement(Node parent) {
+        NodeList childs = parent.getChildNodes();
+        for (int i = 0; i < childs.getLength(); i++) {
+            Node child = childs.item(i);
+            if (child instanceof Element) {
+                return (Element) child;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * <p>
+     * Gets the next sibling element.
+     * </p>
+     *
+     * @param el the base element.
+     * @return the next sibling element.
+     */
+    public static Element getNextSiblingElement(Element el) {
+        for (Node n = el.getNextSibling(); n != null; n = n.getNextSibling()) {
+            if (n instanceof Element) {
+                return (Element) n;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * <p>
+     * Builds a QName from the element name.
+     * </p>
+     *
+     * @param el the element.
+     * @return the QName for the given element.
+     */
+    public static QName getQName(Element el) {
+        if (el == null) {
+            return null;
+        } else if (el.getPrefix() != null) {
+            return new QName(el.getNamespaceURI(), el.getLocalName(), el.getPrefix());
+        } else {
+            return new QName(el.getNamespaceURI(), el.getLocalName());
+        }
+    }
+
+    public static DocumentBuilder getBuilder() throws ParserConfigurationException {
+        DocumentBuilder builder = (DocumentBuilder) builders.poll();
+        if (builder == null) {
+            if (dbf == null) {
+                dbf = DocumentBuilderFactory.newInstance();
+                dbf.setNamespaceAware(true);
+            }
+            builder = dbf.newDocumentBuilder();
+        }
+        return builder;
+    }
+
+    public static void releaseBuilder(DocumentBuilder builder) {
+        if (builder != null) {
+            builders.add(builder);
+        }
+    }
+
+    /**
+     * <p>
+     * Returns a new document, ready to populate.
+     * </p>
+     *
+     * @return a ready to use Document.
+     */
+    public static Document newDocument() throws ParserConfigurationException {
+        DocumentBuilder builder = getBuilder();
+        Document doc = builder.newDocument();
+        releaseBuilder(builder);
+        return doc;
+    }
+
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/util/IdGenerator.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/util/IdGenerator.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/util/IdGenerator.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/util/IdGenerator.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn.util;
+
+/*
+ * Copied from
+ * http://svn.apache.org/repos/asf/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IdGenerator.java
+ */
+
+import java.net.InetAddress;
+import java.net.ServerSocket;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Generator for Globally unique Strings.
+ */
+
+public class IdGenerator {
+
+    private static final Log LOG = LogFactory.getLog(IdGenerator.class);
+
+    private static final String UNIQUE_STUB;
+
+    private static int instanceCount;
+
+    private static String hostName;
+
+    private String seed;
+
+    private long sequence;
+
+    public IdGenerator() {
+        this("ID:");
+    }
+
+    /**
+     * Construct an IdGenerator
+     * 
+     */
+    public IdGenerator(String prefix) {
+        synchronized (UNIQUE_STUB) {
+            this.seed = prefix + UNIQUE_STUB + (instanceCount++) + ":";
+        }
+    }
+    
+    static {
+        String stub = "";
+        boolean canAccessSystemProps = true;
+        try {
+            SecurityManager sm = System.getSecurityManager();
+            if (sm != null) {
+                sm.checkPropertiesAccess();
+            }
+        } catch (SecurityException se) {
+            canAccessSystemProps = false;
+        }
+
+        if (canAccessSystemProps) {
+            try {
+                hostName = InetAddress.getLocalHost().getHostAddress();
+                ServerSocket ss = new ServerSocket(0);
+                stub = hostName + "-" + Long.toHexString(ss.getLocalPort() ^ System.currentTimeMillis()) + "-";
+                Thread.sleep(100);
+                ss.close();
+            } catch (Exception ioe) {
+                LOG.warn("Could not generate unique stub", ioe);
+            }
+        } else {
+            hostName = "localhost";
+            stub = hostName + Long.toHexString(System.currentTimeMillis()) + "-";
+        }
+        UNIQUE_STUB = stub;
+    }
+
+    /**
+     * As we have to find the hostname as a side-affect of generating a unique
+     * stub, we allow it's easy retrevial here
+     * 
+     * @return the local host name
+     */
+
+    public static String getHostName() {
+        return hostName;
+    }
+
+    /**
+     * Generate a unqiue id
+     * 
+     * @return a unique id
+     */
+
+    public synchronized String generateId() {
+        return this.seed + (this.sequence++);
+    }
+
+    /**
+     * Generate a unique ID - that is friendly for a URL or file system
+     * 
+     * @return a unique id
+     */
+    public String generateSanitizedId() {
+        String result = generateId();
+        result = result.replace(':', '-');
+        result = result.replace('_', '-');
+        result = result.replace('.', '-');
+        return result;
+    }
+
+}



Mime
View raw message