servicemix-users mailing list archives

From "Guillaume Nodet" <gno...@gmail.com>
Subject Re: Consumer Can Send
Date Fri, 04 Jan 2008 13:20:17 GMT
I thought I had pointed you in the right direction.
Do not use the PublisherComponent that way: either use it as intended
(deploy it as a component in the lightweight container) or use the WSN
client API:
   wsn.notify(topic, msg)

I'm not sure what you are trying to achieve with your TraceComponent.  It
sounds like you want to reimplement the PublisherComponent.  If so, you
should have a look at how it is implemented and patch it.

Also note that both PublisherComponent and the WSN client API are just
sugar; you can send a JBI exchange to the WSN broker directly if you want.
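
To illustrate the client API option, here is a minimal sketch of a consumer
that republishes what it receives through a notify(topic, msg) call instead
of re-sending the incoming exchange. It uses stand-in types only: Broker,
RecordingBroker, TraceConsumer and the topic name "copyTopic" are
hypothetical names made up for this sketch and are not ServiceMix or JBI
classes. In a real component the broker role would be played by the WSN
client.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in types for illustration only; they are NOT the ServiceMix or JBI API.
class ForwardCopySketch {

    /** Hypothetical stand-in for a WSN client exposing notify(topic, msg). */
    interface Broker {
        void notify(String topic, String msg);
    }

    /** Test double that records what was published instead of contacting a broker. */
    static class RecordingBroker implements Broker {
        final List<String> published = new ArrayList<>();

        @Override
        public void notify(String topic, String msg) {
            published.add(topic + ":" + msg);
        }
    }

    /** Consumer that logs the incoming body, then republishes a copy on another topic. */
    static class TraceConsumer {
        private final Broker broker;
        private final String forwardTopic;

        TraceConsumer(Broker broker, String forwardTopic) {
            this.broker = broker;
            this.forwardTopic = forwardTopic;
        }

        void onMessage(String body) {
            System.out.println("Body is: " + body);
            // Publish a fresh notification carrying the body; the incoming
            // exchange itself is never re-sent.
            broker.notify(forwardTopic, body);
        }
    }

    public static void main(String[] args) {
        RecordingBroker broker = new RecordingBroker();
        new TraceConsumer(broker, "copyTopic").onMessage("<hello/>");
        System.out.println(broker.published);
    }
}
```

The point of the sketch is only the shape of the flow: the copy goes out as a
new notification on a topic, and the received exchange is left alone.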

On Jan 4, 2008 2:13 PM, martymusk <martymusk@email.it> wrote:

>
> My problem is still the same!
> In wsn-example, after the Consumer receives an exchange, I'd like it to send
> a copy of it. But that doesn't happen. :-(
> I've modified the TraceComponent class in the following way, but it doesn't
> work:
>
>
> package org.apache.servicemix.wsn;
>
> import java.io.StringWriter;
>
> import javax.jbi.JBIException;
> import javax.jbi.messaging.ExchangeStatus;
> import javax.jbi.messaging.MessageExchange;
> import javax.jbi.messaging.MessagingException;
> import javax.jbi.messaging.NormalizedMessage;
> import javax.xml.bind.JAXBContext;
> import javax.xml.transform.Source;
> import javax.xml.transform.TransformerException;
> //import javax.jbi.messaging.InOut;
>
> import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
> import org.apache.servicemix.jbi.messaging.InOutImpl;
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
> import org.apache.servicemix.MessageExchangeListener;
> import org.apache.servicemix.components.util.ComponentSupport;
> import org.apache.servicemix.wsn.spring.PublisherComponent;
> import org.apache.servicemix.jbi.jaxp.SourceTransformer;
> import org.apache.servicemix.jbi.jaxp.StringSource;
> import org.apache.servicemix.wsn.client.AbstractWSAClient;
> import org.apache.servicemix.wsn.client.NotificationBroker;
> import org.oasis_open.docs.wsn.b_2.Subscribe;
> import org.oasis_open.docs.wsn.b_2.SubscribeResponse;
> import org.oasis_open.docs.wsn.b_2.Unsubscribe;
> import org.oasis_open.docs.wsn.b_2.UnsubscribeResponse;
> import org.w3c.dom.Element;
>
>
> public class TraceComponent extends ComponentSupport implements
>                MessageExchangeListener {
>
>
>        private Log log = LogFactory.getLog(TraceComponent.class);
>
>        private SourceTransformer sourceTransformer = new SourceTransformer();
>
>        public Log getLog() {
>                return log;
>        }
>
>        public void setLog(Log log) {
>                this.log = log;
>        }
>
>        public SourceTransformer getSourceTransformer() {
>                return sourceTransformer;
>        }
>
>        public void setSourceTransformer(SourceTransformer sourceTransformer) {
>                this.sourceTransformer = sourceTransformer;
>        }
>
>
>        // ADDED CODE
>
>        PublisherComponent publisher = new PublisherComponent();
>          private String subscriptionEndpoint = "subscription";
>          private NotificationBroker wsnBroker;
>          private String topic;
>          private String msg;
>          private boolean demand;
>          private Subscribe subscription;
>
>
>          public String getTopic() {
>                return topic;
>            }
>
>            /**
>             * @param topic The topic to set.
>             */
>            public void setTopic(String topic) {
>               this.topic = topic;
>            }
>
>         public void init() throws JBIException {
>                super.init();
>                getContext().activateEndpoint(getService(), subscriptionEndpoint);
>                 log.info("INIT METHOD");
>                // publisher.init();
>
>                 publisher.setTopic(topic);
>            }
>
>            /* (non-Javadoc)
>             * @see javax.jbi.management.LifeCycleMBean#start()
>             */
>            public void start() throws JBIException {
>
>                log.info("START METHOD");
>                //publisher.start();
>                new Thread() {
>                  public void run() {
>                       try {
>                           wsnBroker = new NotificationBroker(getContext());
>
>                           String wsaAddress = getService().getNamespaceURI() + "/"
>                                   + getService().getLocalPart() + "/" + subscriptionEndpoint;
>
>                           wsnBroker.registerPublisher(AbstractWSAClient.createWSA(wsaAddress),
>                                                       topic,
>                                                       demand);
>                        } catch (Exception e) {
>                            log.error("Could not create wsn client", e);
>                        }
>                    }
>                }.start();
>            }
>
>
>            public void shutDown() throws JBIException {
>                log.info("SHUTDOWN METHOD");
>            //  publisher.shutDown();
>               super.shutDown();
>            }
>
>            // END OF ADDED CODE
>
>        public void onMessageExchange(MessageExchange exchange)
>                        throws MessagingException {
>                // lets dump the incoming message
>                NormalizedMessage message = exchange.getMessage("in");
>                if (message == null) {
>                        log.warn("Received null message from exchange: " + exchange);
>                } else {
>                        log.info("Exchange: " + exchange + " received IN message: "
>                                        + message);
>                        try {
>                                log.info("Body is: "
>                                                + sourceTransformer.toString(message.getContent()));
>                        } catch (TransformerException e) {
>                                log.error("Failed to turn message body into text: " + e, e);
>                        }
>                }
>                done(exchange);
>                log.info("MESSAGE RECEIVED");
>
>                log.info("COPYING THE PACKET");
>
>                MessageExchange scambio = (MessageExchange)exchange;
>
>
>
>                log.info("SENDING");
>
>          try{
>
>                log.info("SEND PHASE");
>                //publisher.onMessageExchange(scambio);
>
>                // ADDED onMessageExchange LOGIC
>
>                if (exchange.getStatus() != ExchangeStatus.ACTIVE) {
>                    return;
>                }
>                // This is a notification from the WSN broker
>                if (exchange.getEndpoint().getEndpointName().equals(subscriptionEndpoint)) {
>                    try {
>                        JAXBContext jaxbContext = JAXBContext.newInstance(Subscribe.class);
>                        Source src = exchange.getMessage("in").getContent();
>                        Object input = jaxbContext.createUnmarshaller().unmarshal(src);
>                        if (input instanceof Subscribe) {
>                            subscription = (Subscribe) input;
>                            SubscribeResponse response = new SubscribeResponse();
>                            String wsaAddress = getService().getNamespaceURI() + "/"
>                                    + getService().getLocalPart() + "/" + subscriptionEndpoint;
>                            response.setSubscriptionReference(AbstractWSAClient.createWSA(wsaAddress));
>                            StringWriter writer = new StringWriter();
>                            jaxbContext.createMarshaller().marshal(response, writer);
>                            NormalizedMessage out = exchange.createMessage();
>                            out.setContent(new StringSource(writer.toString()));
>                            exchange.setMessage(out, "out");
>                            send(exchange);
>
>                        } else if (input instanceof Unsubscribe) {
>                            subscription = null;
>                            UnsubscribeResponse response = new UnsubscribeResponse();
>                            StringWriter writer = new StringWriter();
>                            jaxbContext.createMarshaller().marshal(response, writer);
>                            NormalizedMessage out = exchange.createMessage();
>                            out.setContent(new StringSource(writer.toString()));
>                            exchange.setMessage(out, "out");
>                            send(exchange);
>
>                        } else {
>                            throw new Exception("Unknown request");
>                        }
>                    } catch (Exception e) {
>                        fail(exchange, e);
>                    }
>                // This is a notification to publish
>                }
>
>                  else {
>                    try {
>
>                    log.info("SKIPPING");
>                        if (!demand || subscription != null) {
>                                log.info("ENTERING THE IF");
>                           // Element elem = new SourceTransformer().toDOMElement(exchange.getMessage("in"));
>                            log.info("BEFORE WSNBROKER");
>                            wsnBroker.notify(topic, msg);
>                            log.info("BEFORE DONE");
>                            getDeliveryChannel().send(exchange);
>                            log.info("AFTER DONE");
>                        } else {
>                            log.info("Ignore notification as the publisher has no subscribers");
>                        }
>                    } catch (Exception e) {
>                        fail(exchange, e);
>                        }
>
>                log.debug("THE PACKET SENT IS: " + scambio);
>
>                  }
>                // END OF onMessageExchange LOGIC
>
>           } catch(JBIException e){
>                  log.error("EXCEPTION RAISED!");
>          }
>
>      log.info("SENT");
> }
> }
>
>
> Please help me if you can.
> Thank you very much for your help...
>
> Martina
>
>
>
>
> gnodet wrote:
> >
> > You should not have to deal with or see the internal state of the
> > exchange. I guess you get an exception when trying to send an exchange,
> > but this means that there is a problem in your code.  You should post a
> > snippet of your code that is failing so that we can help you.
> >
> > On Jan 4, 2008 12:22 PM, martymusk <martymusk@email.it> wrote:
> >
> >>
> >> Hi,
> >> I have a question:
> >> can I set the Consumer to CAN_SEND anywhere?
> >>
> >> I hope you can help me.
> >> Thank you in advance.
> >>
> >> Martina
> >> --
> >> View this message in context:
> >> http://www.nabble.com/Consumer-Can-Send-tp14614543s12049p14614543.html
> >> Sent from the ServiceMix - User mailing list archive at Nabble.com.
> >>
> >>
> >
> >
> > --
> > Cheers,
> > Guillaume Nodet
> > ------------------------
> > Blog: http://gnodet.blogspot.com/
> >
> >
>
> --
> View this message in context:
> http://www.nabble.com/Consumer-Can-Send-tp14614543s12049p14616107.html
> Sent from the ServiceMix - User mailing list archive at Nabble.com.
>
>


-- 
Cheers,
Guillaume Nodet
------------------------
Blog: http://gnodet.blogspot.com/
