activemq-users mailing list archives

From Lovegiver <frederic.courc...@gmail.com>
Subject Re: Unsubscribing DurableSubscribers
Date Fri, 16 Sep 2016 19:02:59 GMT
Good evening Tim,

Thanks a lot for your reply.

The answer is YES.

I have 3 live topics, and each one has 2 subscribers (this is my test case).

The topics have been created.
The durable subscribers have registered.
A listener is ON for each topic, in case they disconnect.

Each subscriber has a ClientID: his name.
Each subscription has a unique durable ID generated like this (as
written in the code sample): "topicName+ClientID"

In the code I shared, I'm using only "session.unsubscribe(durableID)",
because when I call "consumer.close()" first, I get the same
error message.

I think you're telling me to do this:
consumer.close();
session.unsubscribe(durableID);

Am I right?

It doesn't work, and I don't know why...
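
One thing I have not ruled out yet: in my Receiver, each instance calls createDurableSubscriber twice on the same session but keeps only the last consumer in its single "consumer" field, so one consumer.close() may leave the other subscriber active. Below is a minimal sketch of keeping one consumer per subscription name. It is only an assumption about the cause, not a confirmed fix. DurableConsumer, DurableSession, SubscriptionRegistry and FakeSession are hypothetical stand-ins I made up so the sketch runs without a broker; in the real code they would be javax.jms.MessageConsumer and the JMS Session, whose methods throw JMSException.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for javax.jms.MessageConsumer (only close() is modelled).
interface DurableConsumer {
    void close();
}

// Hypothetical stand-in for the two Session calls used in Receiver.
// The real JMS methods throw JMSException; that is omitted here.
interface DurableSession {
    DurableConsumer createDurableSubscriber(String topicName, String subscriptionName);
    void unsubscribe(String subscriptionName);
}

// Keeps one consumer per subscription name, so the matching consumer
// can be closed before its subscription is removed.
class SubscriptionRegistry {
    private final DurableSession session;
    private final Map<String, DurableConsumer> consumers = new HashMap<>();

    SubscriptionRegistry(DurableSession session) {
        this.session = session;
    }

    // Same naming scheme as in Receiver: topic + "->" + subscriber.
    static String subscriptionName(String topicName, String subscriberName) {
        return topicName + "->" + subscriberName;
    }

    void subscribe(String topicName, String subscriberName) {
        String name = subscriptionName(topicName, subscriberName);
        consumers.put(name, session.createDurableSubscriber(topicName, name));
    }

    void unsubscribe(String topicName, String subscriberName) {
        String name = subscriptionName(topicName, subscriberName);
        DurableConsumer consumer = consumers.remove(name);
        if (consumer != null) {
            consumer.close();          // close the consumer of THIS subscription first
        }
        session.unsubscribe(name);     // then remove the durable subscription
    }
}

// In-memory fake that imitates the rule behind "Durable consumer is in use":
// a subscription cannot be removed while its consumer is still open.
class FakeSession implements DurableSession {
    final Set<String> active = new HashSet<>();
    final Set<String> durable = new HashSet<>();

    public DurableConsumer createDurableSubscriber(String topicName, String subscriptionName) {
        durable.add(subscriptionName);
        active.add(subscriptionName);
        return () -> active.remove(subscriptionName);
    }

    public void unsubscribe(String subscriptionName) {
        if (active.contains(subscriptionName)) {
            throw new IllegalStateException("Durable consumer is in use");
        }
        durable.remove(subscriptionName);
    }
}

class UnsubscribeSketch {
    public static void main(String[] args) {
        FakeSession fake = new FakeSession();
        SubscriptionRegistry registry = new SubscriptionRegistry(fake);
        registry.subscribe("u2", "u1");
        registry.subscribe("u3", "u1");
        registry.unsubscribe("u2", "u1");
        registry.unsubscribe("u3", "u1");
        System.out.println("Remaining durable subscriptions: " + fake.durable);
    }
}
```

If this guess is right, the equivalent change in my Receiver would be to replace the single "consumer" field with a map keyed by nomAbonnement, and to close the matching consumer before calling session.unsubscribe(nomAbonnement).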



Frédéric COURCIER
citizen-web <http://blog.citizen-web.com/>

2016-09-16 20:47 GMT+02:00 Tim Bain [via ActiveMQ] <
ml-node+s2283324n4716607h5@n4.nabble.com>:

> At the time you try to unsubscribe, is a client with that ID connected?
> If so, you need to close the existing consumer before unsubscribing.
>
> Tim
>
> On Sep 16, 2016 8:01 AM, "Lovegiver" <[hidden email]> wrote:
>
> > Hello there,
> >
> > I'm trying to *UNSUBSCRIBE durable subscribers from TOPICS*.
> >
> > My app is a kind of social network: each user is a topic for other users.
> > So, each time a user does something, his friends are notified.
> >
> > Of course, a subscriber may unsubscribe from a topic when he no longer
> > wants to receive notifications about a user.
> >
> > Each time I try to unsubscribe, I get this error:
> > "javax.jms.JMSException: Durable consumer is in use"
> >
> > Here are my 2 classes, the SENDER one and the RECEIVER one.
> > Can someone tell me what I'm doing wrong? I'm just using basic features
> > of ActiveMQ...
> >
> > Thanks.
> >
> > *SENDER :*
> >
> > package com.citizenweb.classes;
> >
> > import java.util.Date;
> >
> > import javax.jms.Connection;
> > import javax.jms.ConnectionFactory;
> > import javax.jms.Destination;
> > import javax.jms.JMSException;
> > import javax.jms.MessageFormatException;
> > import javax.jms.MessageProducer;
> > import javax.jms.Session;
> > import javax.jms.TextMessage;
> > import javax.jms.Topic;
> > import javax.jms.ObjectMessage;
> >
> > import org.apache.activemq.ActiveMQConnection;
> > import org.apache.activemq.ActiveMQConnectionFactory;
> > import org.apache.activemq.ActiveMQSession;
> >
> > import com.citizenweb.interfaces.PostIF;
> > import com.citizenweb.interfaces.UserIF;
> >
> > public class Sender {
> >
> >     private ActiveMQConnectionFactory factory = null;
> >     private ActiveMQConnection connection = null;
> >     private ActiveMQSession session = null;
> >     private Destination destination = null;
> >     private MessageProducer producer = null;
> >
> >     public Sender() {
> >     }
> >
> >     public void connect() {
> >         try {
> >             factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
> >             // TODO: restore ActiveMQ's security mechanism in production
> >             factory.setTrustAllPackages(true);
> >             connection = (ActiveMQConnection) factory.createConnection();
> >             connection.start();
> >             session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> >         } catch (JMSException e) {
> >             e.printStackTrace();
> >         }
> >     }
> >
> >     public void sendPost(UserIF user, PostIF post) {
> >         if (session == null) { connect(); }
> >         try {
> >             destination = session.createTopic(user.toString());
> >             producer = session.createProducer(destination);
> >             ObjectMessage postMessage = session.createObjectMessage();
> >             postMessage.setObject(post);
> >             producer.send(postMessage);
> >             System.out.println("\n SENDER Object message sent");
> >
> >             /*QueueBrowser qb = session.createBrowser((Queue) destination);
> >             Enumeration<?> msgs = qb.getEnumeration();
> >             if ( !msgs.hasMoreElements() ) {
> >                 System.out.println("No messages in queue");
> >             } else {
> >                 while (msgs.hasMoreElements()) {
> >                     Message tempMsg = (Message)msgs.nextElement();
> >                     System.out.println("Message: " + tempMsg);
> >                 }
> >             }*/
> >
> >         } catch (MessageFormatException e) {
> >             e.printStackTrace();
> >         } catch (JMSException e) {
> >             e.printStackTrace();
> >         }
> >     }
> >
> >     public void sendInformation(UserIF user, String info) {
> >         if (session == null) { connect(); }
> >         try {
> >             destination = session.createTopic(user.toString());
> >             producer = session.createProducer(destination);
> >             TextMessage infoMessage = session.createTextMessage();
> >             infoMessage.setText(info);
> >             producer.send(infoMessage);
> >             System.out.println("\n SENDER Information message sent");
> >         } catch (JMSException e) {
> >             e.printStackTrace();
> >         }
> >     }
> >
> >     /**
> >      * @param args
> >      * @throws Exception
> >      */
> >     public static void main(String[] args) throws Exception {
> >
> >         UserIF u1, u2, u3;
> >         String[] nom = new String[5];
> >         String[] prenom = new String[5];
> >         String[] login = new String[5];
> >         String[] password = new String[5];
> >         Date[] naiss = new Date[5];
> >         String[] mail = new String[5];
> >         for (int i = 0; i < 5; i++) {
> >             nom[i] = "nom_" + i;
> >             prenom[i] = "prenom_" + i;
> >             login[i] = "login_" + i;
> >             password[i] = "password_" + i;
> >             naiss[i] = new Date();
> >             mail[i] = "mail_" + i;
> >         }
> >
> >         System.out.println("\n SENDER AFFECTATION DES NOMS");
> >         u1 = new User(nom[0], prenom[0], login[0], password[0], naiss[0], mail[0]);
> >         u2 = new User(nom[1], prenom[1], login[1], password[1], naiss[1], mail[1]);
> >         u3 = new User(nom[2], prenom[2], login[2], password[2], naiss[2], mail[2]);
> >
> >         Sender sender = new Sender();
> >         sender.sendInformation(u1, "U1 notification");
> >         sender.sendInformation(u2, "U2 notification");
> >         sender.sendInformation(u3, "U3 notification");
> >         //PostIF post = new Post("Mon Post","Ceci est mon message",u1,u1,"Classe Sender",((User) u1).getIdUser(),0);
> >         //sender.sendPost(user, post);
> >         sender.session.close();
> >         sender.connection.close();
> >     }
> > }
> >
> > *RECEIVER :*
> >
> > package com.citizenweb.classes;
> >
> > import java.io.Serializable;
> > import java.util.ArrayList;
> > import java.util.Date;
> > import java.util.List;
> >
> > import javax.jms.JMSException;
> > import javax.jms.Message;
> > import javax.jms.MessageConsumer;
> > import javax.jms.MessageListener;
> > import javax.jms.ObjectMessage;
> > import javax.jms.Session;
> > import javax.jms.TextMessage;
> > import javax.jms.Topic;
> >
> > import org.apache.activemq.ActiveMQConnection;
> > import org.apache.activemq.ActiveMQConnectionFactory;
> > import org.apache.activemq.ActiveMQSession;
> > import org.apache.activemq.broker.region.Destination;
> >
> > import com.citizenweb.interfaces.PostIF;
> > import com.citizenweb.interfaces.UserIF;
> > import com.citizenweb.classes.Post;
> >
> > public class Receiver implements MessageListener, Serializable {
> >
> >     private static final long serialVersionUID = 1L;
> >     private ActiveMQConnectionFactory factory = null;
> >     private ActiveMQConnection connection = null;
> >     private ActiveMQSession session = null;
> >     private Topic destination = null;
> >     private MessageConsumer consumer = null;
> >     UserIF userTopic = new User();
> >     UserIF userSubscriber = new User();
> >     List listeMsg = new ArrayList();
> >
> >     public Receiver(UserIF subscriber) {
> >         this.userSubscriber = subscriber;
> >     }
> >
> >     public void connect() {
> >         try {
> >             factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
> >             // TODO: restore ActiveMQ's security mechanism in production
> >             factory.setTrustAllPackages(true);
> >             connection = (ActiveMQConnection) factory.createConnection();
> >             // ClientID :
> >             // https://qnalist.com/questions/2068823/create-durable-topic-subscriber
> >             connection.setClientID(userSubscriber.toString());
> >             connection.start();
> >             session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> >         } catch (JMSException e) {
> >             e.printStackTrace();
> >         }
> >     }
> >
> >     public void receiveMessage(UserIF topic) {
> >         try {
> >             if (session == null) {
> >                 connect();
> >             }
> >             destination = session.createTopic(topic.toString());
> >             String nomAbonnement = topic.toString() + "->" + userSubscriber.toString();
> >             //String nomAbonnement = userSubscriber.toString();
> >             consumer = session.createDurableSubscriber(destination, nomAbonnement);
> >             consumer.setMessageListener(this);
> >         } catch (JMSException e) {
> >             e.printStackTrace();
> >         }
> >     }
> >
> >     public void unsubscribe(UserIF topic) {
> >         try {
> >             System.out.println("\n RECEIVER Désinscription du topic " + topic.toString());
> >             //consumer.close();
> >             String nomAbonnement = topic.toString() + "->" + userSubscriber.toString();
> >             //String nomAbonnement = userSubscriber.toString();
> >             System.out.println("\n RECEIVER Abonnement à clore = " + nomAbonnement);
> >             session.unsubscribe(nomAbonnement);
> >             System.out.println("\n RECEIVER " + userSubscriber.toString() + " s'est désinscrit de " + nomAbonnement);
> >         } catch (JMSException e) {
> >             e.printStackTrace();
> >         }
> >     }
> >
> >     @Override
> >     public void onMessage(Message message) {
> >         System.out.println("\n RECEIVER OnMessage triggered for " + userSubscriber.toString());
> >         listeMsg.add(message);
> >         System.out.println("\n RECEIVER Nombre de messages reçus par " + userSubscriber + " = " + listeMsg.size());
> >         String classe = message.getClass().getSimpleName();
> >         System.out.println("\n RECEIVER Classe de message : " + classe);
> >         try {
> >             if (message instanceof TextMessage) {
> >                 TextMessage text = (TextMessage) message;
> >                 System.out.println("\n RECEIVER Information : " + text.getText());
> >             }
> >             if (message instanceof ObjectMessage) {
> >                 System.out.println("\n RECEIVER ObjectMessage");
> >                 ObjectMessage oMessage = (ObjectMessage) message;
> >                 if (oMessage.getObject() instanceof PostIF) {
> >                     PostIF post = (PostIF) oMessage.getObject();
> >                     String s = ((Post) post).getCorpsMessage();
> >                     System.out.println("\n RECEIVER Post : " + s);
> >                 }
> >             }
> >         } catch (JMSException e) {
> >             e.printStackTrace();
> >         }
> >     }
> >
> >     public static void main(String[] args) throws JMSException {
> >
> >         /*
> >          * EACH USER IS A TOPIC FOR OTHER USERS
> >          * WHATEVER A USER DOES RESULTS IN A NOTIFICATION TO SUBSCRIBERS
> >          */
> >
> >         //CREATE USER
> >         UserIF u1, u2, u3;
> >         String[] nom = new String[5];
> >         String[] prenom = new String[5];
> >         String[] login = new String[5];
> >         String[] password = new String[5];
> >         Date[] naiss = new Date[5];
> >         String[] mail = new String[5];
> >         for (int i = 0; i < 5; i++) {
> >             nom[i] = "nom_" + i;
> >             prenom[i] = "prenom_" + i;
> >             login[i] = "login_" + i;
> >             password[i] = "password_" + i;
> >             naiss[i] = new Date();
> >             mail[i] = "mail_" + i;
> >         }
> >
> >         u1 = new User(nom[0], prenom[0], login[0], password[0], naiss[0], mail[0]);
> >         u2 = new User(nom[1], prenom[1], login[1], password[1], naiss[1], mail[1]);
> >         u3 = new User(nom[2], prenom[2], login[2], password[2], naiss[2], mail[2]);
> >
> >         /*
> >          * MAKE EACH USER A SUBSCRIBER
> >          */
> >         Receiver receiver1 = new Receiver(u1);
> >         Receiver receiver2 = new Receiver(u2);
> >         Receiver receiver3 = new Receiver(u3);
> >
> >         /*
> >          * PUT A MESSAGE LISTENER FOR EACH USER
> >          */
> >         receiver1.receiveMessage(u2);
> >         receiver1.receiveMessage(u3);
> >         receiver2.receiveMessage(u1);
> >         receiver2.receiveMessage(u3);
> >         receiver3.receiveMessage(u1);
> >         receiver3.receiveMessage(u2);
> >
> >         /*
> >          * CALL THE SENDER CLASS TO SEND MESSAGES
> >          */
> >         try {
> >             Sender.main(args);
> >         } catch (Exception e1) {
> >             e1.printStackTrace();
> >         }
> >
> >         /*
> >          * A SLEEP TO HAVE ENOUGH TIME TO LOOK AT THE ACTIVEMQ CONSOLE
> >          * CAN BE REMOVED
> >          */
> >         try {
> >             Thread.sleep(10000);
> >         } catch (InterruptedException e) {
> >             // TODO Auto-generated catch block
> >             e.printStackTrace();
> >             return;
> >         }
> >
> >         /*
> >          * UNSUBSCRIBE SUBSCRIBERS FROM TOPICS
> >          */
> >         receiver1.unsubscribe(u2);
> >         receiver1.unsubscribe(u3);
> >         receiver2.unsubscribe(u1);
> >         receiver2.unsubscribe(u3);
> >         receiver3.unsubscribe(u1);
> >         receiver3.unsubscribe(u2);
> >     }
> > }
> >
> >
> >
> > --
> > View this message in context: http://activemq.2283324.n4.nabble.com/Unsubscribing-DurableSubscribers-tp4716592.html
> > Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>

--
View this message in context: http://activemq.2283324.n4.nabble.com/Unsubscribing-DurableSubscribers-tp4716592p4716609.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.