activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "yau" <dwserv...@vip.163.com>
Subject How to use BytesMessage?
Date Mon, 15 Dec 2008 04:26:06 GMT
Hi, All!

I am using ActiveMQ 5.2.0, and wrote the following code, but it works incorrectly.

The producer thread code:
=======================================================
  // Producer task: connects, copies c:\debug.txt into a single BytesMessage
  // in 4096-byte chunks, and sends that message.
  Runnable run = new Runnable() {

   @Override
   public void run() {
    System.out.println("Producer started.");
    MsgProducerWrapper prod = new MsgProducerWrapper(initArgs);
    try {
     prod.connect(); //see the following routine

     ActiveMQBytesMessage msg = (ActiveMQBytesMessage) prod.getSession().createBytesMessage();

     InputStream fIn = new FileInputStream(
       "c:\\debug.txt");
     try {
      byte[] buff = new byte[4096];
      int readed;

      awaitBarrier(barrier2); //sync with consumer
      sleep(3);

      // read() returns -1 at end of file; every chunk is appended to the
      // same message body.
      while ((readed = fIn.read(buff)) != -1) {
       msg.writeBytes(buff, 0, readed);
       System.out.println("write: " + readed);
      }
     } finally {
      // Close the file even when the barrier wait, the read or the
      // message write throws, so the handle is never leaked.
      fIn.close();
     }

     prod.sendMessage(msg);

    } catch (Exception e) {
     e.printStackTrace();
    } finally {
     awaitBarrier(barrier); //sync with main thread
     // NOTE(review): unlike the consumer task, nothing here closes the
     // producer's connection. That is presumably why the JVM does not
     // exit after all three threads leave -- confirm, and close the
     // wrapper here if it exposes a close method.
    }
    System.out.println("Producer leave");
   }
  };



 //The producer wrapper's connect method

 /**
  * Opens and starts a connection using the credentials and server URL from
  * {@code initArgs}, creates a non-transacted session with the configured
  * acknowledge mode, resolves the destination and creates a non-persistent
  * producer on it.
  *
  * The destination is a queue or a topic depending on
  * {@code initArgs.isQueueMode()}; when no subject is configured a temporary
  * queue/topic is created instead of a named one.
  *
  * @throws JMSException if any of the JMS calls fails
  */
 public void connect() throws JMSException {
  ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
    initArgs.getUserName(), initArgs.getPassword(), initArgs.getServerUrl());
  conn = (ActiveMQConnection) factory.createConnection();
  conn.start();

  // false = non-transacted session
  session = (ActiveMQSession) conn.createSession(false, initArgs.getAckMode());

  String subject = initArgs.getSubject();
  boolean named = subject != null && !subject.isEmpty();

  if (initArgs.isQueueMode()) {
   queue = named ? session.createQueue(subject) : session.createTemporaryQueue();
   destination = queue;
  } else {
   topic = named ? session.createTopic(subject) : session.createTemporaryTopic();
   destination = topic;
  }

  producer = session.createProducer(destination);
  producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 }



The consumer code:
=======================================================
  // Consumer task: connects, receives one BytesMessage and writes its body
  // to C:\debug-received.txt in chunks of at most 4096 bytes.
  Runnable run = new Runnable() {

   @Override
   public void run() {
    System.out.println("Consumer started.");
    MsgConsumerWrapper consumer = new MsgConsumerWrapper(initArgs);
    try {
     consumer.connect(); //see the following routine

     // false = truncate the output file instead of appending to it
     FileOutputStream out = new FileOutputStream("C:\\debug-received.txt", false);
     try {
      byte[] buff = new byte[4096];

      awaitBarrier(barrier2); //sync with producer

      BytesMessage msg = (BytesMessage) consumer.recvMessage();
      int readed;
      // Ask for at most buff.length bytes per call so the requested length
      // can never drift from the buffer size; the loop ends when
      // readBytes reports -1.
      while ((readed = msg.readBytes(buff, buff.length)) != -1) {
       out.write(buff, 0, readed);
       System.out.println("readed: " + readed);
      }
     } finally {
      // Close the file even when receiving or reading the message throws,
      // so the handle is never leaked.
      out.close();
     }

    } catch (Exception e) {
     e.printStackTrace();
    } finally {
     awaitBarrier(barrier); //sync with main thread
     try {
      consumer.stopConsumer();
      consumer.close();
     } catch (JMSException e) {
      e.printStackTrace();
     }

    }

    System.out.println("Consumer leave");
   }

  };


//The receiver wrapper's connect method

 /**
  * Opens and starts a connection using the credentials and server URL from
  * {@code initArgs}, creates a non-transacted session with the configured
  * acknowledge mode, resolves the destination and creates a consumer on it.
  *
  * The destination is a queue or a topic depending on
  * {@code initArgs.isQueueMode()}; when no subject is configured a temporary
  * queue/topic is created instead of a named one. A non-empty message
  * selector from {@code initArgs} is passed to the consumer.
  *
  * @throws JMSException if any of the JMS calls fails
  */
 public void connect() throws JMSException {
  ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
    initArgs.getUserName(),
    initArgs.getPassword(),
    initArgs.getServerUrl());

  conn = (ActiveMQConnection) factory.createConnection();
  conn.start();

  String subject = initArgs.getSubject();

  // false = non-transacted session
  session = (ActiveMQSession) conn.createSession(false, initArgs.getAckMode());

  boolean named = subject != null && !subject.isEmpty();
  if (initArgs.isQueueMode()) {
   queue = named ? session.createQueue(subject) : session.createTemporaryQueue();
   destination = queue;
  } else {
   topic = named ? session.createTopic(subject) : session.createTemporaryTopic();
   destination = topic;
  }

  String selector = initArgs.getMessageSelector();
  boolean filtered = selector != null && !selector.isEmpty();
  consumer = filtered
    ? session.createConsumer(destination, selector)
    : session.createConsumer(destination);
 }


===========================================================


The problem is that the producer sent only 4.98K, but the consumer received 64K bytes. What's wrong?
The other problem is that after the main thread, the producer thread and the consumer thread have all exited,
the program still does not terminate.


Yau
Mime
View raw message